DEV Community

Alain Airom


Local Elasticsearch Playground: A Practical Introduction and hands-on test (and moving to a RAG solution)

Hands-on experience to demonstrate advantages of RAG vs. classic search tools

Introduction

On a recent project, our team worked on a compelling use case: a key customer holds several years’ worth of critical documentation, currently indexed and managed in their existing Elasticsearch infrastructure. Facing growing demand for more intuitive and efficient access to this knowledge base, they are exploring an AI-powered virtual assistant. The goal is to improve the user experience and streamline information retrieval, and they are strongly inclined to move to a Retrieval-Augmented Generation (RAG) solution so that users can query the documentation conversationally.

Currently, the customer relies on standard search capabilities, which work but have left non-technical users somewhat dissatisfied with the out-of-the-box experience. To address this, the customer provided a set of anonymized documents so that we could build a realistic test case. Our first step was to set up a local Elasticsearch environment almost from scratch. We then built a demonstration environment on IBM Cloud, using watsonx Assistant for the virtual agent interface, Watson Discovery (which uses Elasticsearch, a distributed, RESTful search and analytics engine), and watsonx.ai Studio to integrate a Large Language Model (LLM), in this case Mistral, for the RAG solution.

Local Elasticsearch implementation and test with PDF and Doc files

  • Installing and using Elasticsearch locally is quite simple. The steps are provided below.

You need a container engine on your machine; I use Podman.


curl -fsSL https://elastic.co/start-local | sh
Elasticsearch: http://localhost:9200
Kibana: http://localhost:5601
  • You will see your username, password and API key on your console, and you can start, stop and uninstall your local version quite easily.
Open your browser at http://localhost:5601

#   Username: elastic
#   Password: xxxxxx


#🔌 Elasticsearch API endpoint: http://localhost:9200
#🔑 API key: xxxxxxx

#####

cd elastic-start-local
sh ./stop.sh

sh ./start.sh

sh ./uninstall.sh
#####

sh ./start.sh
[+] Running 3/3
 ✔ Container es-local-dev      Healthy                                                                                                                               12.9s 
 ✔ Container kibana_settings   Exited                                                                                                                                12.9s 
 ✔ Container kibana-local-dev  Healthy  
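The application further down reads its connection settings from environment variables. Here is a minimal sketch of collecting them, assuming start-local left a `.env` file in the `elastic-start-local` folder with entries such as `ES_LOCAL_URL` and `ES_LOCAL_API_KEY` (these names are an assumption; check your own file). `parse_env_text` and `api_key_header` are hypothetical helpers, not part of any library.

```python
from pathlib import Path


def parse_env_text(text: str) -> dict:
    """Parse simple KEY=VALUE lines, skipping blank lines and comments."""
    settings = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first '=' only, so values may contain '=' themselves
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip().strip('"').strip("'")
    return settings


def api_key_header(api_key: str) -> dict:
    """Build the HTTP header Elasticsearch expects for an encoded API key."""
    return {"Authorization": f"ApiKey {api_key}"}


if __name__ == "__main__":
    # Assumed location of the file generated by start-local
    env_path = Path("elastic-start-local") / ".env"
    if env_path.exists():
        settings = parse_env_text(env_path.read_text())
        # Variable name is an assumption; fall back to the default endpoint
        print(settings.get("ES_LOCAL_URL", "http://localhost:9200"))
```

The parsed values can then be exported as `ELASTICSEARCH_HOST` and `ELASTICSEARCH_API_KEY` before starting the Streamlit app.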
  • As a next step, you can build a simple Python application with a user interface to interact with your local Elasticsearch.
  • Prepare a virtual environment ⬇️
python3 -m venv venv
source venv/bin/activate

pip install --upgrade pip
  • For the interactive front end, Streamlit is a good choice; install it along with the Elasticsearch client. To extract text from the various file formats before indexing them in Elasticsearch, a tool like Apache Tika is very useful; here it runs as a server in a container (use podman run instead of docker run if Podman is your engine).
pip install streamlit elasticsearch tika
pip install watchdog
docker run -p 9998:9998 apache/tika
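Text extracted by Tika often contains runs of blank lines and stray whitespace inherited from the page layout. Here is a minimal sketch of tidying it before indexing. `normalize_extracted_text` is a hypothetical helper, not part of the `tika` package, and whether you want it depends on how much of the original layout you need to keep.

```python
import re


def normalize_extracted_text(text: str) -> str:
    """Collapse whitespace noise in extracted text while keeping paragraph breaks."""
    # Use a single newline convention
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    # Collapse runs of spaces/tabs inside each line and trim the line ends
    lines = [re.sub(r"[ \t]+", " ", line).strip() for line in text.split("\n")]
    text = "\n".join(lines)
    # Reduce three or more consecutive newlines to one blank line
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
```

It would be applied to the extracted content just before the document body is built for indexing.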
  • A simple Python/Streamlit application is provided below. The code accepts most common file types for upload; during my tests I focused only on PDF and Doc files (and JSON a bit later).
import streamlit as st
import os
import tempfile
from datetime import datetime
from elasticsearch import Elasticsearch
from tika import parser
import json

st.set_page_config(layout="wide", page_title="Elasticsearch Document Manager")

# Diagnostic: show the Streamlit version in use
st.write(f"Streamlit Version: {st.__version__}")

ELASTICSEARCH_HOST = os.getenv("ELASTICSEARCH_HOST", "http://localhost:9200")
TIKA_SERVER_HOST = os.getenv("TIKA_SERVER_HOST", "http://localhost:9998")

ELASTICSEARCH_API_KEY = os.getenv("ELASTICSEARCH_API_KEY", "xxxx")


def _ensure_numeric_field(doc: dict, field_path: str):
    """
    Ensures a nested field is numeric. Converts to int if possible.
    Handles nested paths like 'origin.binary_hash'.
    """
    parts = field_path.split('.')
    current = doc
    for i, part in enumerate(parts):
        if part in current:
            if i == len(parts) - 1: # Last part of the path
                value = current[part]
                if isinstance(value, str):
                    try:
                        current[part] = int(value)
                        st.info(f"Converted '{field_path}' from string to integer: '{value}' -> {current[part]}")
                    except ValueError:
                        st.warning(f"Could not convert '{field_path}' value '{value}' to integer. Keeping as string.")
                elif not isinstance(value, (int, float)):
                    st.warning(f"Field '{field_path}' has unexpected type: {type(value)}. Skipping numeric conversion.")
            else:
                if not isinstance(current[part], dict):
                    st.warning(f"Expected dictionary at '{'.'.join(parts[:i+1])}' but found {type(current[part])}. Cannot proceed with numeric conversion.")
                    return # Stop if the path is broken
                current = current[part]
        else:
            # Field path not found, nothing to do
            return


@st.cache_resource
def get_es_client():
    """Initializes and returns an Elasticsearch client with API key authentication."""
    try:
        es = Elasticsearch(
            hosts=[ELASTICSEARCH_HOST],
            api_key=ELASTICSEARCH_API_KEY # Use the API key for authentication
        )
        if not es.ping():
            st.error(f"Could not connect to Elasticsearch at {ELASTICSEARCH_HOST}. Ping returned False. Please check: 1) ES is running. 2) API Key/credentials are correct. 3) Network access from Python environment.")
            return None
        st.success(f"Successfully connected to Elasticsearch at {ELASTICSEARCH_HOST}")
        return es
    except Exception as e:
        st.error(f"Error connecting to Elasticsearch: {e}")
        return None

es = get_es_client()

st.title("📄 Elasticsearch Document Manager")
st.markdown("Upload, Update, and Search Documents in your Local Elasticsearch Instance.")


if es is None:
    st.warning("Could not connect to Elasticsearch. Please check your connection and refresh the page. Details above.")
    st.stop() # Stop execution if ES is not connected

st.header("Upload/Update Document")

with st.expander("Instructions for Tika Server (Required for many file types)"):
    st.markdown("""
        This application uses **Apache Tika** to extract text and metadata from various document types
        (like PDF, DOCX, TXT, HTML, images, etc.). For JSON files, Tika is not used as content is parsed directly.

        You need to run the Tika server separately, ideally via Docker:

        ```bash
        docker run -p 9998:9998 apache/tika
        ```
        Ensure the Tika server is running before attempting to upload documents for extraction.
        The application expects Tika to be accessible at `{}`.
    """.format(TIKA_SERVER_HOST))

index_name = st.text_input("Enter Elasticsearch Index Name", "my_documents").strip().lower()
doc_id_input = st.text_input("Enter Document ID (Optional, for updating existing document)")

# st.file_uploader's `type` argument takes file extensions, not MIME types
ACCEPTED_FILE_TYPES = [
    ".pdf",
    ".docx",
    ".json",
    ".txt",
    ".csv",
    ".html",
    ".xml",
    ".jpeg", ".jpg",
    ".png",
    ".gif",
    ".xlsx",
    ".xls",
    ".pptx",
    ".ppt",
]

st.info(f"Allowed file types by uploader (as seen by app): {', '.join(ACCEPTED_FILE_TYPES)}")

uploaded_file = st.file_uploader("Upload Document (PDF, DOCX, JSON, TXT, CSV, HTML, Images, etc.)", 
                                 type=ACCEPTED_FILE_TYPES)

if uploaded_file is not None:
    file_type = uploaded_file.type
    filename = uploaded_file.name
    st.write(f"Uploaded file type detected by Streamlit: **{file_type}** (Filename: **{filename}**)")


def extract_text_from_document(file_path):
    """
    Uses Apache Tika to extract text and metadata from a document.
    Requires Tika server to be running (e.g., via `docker run -p 9998:9998 apache/tika`).
    """
    try:
        parsed_data = parser.from_file(file_path, serverEndpoint=TIKA_SERVER_HOST)
        if parsed_data and parsed_data.get('content'):
            return parsed_data['content'], parsed_data['metadata']
        else:
            return None, None
    except Exception as e:
        st.error(f"Error extracting text with Tika: {e}. Is the Tika server running at {TIKA_SERVER_HOST}?")
        return None, None

def index_document(index_name, doc_id, document_body): # Changed to accept document_body
    """
    Indexes a document into Elasticsearch.
    If doc_id is provided, it attempts to update an existing document.
    """
    if es is None:
        st.error("Elasticsearch client not initialized. Cannot index document.")
        return False, None

    try:
        if doc_id:
            # Update existing document
            response = es.index(index=index_name, id=doc_id, document=document_body)
            st.success(f"Document (ID: {response['_id']}) updated successfully in index '{index_name}'.")
        else:
            # Index new document
            response = es.index(index=index_name, document=document_body)
            st.success(f"Document (ID: {response['_id']}) uploaded successfully to index '{index_name}'.")
        return True, response['_id']
    except Exception as e:
        st.error(f"Error indexing document: {e}")
        return False, None

def create_index_if_not_exists(index_name):
    """Creates an Elasticsearch index with a basic mapping if it doesn't exist."""
    if es is None:
        st.error("Elasticsearch client not initialized. Cannot create index.")
        return False

    if es.indices.exists(index=index_name):
        st.info(f"Index '{index_name}' already exists.")
        return True

    mapping = {
        "mappings": {
            "properties": {
                "filename": {"type": "keyword"},
                "content": {"type": "text"},
                "file_type": {"type": "keyword"},
                "upload_date": {"type": "date"},
                "metadata": {"type": "object", "enabled": False} # Store metadata but don't index its sub-fields by default
            }
        }
    }

    try:
        es.indices.create(index=index_name, body=mapping)
        st.success(f"Index '{index_name}' created successfully.")
        return True
    except Exception as e:
        st.error(f"Error creating index '{index_name}': {e}")
        return False

def search_documents(index_name, query_text):
    """Searches documents in Elasticsearch."""
    if es is None:
        st.error("Elasticsearch client not initialized. Cannot perform search.")
        return []

    if not es.indices.exists(index=index_name):
        st.warning(f"Index '{index_name}' does not exist. Please upload documents first.")
        return []

    search_body = {
        "query": {
            "multi_match": {
                "query": query_text,
                "fields": ["content", "filename", "*"] # Search across content, filename, and all other fields
            }
        },
        "highlight": {
            "fields": {
                "content": {},
                "*": {} 
            },
            "fragment_size": 150, 
            "number_of_fragments": 3 
        }
    }
    try:
        response = es.search(index=index_name, body=search_body)
        return response['hits']['hits']
    except Exception as e:
        st.error(f"Error during search: {e}")
        return []

# --- File Upload and Processing Logic ---
if uploaded_file is not None:
    file_type = uploaded_file.type
    filename = uploaded_file.name

    document_to_index = {} 
    processing_successful = False

    if file_type == "application/json":
        try:
            json_content = uploaded_file.read().decode('utf-8')
            document_to_index = json.loads(json_content)

            document_to_index["_source_filename"] = filename
            document_to_index["_source_file_type"] = file_type
            document_to_index["_upload_date"] = datetime.now().isoformat()

            st.success(f"JSON file '{filename}' parsed successfully!")
            processing_successful = True
        except json.JSONDecodeError as e:
            st.error(f"Error parsing JSON file '{filename}': {e}. Please ensure it's valid JSON.")
            processing_successful = False
    else:
        with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(filename)[1]) as tmp_file:
            tmp_file.write(uploaded_file.read())
            temp_file_path = tmp_file.name

        st.info(f"Processing '{filename}' with Tika...")
        extracted_content, extracted_metadata = extract_text_from_document(temp_file_path)
        os.remove(temp_file_path) # Clean up temporary file

        if extracted_content:
            document_to_index = {
                "filename": filename,
                "content": extracted_content,
                "file_type": file_type,
                "upload_date": datetime.now().isoformat(),
                "metadata": extracted_metadata
            }
            st.success("Text and metadata extracted successfully with Tika!")
            processing_successful = True
        else:
            st.error(f"Could not extract content from '{filename}' using Tika. "
                     "This might be an unsupported format for Tika, or the Tika server might be down. "
                     "Consider uploading plain text or a supported document type.")
            processing_successful = False

    if processing_successful:
        _ensure_numeric_field(document_to_index, "origin.binary_hash")

    if processing_successful:
        if create_index_if_not_exists(index_name):
            indexed, new_doc_id = index_document(
                index_name,
                doc_id_input if doc_id_input else None,
                document_to_index # Pass the prepared document body
            )
            if indexed:
                st.json({"Indexed ID": new_doc_id, 
                         "Original Filename": filename, 
                         "Detected File Type": file_type,
                         "Details": "See Elasticsearch for full document."})

st.markdown("---")
st.header("Search Documents")

search_query = st.text_input("Enter search query")

if st.button("Search"):
    if search_query:
        st.info(f"Searching for '{search_query}' in index '{index_name}'...")
        hits = search_documents(index_name, search_query)

        if hits:
            st.subheader(f"Found {len(hits)} results:")
            for i, hit in enumerate(hits):
                st.write(f"**Document ID:** `{hit['_id']}`")

                display_filename = hit['_source'].get('filename') or hit['_source'].get('_source_filename', 'N/A')
                st.write(f"**Filename:** `{display_filename}`")
                st.write(f"**Score:** `{hit['_score']:.2f}`")

                if 'highlight' in hit:
                    st.markdown("---")
                    st.markdown("**Highlighted Snippets:**")
                    for field, fragments in hit['highlight'].items():
                        st.markdown(f"**{field}:**")
                        for fragment in fragments:
                            st.markdown(fragment)
                    st.markdown("---")
                else:
                    st.markdown("**Content Snippet:**")
                    if 'content' in hit['_source']:
                        st.text(hit['_source']['content'][:500] + "..." if len(hit['_source']['content']) > 500 else hit['_source']['content'])
                    else:
                        st.json({k: v for k, v in list(hit['_source'].items())[:5]}) # Show first 5 items
                st.markdown("---")
        else:
            st.info("No documents found matching your query.")
    else:
        st.warning("Please enter a search query.")

st.markdown("---")
st.caption("Powered by Streamlit, Elasticsearch, and Apache Tika")

The outcome is satisfactory: documents are indexed, and full-text search retrieves them and returns correct results.
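One detail the app leaves to the user is the document ID: when the field is empty, Elasticsearch generates a new ID on every upload, so uploading the same file twice creates two documents. Here is a minimal sketch of deriving a stable ID from the file content instead. `content_based_doc_id` is a hypothetical helper, and the `doc` prefix and 32-character length are arbitrary choices.

```python
import hashlib


def content_based_doc_id(file_bytes: bytes, prefix: str = "doc") -> str:
    """Return a deterministic ID: identical bytes always map to the same ID."""
    digest = hashlib.sha256(file_bytes).hexdigest()
    # A shortened digest keeps IDs readable; 32 hex characters is an arbitrary choice
    return f"{prefix}-{digest[:32]}"
```

In the app above, this could be passed as `doc_id` to `index_document` when no ID is entered, for example `content_based_doc_id(uploaded_file.getvalue())`, so that re-uploading an unchanged file overwrites the existing document instead of duplicating it.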

Elasticsearch test with JSON File Format

To facilitate a comprehensive set of tests, I also leveraged Docling’s multi-format conversion capabilities to transform the received PDF and Word documents into JSON files.

# download and install the requirements
pip install docling

import json
import logging
import time
from pathlib import Path


from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
from docling.datamodel.base_models import InputFormat, ConversionStatus
from docling.document_converter import (
    DocumentConverter,
    PdfFormatOption,
    WordFormatOption,
)
from docling.pipeline.simple_pipeline import SimplePipeline
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline

_log = logging.getLogger(__name__)

def main():
    logging.basicConfig(level=logging.INFO) # Set logging level

    data_folder = Path(__file__).parent / "input"

    # Ensure the input folder exists
    if not data_folder.exists():
        _log.error(f"Input folder '{data_folder}' does not exist. Please create it and place documents inside.")
        return # Exit if the input folder is missing

    # Recursively find all files in the 'input' folder and its subdirectories.
    all_files_in_input = list(data_folder.rglob('*'))

    input_doc_paths = [f for f in all_files_in_input if f.is_file()]

    if not input_doc_paths:
        _log.warning(f"No documents found in '{data_folder}' or its subdirectories.")
        _log.warning("Please ensure there are documents in the 'input' folder to process.")
        return # Exit if no documents 

    _log.info(f"Found {len(input_doc_paths)} documents to potentially process.")
    for doc_path in input_doc_paths:
        _log.info(f"  - {doc_path}")

    doc_converter = (
        DocumentConverter(
            allowed_formats=[
                InputFormat.PDF,
                InputFormat.IMAGE,
                InputFormat.DOCX,
                InputFormat.HTML,
                InputFormat.PPTX,
                InputFormat.ASCIIDOC,
                InputFormat.CSV,
                InputFormat.MD,
            ],  
            format_options={
                InputFormat.PDF: PdfFormatOption(
                    pipeline_cls=StandardPdfPipeline, backend=PyPdfiumDocumentBackend
                ),
                InputFormat.DOCX: WordFormatOption(
                    pipeline_cls=SimplePipeline  
                ),
            },
        )
    )

    start_time = time.time()

    conv_results = doc_converter.convert_all(
        input_doc_paths,
        raises_on_error=False,  
    )

    output_dir = Path("scratch")
    output_dir.mkdir(parents=True, exist_ok=True) # Ensure output directory exists

    success_count = 0
    failure_count = 0
    partial_success_count = 0

    for res in conv_results:
        doc_filename = res.input.file.stem
        output_file_base = output_dir / doc_filename

        if res.status == ConversionStatus.SUCCESS:
            success_count += 1
            _log.info(f"Document {res.input.file.name} converted successfully.")

            # Export Docling document format to JSON:
            try:
                with (output_file_base.with_suffix(".json")).open("w") as fp:
                    json.dump(res.document.export_to_dict(), fp, indent=4)
                _log.info(f"  - Saved JSON output to: {output_file_base.with_suffix('.json')}")
            except Exception as e:
                _log.error(f"Error saving JSON for {res.input.file.name}: {e}")

            try:
                if hasattr(res.document, 'export_to_markdown'): # Check if markdown export is supported
                    with (output_file_base.with_suffix(".md")).open("w") as fp:
                        fp.write(res.document.export_to_markdown())
                    _log.info(f"  - Saved Markdown output to: {output_file_base.with_suffix('.md')}")
            except Exception as e:
                _log.error(f"Error saving Markdown for {res.input.file.name}: {e}")


        elif res.status == ConversionStatus.PARTIAL_SUCCESS:
            partial_success_count += 1
            _log.warning(
                f"Document {res.input.file.name} was partially converted with the following errors:"
            )
            for item in res.errors:
                _log.warning(f"\t- {item.error_message}")
        else: 
            failure_count += 1
            _log.error(f"Document {res.input.file.name} failed to convert.")
            for item in res.errors:
                _log.error(f"\t- {item.error_message}")


    end_time = time.time() - start_time

    _log.info(f"Document conversion complete in {end_time:.2f} seconds.")
    _log.info(
        f"Processed {len(input_doc_paths)} total documents: "
        f"{success_count} succeeded, "
        f"{partial_success_count} partially succeeded, "
        f"and {failure_count} failed."
    )

    if failure_count > 0:
        _log.error(
            f"The conversion process completed with {failure_count} failures."
        )

if __name__ == "__main__":
    main()

Once again the outcome and search results are satisfactory.
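The Streamlit app indexes an uploaded JSON file as-is, so a Docling export lands in Elasticsearch with its full nested structure. Here is a minimal sketch of flattening it into the same filename/content shape used for Tika output. It assumes the exported dictionary has a top-level `texts` list whose items carry `text` and `label`, and an `origin` object with `filename` and `mimetype`; verify this against your own JSON files. `flatten_docling_export` is a hypothetical helper, not a Docling API.

```python
def flatten_docling_export(doc: dict) -> dict:
    """Reduce a Docling-style export to a flat document for full-text indexing."""
    origin = doc.get("origin") or {}
    paragraphs = []
    for item in doc.get("texts", []):
        text = (item.get("text") or "").strip()
        # Skip empty items and repeated page furniture (label names are assumed)
        if not text or item.get("label") in {"page_header", "page_footer"}:
            continue
        paragraphs.append(text)
    return {
        "filename": origin.get("filename") or doc.get("name", "unknown"),
        "content": "\n\n".join(paragraphs),
        "file_type": origin.get("mimetype", "application/json"),
    }
```

With a flat `content` field, the highlighting configured in the search function applies to Docling-converted documents in the same way as to Tika-extracted ones.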


So why use an AI Assistant, an LLM and RAG?

While the current search results are satisfactory for basic queries, they cannot match the ease of use of an AI assistant that combines a Large Language Model with a RAG solution. This setup lets end users query large document repositories in natural language and get contextual, conversational answers grounded in the relevant documents, going beyond what traditional keyword-based search offers.

  • First of all, the user is greeted in their own language, providing an immediate sense of personalization and accessibility ☺️.


  • Once the user provides their query, the information is presented directly in the context of their question, complete with links to the original source documents for easy reference and verification.
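The retrieve-then-generate flow behind such an answer can be sketched in a few lines. This is a generic illustration, not how watsonx Assistant or Watson Discovery implement it: it assumes search hits shaped like those returned by the search function earlier (`_id` plus a `_source` with `filename` and `content`), and `build_rag_prompt` is a hypothetical helper whose output would be sent to whichever LLM you use.

```python
def build_rag_prompt(question: str, hits: list, max_chars_per_doc: int = 1000) -> str:
    """Assemble an LLM prompt that grounds the answer in retrieved documents."""
    context_blocks = []
    for number, hit in enumerate(hits, start=1):
        source = hit.get("_source", {})
        name = source.get("filename", hit.get("_id", "unknown"))
        # Truncate each document so the prompt stays within a rough size budget
        excerpt = (source.get("content") or "")[:max_chars_per_doc]
        context_blocks.append(f"[{number}] {name}\n{excerpt}")
    context = "\n\n".join(context_blocks) if context_blocks else "(no documents found)"
    return (
        "Answer the question using only the numbered documents below. "
        "Cite the document numbers you relied on. "
        "If the documents do not contain the answer, say so.\n\n"
        f"Documents:\n{context}\n\n"
        f"Question: {question}\nAnswer:"
    )
```

Numbering the documents in the prompt is what makes it possible to show source links next to the generated answer.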


Which RAG solutions does IBM provide?

The offerings available to build RAG solutions on IBM Cloud and/or on-premises platforms include Watson Discovery, watsonx Discovery (leveraging an Elasticsearch vector database), a Milvus offering and, last but not least, in-memory RAG capabilities. Customers can also connect programmatically to any other RAG solution from the platform via ad-hoc code, notebooks, or extensions.


Conclusion

For business use cases, integrating Retrieval-Augmented Generation (RAG) with Large Language Models (LLMs) brings substantial advantages. RAG mitigates the common problem of LLM “hallucinations” by grounding responses in verifiable, external data sources, which improves factual accuracy and helps build trust in AI-generated content. This is vital for industries where precision is paramount, such as legal, healthcare, and finance.

RAG also addresses outdated LLM knowledge by giving the model access to current information from proprietary internal documentation or external databases, without costly and time-consuming model retraining. Businesses keep control over the information sources used, which supports compliance and data privacy because sensitive data is not permanently embedded in the LLM’s parameters. The approach also fosters explainability: users can trace generated answers back to their original source documents, which increases auditability and confidence.

Ultimately, RAG lets enterprises deploy more reliable, adaptable, and cost-effective AI solutions that deliver relevant, up-to-date insights tailored to their domain knowledge and current business needs, boosting productivity, customer satisfaction, and strategic decision-making across functions.
