๐Ÿ†• Build and deploy Haystack pipelines with deepset Studio
Maintained by deepset

Integration: MongoDB

Use a MongoDB Atlas database with Haystack

Authors
deepset

Table of Contents

Overview

MongoDB is a document database designed for ease of application development and scaling. MongoDB Atlas is a multi-cloud database service built by people behind MongoDB. MongoDB Atlas simplifies deploying and managing your databases while offering the versatility you need to build resilient and performant global applications on the cloud providers of your choice.

For a detailed overview of all the available methods and settings for the MongoDBAtlasDocumentStore, visit the Haystack Documentation.

Haystack 2.0

Installation

pip install mongodb-atlas-haystack

Usage

To use the MongoDBAtlasDocumentStore, you must have a running MongoDB Atlas database. For details, see Get Started with Atlas.

Once your database is set, set the environment variable MONGO_CONNECTION_STRING with the connection string to your MongoDB Atlas database. The format should be similar to the following: "mongodb+srv://{mongo_atlas_username}:{mongo_atlas_password}@{mongo_atlas_host}/?{mongo_atlas_params_string}"

And then you can initialize a MongoDBAtlasDocumentStore for Haystack with the required configurations:

from haystack_integrations.document_stores.mongodb_atlas import MongoDBAtlasDocumentStore

document_store = MongoDBAtlasDocumentStore(
    database_name="haystack_test",
    collection_name="test_collection",
    vector_search_index="test_vector_search_index",
)

Example pipelines

Here is some example code of an end-to-end RAG app built on MongoDB Atlas: one indexing pipeline that embeds the documents, and a generative pipeline that can be used for question answering.

from haystack import Pipeline, Document
from haystack.document_stores.types import DuplicatePolicy
from haystack.components.writers import DocumentWriter
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder
from haystack_integrations.document_stores.mongodb_atlas import MongoDBAtlasDocumentStore
from haystack_integrations.components.retrievers.mongodb_atlas import MongoDBAtlasEmbeddingRetriever

# Create some example documents
documents = [
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome."),
]

document_store = MongoDBAtlasDocumentStore(
    database_name="haystack_test",
    collection_name="test_collection",
    vector_search_index="test_vector_search_index",
)

# Define some more components
doc_writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP)
doc_embedder = SentenceTransformersDocumentEmbedder(model="intfloat/e5-base-v2")
query_embedder = SentenceTransformersTextEmbedder(model="intfloat/e5-base-v2")

# Pipeline that ingests document for retrieval
indexing_pipe = Pipeline()
indexing_pipe.add_component(instance=doc_embedder, name="doc_embedder")
indexing_pipe.add_component(instance=doc_writer, name="doc_writer")

indexing_pipe.connect("doc_embedder.documents", "doc_writer.documents")
indexing_pipe.run({"doc_embedder": {"documents": documents}})

# Build a RAG pipeline with a Retriever to get documents relevant to 
# the query, a PromptBuilder to create a custom prompt and the OpenAIGenerator (LLM)
prompt_template = """
Given these documents, answer the question.\nDocuments:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}

\nQuestion: {{question}}
\nAnswer:
"""
rag_pipeline = Pipeline()
rag_pipeline.add_component(instance=query_embedder, name="query_embedder")
rag_pipeline.add_component(instance=MongoDBAtlasEmbeddingRetriever(document_store=document_store), name="retriever")
rag_pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
rag_pipeline.add_component(instance=OpenAIGenerator(), name="llm")
rag_pipeline.connect("query_embedder", "retriever.query_embedding")
rag_pipeline.connect("embedding_retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question on the data you just added.
question = "Where does Mark live?"
result = rag_pipeline.run(
    {
        "query_embedder": {"text": question},
        "prompt_builder": {"question": question},
    }
)
print(result)

Haystack 1.x

Installation

pip install farm-haystack[mongodb]

Usage

To use MongoDB Atlas as your data storage for your Haystack LLM pipelines, you must have a running database at MongoDB Atlas. For details, see Get Started with Atlas.

Once your database is set, you can initialize a MongoDBAtlasDocumentStore for Haystack with required configurations:

from haystack.document_stores.mongodb_atlas import MongoDBAtlasDocumentStore

ds=MongoDBAtlasDocumentStore(
    mongo_connection_string="mongodb+srv://{mongo_atlas_username}:{mongo_atlas_password}@{mongo_atlas_host}/?{mongo_atlas_params_string}",
    database_name="database_name",
    collection_name="collection_name",
    vector_search_index="vector_search_index"
)

Writing Documents to MongoDBAtlasDocumentStore

To write documents to your MongoDBAtlasDocumentStore, create an indexing pipeline, or use the write_documents() function. For this step, you may make use of the available FileConverters and PreProcessors, as well as other Integrations that might help you fetch data from other resources. Below is an example indexing pipeline that indexes your Markdown files into a MongoDB Atlas instance.

Indexing Pipeline

from haystack import Pipeline
from haystack.document_stores.mongodb_atlas import MongoDBAtlasDocumentStore
from haystack.nodes import MarkdownConverter, PreProcessor

document_store=MongoDBAtlasDocumentStore(
    mongo_connection_string="mongodb+srv://{mongo_atlas_username}:{mongo_atlas_password}@{mongo_atlas_host}/?{mongo_atlas_params_string}",
    database_name="database_name",
    collection_name="collection_name",
    vector_search_index="vector_search_index",
    embedding_dim=1536
)
converter = MarkdownConverter()
preprocessor = PreProcessor()

indexing_pipeline = Pipeline()
indexing_pipeline.add_node(component=converter, name="PDFConverter", inputs=["File"])
indexing_pipeline.add_node(component=preprocessor, name="PreProcessor", inputs=["PDFConverter"])
indexing_pipeline.add_node(component=document_store, name="DocumentStore", inputs=["PreProcessor"])

indexing_pipeline.run(file_paths=["filename.pdf"])

Query Pipeline

Once you have documents in your MongoDBAtlasDocumentStore, it’s ready to be used in any Haystack pipeline. For example, below is a pipeline that makes use of a custom prompt that is designed to answer questions for the retrieved documents.

from haystack import Pipeline
from haystack.document_stores.mongodb_atlas import MongoDBAtlasDocumentStore
from haystack.nodes import AnswerParser, EmbeddingRetriever, PromptNode, PromptTemplate

document_store=MongoDBAtlasDocumentStore(
    mongo_connection_string="mongodb+srv://{mongo_atlas_username}:{mongo_atlas_password}@{mongo_atlas_host}/?{mongo_atlas_params_string}",
    database_name="database_name",
    collection_name="collection_name",
    vector_search_index="vector_search_index"
)
              
retriever = EmbeddingRetriever(document_store = document_store,
                               embedding_model="sentence-transformers/multi-qa-mpnet-base-dot-v1")
prompt_template = PromptTemplate(prompt = """"Answer the following query based on the provided context. If the context does
                                              not include an answer, reply with 'I don't know'.\n
                                              Query: {query}\n
                                              Documents: {join(documents)}
                                              Answer: 
                                          """,
                                          output_parser=AnswerParser())
prompt_node = PromptNode(model_name_or_path="gpt-4",
                         api_key="YOUR_OPENAI_KEY",
                         default_prompt_template=prompt_template)

query_pipeline = Pipeline()
query_pipeline.add_node(component=retriever, name="Retriever", inputs=["Query"])
query_pipeline.add_node(component=prompt_node, name="PromptNode", inputs=["Retriever"])

query_pipeline.run(query = "What is MongoDB Atlas?", params={"Retriever" : {"top_k": 5}})