
Hybrid RAG Pipeline with Breakpoints


This notebook demonstrates how to set up breakpoints in a Haystack pipeline. Here, we place breakpoints in a hybrid retrieval-augmented generation (RAG) pipeline that combines BM25 and embedding-based retrieval, then uses a transformer-based reranker and an LLM to generate answers.

NOTE: this feature is part of haystack-experimental

Install packages

!pip install -U "haystack-experimental==0.10.0"
!pip install "transformers[torch,sentencepiece]"
!pip install "sentence-transformers>=3.0.0"

Set up OpenAI API keys

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")

Import Required Libraries

First, let’s import all the necessary components from Haystack.

from haystack_experimental.core.pipeline.pipeline import Pipeline # Note that we need to import the pipeline from haystack-experimental

from haystack import Document
from haystack.components.builders import AnswerBuilder, ChatPromptBuilder
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import DocumentJoiner
from haystack.components.rankers import TransformersSimilarityRanker
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.components.writers import DocumentWriter
from haystack.dataclasses import ChatMessage
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy

Document Store Initialization

Let’s create a simple document store with some sample documents and their embeddings.

def indexing():
    """
    Indexing documents in a DocumentStore.
    """

    print("Indexing documents...")

    # Create sample documents
    documents = [
        Document(content="My name is Jean and I live in Paris. The weather today is 25°C."),
        Document(content="My name is Mark and I live in Berlin. The weather today is 15°C."),
        Document(content="My name is Giorgio and I live in Rome. The weather today is 30°C."),
    ]

    # Initialize document store and components
    document_store = InMemoryDocumentStore()
    doc_writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP)
    doc_embedder = SentenceTransformersDocumentEmbedder(model="intfloat/e5-base-v2", progress_bar=False)

    # Build and run the ingestion pipeline
    ingestion_pipe = Pipeline()
    ingestion_pipe.add_component(instance=doc_embedder, name="doc_embedder")
    ingestion_pipe.add_component(instance=doc_writer, name="doc_writer")

    ingestion_pipe.connect("doc_embedder.documents", "doc_writer.documents")
    ingestion_pipe.run({"doc_embedder": {"documents": documents}})

    return document_store

A Hybrid Retrieval Pipeline

Now let’s build a hybrid RAG pipeline.

def hybrid_retrieval(doc_store):
    """
    A simple pipeline for hybrid retrieval using BM25 and embeddings.
    """

    # Initialize query embedder
    query_embedder = SentenceTransformersTextEmbedder(model="intfloat/e5-base-v2", progress_bar=False)

    # Define the prompt template for the LLM
    template = [
        ChatMessage.from_system(
            "You are a helpful AI assistant. Answer the following question based on the given context information only. If the context is empty or just a '\n' answer with None, example: 'None'."
        ),
        ChatMessage.from_user(
            """
            Context:
            {% for document in documents %}
                {{ document.content }}
            {% endfor %}
    
            Question: {{question}}
            """
        )
    ]

    
    # Build the RAG pipeline
    rag_pipeline = Pipeline()
    
    # Add components to the pipeline
    rag_pipeline.add_component(instance=InMemoryBM25Retriever(document_store=doc_store), name="bm25_retriever")
    rag_pipeline.add_component(instance=query_embedder, name="query_embedder")
    rag_pipeline.add_component(instance=InMemoryEmbeddingRetriever(document_store=doc_store), name="embedding_retriever")
    rag_pipeline.add_component(instance=DocumentJoiner(sort_by_score=False), name="doc_joiner")
    rag_pipeline.add_component(instance=TransformersSimilarityRanker(model="intfloat/simlm-msmarco-reranker", top_k=5), name="ranker")
    rag_pipeline.add_component(instance=ChatPromptBuilder(template=template, required_variables=["question", "documents"]), name="prompt_builder")
    rag_pipeline.add_component(instance=OpenAIChatGenerator(), name="llm")
    rag_pipeline.add_component(instance=AnswerBuilder(), name="answer_builder")

    # Connect the components
    rag_pipeline.connect("query_embedder", "embedding_retriever.query_embedding")
    rag_pipeline.connect("embedding_retriever", "doc_joiner.documents")
    rag_pipeline.connect("bm25_retriever", "doc_joiner.documents")
    rag_pipeline.connect("doc_joiner", "ranker.documents")
    rag_pipeline.connect("ranker", "prompt_builder.documents")
    rag_pipeline.connect("prompt_builder", "llm")
    rag_pipeline.connect("llm.replies", "answer_builder.replies")    
    rag_pipeline.connect("doc_joiner", "answer_builder.documents")

    return rag_pipeline

Running the pipeline with breakpoints

Now we demonstrate how to set breakpoints in a Haystack pipeline to inspect and debug its execution at specific points. Breakpoints let you pause execution, save the current state of the pipeline, and later resume from where you left off.

We'll run the pipeline with a breakpoint set at the query_embedder component. This saves the pipeline state before the query_embedder executes and raises a PipelineBreakpointException to stop execution.

# Initialize document store and pipeline
doc_store = indexing()
pipeline = hybrid_retrieval(doc_store)

# Define the query
question = "Where does Mark live?"
data = {
    "query_embedder": {"text": question},
    "bm25_retriever": {"query": question},
    "ranker": {"query": question, "top_k": 10},
    "prompt_builder": {"question": question},
    "answer_builder": {"query": question},
}


pipeline.run(data, breakpoints={("query_embedder", 0)}, debug_path="saved_states")
Indexing documents...



---------------------------------------------------------------------------

PipelineBreakpointException               Traceback (most recent call last)

Cell In[6], line 16
      6 question = "Where does Mark live?"
      7 data = {
      8     "query_embedder": {"text": question},
      9     "bm25_retriever": {"query": question},
   (...)     12     "answer_builder": {"query": question},
     13 }
---> 16 pipeline.run(data, breakpoints={("query_embedder", 0)}, debug_path="saved_states")


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/core/pipeline/pipeline.py:336, in Pipeline.run(self, data, include_outputs_from, breakpoints, resume_state, debug_path)
    334 # keep track of the original input to save it in case of a breakpoint when running the component
    335 self.original_input_data = data
--> 336 component_outputs = self._run_component(
    337     component,
    338     inputs,
    339     component_visits,
    340     validated_breakpoints,
    341     parent_span=span,
    342 )
    344 # Updates global input state with component outputs and returns outputs that should go to
    345 # pipeline outputs.
    346 component_pipeline_outputs = self._write_component_outputs(
    347     component_name=component_name,
    348     component_outputs=component_outputs,
   (...)    351     include_outputs_from=include_outputs_from,
    352 )


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/core/pipeline/pipeline.py:105, in Pipeline._run_component(self, component, inputs, component_visits, breakpoints, parent_span)
    103         params["template"] = params["_template_string"]
    104     breakpoint_inputs[component_name]["init_parameters"] = params
--> 105     self._check_breakpoints(breakpoints, component_name, component_visits, breakpoint_inputs)
    107 with tracing.tracer.trace(
    108     "haystack.component.run",
    109     tags={
   (...)    130     # We deepcopy the inputs otherwise we might lose that information
    131     # when we delete them in case they're sent to other Components
    132     span.set_content_tag("haystack.component.input", deepcopy(component_inputs))


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/core/pipeline/pipeline.py:440, in Pipeline._check_breakpoints(self, breakpoints, component_name, component_visits, inputs)
    438 logger.info(msg)
    439 state = self.save_state(inputs, str(component_name), component_visits)
--> 440 raise PipelineBreakpointException(msg, component=component_name, state=state)


PipelineBreakpointException: Breaking at component query_embedder visit count 0

This run breaks with PipelineBreakpointException: Breaking at component query_embedder visit count 0 and generates a JSON file (e.g. query_embedder_2025_04_15_15_00_20.json) in a new directory saved_states containing the pipeline's running state right before the query_embedder component executes.

This file can be explored and used to inspect the pipeline at that execution point.
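
For instance, here is a minimal sketch of loading the saved state for inspection. It assumes the timestamped file name pattern from the run above; the exact keys inside the JSON depend on the haystack-experimental version, so inspect your own file.

import glob
import json
import os

# Pick the most recent state file saved for the query_embedder breakpoint
state_files = sorted(glob.glob("saved_states/query_embedder_*.json"), key=os.path.getmtime)

with open(state_files[-1]) as f:
    state = json.load(f)

# Print the top-level structure to see what was captured at the breakpoint
print(list(state.keys()))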

Resuming from a breakpoint

We can then resume a pipeline from a saved state by passing it to the Pipeline.run() method. This runs the pipeline to the end.

# Load the saved state and continue execution
resume_state = pipeline.load_state("saved_states/query_embedder_2025_04_15_15_00_20.json")
result = pipeline.run(data={}, resume_state=resume_state)

# Print the results
print(result['answer_builder']['answers'][0].data)
print(result['answer_builder']['answers'][0].meta)
Mark lives in Berlin.
{'model': 'gpt-4o-mini-2024-07-18', 'index': 0, 'finish_reason': 'stop', 'usage': {'completion_tokens': 6, 'prompt_tokens': 74, 'total_tokens': 80, 'completion_tokens_details': CompletionTokensDetails(accepted_prediction_tokens=0, audio_tokens=0, reasoning_tokens=0, rejected_prediction_tokens=0), 'prompt_tokens_details': PromptTokensDetails(audio_tokens=0, cached_tokens=0)}}

Advanced Use Cases for Pipeline Breakpoints

Here are some advanced scenarios where pipeline breakpoints can be particularly valuable:

  1. Set a breakpoint at the LLM to try different prompts and iterate on the results in real time.

  2. Place a breakpoint after the document retrievers to examine and modify the retrieved documents (see the sketch after this list).

  3. Set a breakpoint before a component to inject gold-standard inputs and isolate whether issues stem from input quality or downstream logic.
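
As an illustration of the second scenario, a breakpoint on the ranker pauses the run right after both retrievers and the doc_joiner have produced their documents, so the joined retrieval results are captured in the saved state. This is a sketch that reuses the pipeline and data defined above:

# Pause before the ranker runs: the saved state captures the joined retrieval
# results, which can be inspected or edited in the JSON file before resuming.
pipeline.run(data, breakpoints={("ranker", 0)}, debug_path="saved_states")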

To demonstrate the first use case, we reuse the same query pipeline with a new question and set a breakpoint at the prompt_builder so we can try an alternative prompt. This allows us to compare the results generated by different prompts without running the whole pipeline again.

# Initialize document store and pipeline
doc_store = indexing()
pipeline = hybrid_retrieval(doc_store)

# Define the query
question = "What's the temperature difference between the warmest and coldest city?"
data = {
    "query_embedder": {"text": question},
    "bm25_retriever": {"query": question},
    "ranker": {"query": question, "top_k": 10},
    "prompt_builder": {"question": question},
    "answer_builder": {"query": question},
}


pipeline.run(data, breakpoints={("prompt_builder", 0)}, debug_path="saved_states")
Indexing documents...



---------------------------------------------------------------------------

PipelineBreakpointException               Traceback (most recent call last)

Cell In[7], line 16
      6 question = "What's the temperature difference between the warmest and coldest city?"
      7 data = {
      8     "query_embedder": {"text": question},
      9     "bm25_retriever": {"query": question},
   (...)     12     "answer_builder": {"query": question},
     13 }
---> 16 pipeline.run(data, breakpoints={("prompt_builder", 0)}, debug_path="saved_states")


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/core/pipeline/pipeline.py:336, in Pipeline.run(self, data, include_outputs_from, breakpoints, resume_state, debug_path)
    334 # keep track of the original input to save it in case of a breakpoint when running the component
    335 self.original_input_data = data
--> 336 component_outputs = self._run_component(
    337     component,
    338     inputs,
    339     component_visits,
    340     validated_breakpoints,
    341     parent_span=span,
    342 )
    344 # Updates global input state with component outputs and returns outputs that should go to
    345 # pipeline outputs.
    346 component_pipeline_outputs = self._write_component_outputs(
    347     component_name=component_name,
    348     component_outputs=component_outputs,
   (...)    351     include_outputs_from=include_outputs_from,
    352 )


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/core/pipeline/pipeline.py:105, in Pipeline._run_component(self, component, inputs, component_visits, breakpoints, parent_span)
    103         params["template"] = params["_template_string"]
    104     breakpoint_inputs[component_name]["init_parameters"] = params
--> 105     self._check_breakpoints(breakpoints, component_name, component_visits, breakpoint_inputs)
    107 with tracing.tracer.trace(
    108     "haystack.component.run",
    109     tags={
   (...)    130     # We deepcopy the inputs otherwise we might lose that information
    131     # when we delete them in case they're sent to other Components
    132     span.set_content_tag("haystack.component.input", deepcopy(component_inputs))


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/core/pipeline/pipeline.py:440, in Pipeline._check_breakpoints(self, breakpoints, component_name, component_visits, inputs)
    438 logger.info(msg)
    439 state = self.save_state(inputs, str(component_name), component_visits)
--> 440 raise PipelineBreakpointException(msg, component=component_name, state=state)


PipelineBreakpointException: Breaking at component prompt_builder visit count 0

Now we can manually insert a different template into the prompt_builder and inspect the results. To do this, we update the template input of the prompt_builder component in the saved state file.

template = ChatMessage.from_system(
    """You are a mathematical analysis assistant. Follow these steps:
    1. Identify all temperatures mentioned
    2. Find the maximum and minimum values
    3. Calculate their difference
    4. Format response as: 'The temperature difference is X°C (max Y°C in [city] - min Z°C in [city])'
    Use ONLY the information provided in the context."""
)
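
The update itself can be done by editing the JSON directly. Below is a minimal sketch; the key path into the state is an assumption (inspect your file first, as the exact layout may differ between haystack-experimental versions), and the file name should be the actual timestamped file produced by your run.

import json

# Hypothetical path: substitute the timestamped file your run produced
path = "saved_states/prompt_builder_2025_05_21_11_16_36.json"

with open(path) as f:
    state = json.load(f)

# Replace only the system message; the user message with the Jinja context loop
# stays in place. The key path below is an assumption - inspect `state` to
# confirm where the serialized template lives in your version.
state["prompt_builder"]["init_parameters"]["template"][0] = template.to_dict()

with open(path, "w") as f:
    json.dump(state, f)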

Now we just load the state file and resume the pipeline with the altered state.

resume_state = pipeline.load_state("saved_states/prompt_builder_2025_05_21_11_16_36.json")
result = pipeline.run(data={}, resume_state=resume_state)
print(result['answer_builder']['answers'][0].data)
The temperature values mentioned are:
- Paris: 25°C
- Rome: 30°C
- Berlin: 15°C

Determining the maximum and minimum temperatures:
- Maximum temperature: 30°C (in Rome)
- Minimum temperature: 15°C (in Berlin)

Calculating the temperature difference:
- Temperature difference = Maximum - Minimum = 30°C - 15°C = 15°C

Presenting the results:
The temperature difference is 15°C (maximum 30°C in Rome - minimum 15°C in Berlin)