
Breakpoint on Agent in a Pipeline


This notebook demonstrates how to set up breakpoints within an Agent component in a Haystack pipeline. Breakpoints can be placed either on the chat_generator or on any of the tools used by the Agent. This guide showcases both approaches.

The pipeline features an Agent acting as a database assistant, responsible for extracting relevant information and writing it to the database.

Install packages

%%bash

pip install "haystack-ai>=2.16.1"
pip install "transformers[torch,sentencepiece]"
pip install "sentence-transformers>=3.0.0"

Set up the OpenAI API key for the chat_generator

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")

Initializations

Now we initialize the components required to build an agentic pipeline. We will set up:

  • A chat_generator for the Agent
  • A custom tool that writes structured information to an InMemoryDocumentStore
  • An Agent that uses these components to extract and store entities from user-supplied context
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.agents.agent import Agent
from haystack.components.generators.chat import OpenAIChatGenerator

from haystack.dataclasses import Document
from haystack.tools import tool
from typing import Optional

# Initialize a document store and a chat_generator
document_store = InMemoryDocumentStore()
chat_generator = OpenAIChatGenerator(
    model="gpt-4o-mini",
)

# Initialize a tool
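@tool
def add_database_tool(name: str, surname: str, job_title: Optional[str], other: Optional[str]):
    """Add a person to the knowledge base, with an optional job title and any other relevant details."""
    # Skip a missing job title so the stored content has no trailing whitespace
    content = " ".join(part for part in (name, surname, job_title) if part)
    document_store.write_documents([Document(content=content, meta={"other": other})])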

# Create the Agent
database_assistant = Agent(
        chat_generator=chat_generator,
        tools=[add_database_tool],
        system_prompt="""
        You are a database assistant.
        Your task is to extract the names of people mentioned in the given context and add them to a knowledge base, 
        along with additional relevant information about them that can be extracted from the context.
        Do not use your own knowledge, stay grounded to the given context.
        Do not ask the user for confirmation. Instead, automatically update the knowledge base and return a brief 
        summary of the people added, including the information stored for each.
        """,
        exit_conditions=["text"],
        max_agent_steps=100,
        raise_on_tool_invocation_failure=False
    )

Initialize the Pipeline

In this step, we construct a Haystack pipeline that performs the following tasks:

  • Fetches HTML content from a specified URL.
  • Converts the HTML into Haystack Document objects.
  • Builds a prompt from the extracted content.
  • Passes the prompt to the previously defined Agent, which processes the context and writes relevant information to a document store.
from haystack import Pipeline
from haystack.components.converters import HTMLToDocument
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage

pipeline_with_agent = Pipeline()
pipeline_with_agent.add_component("fetcher", LinkContentFetcher())
pipeline_with_agent.add_component("converter", HTMLToDocument())
pipeline_with_agent.add_component("builder", ChatPromptBuilder(
    template=[ChatMessage.from_user("""
    {% for doc in docs %}
    {{ doc.content|default|truncate(25000) }}
    {% endfor %}
    """)],
    required_variables=["docs"]
))
pipeline_with_agent.add_component("database_agent", database_assistant)

pipeline_with_agent.connect("fetcher.streams", "converter.sources")
pipeline_with_agent.connect("converter.documents", "builder.docs")
pipeline_with_agent.connect("builder", "database_agent")
<haystack.core.pipeline.pipeline.Pipeline object at 0x107b24da0>
🚅 Components
  - fetcher: LinkContentFetcher
  - converter: HTMLToDocument
  - builder: ChatPromptBuilder
  - database_agent: Agent
🛤️ Connections
  - fetcher.streams -> converter.sources (List[ByteStream])
  - converter.documents -> builder.docs (List[Document])
  - builder.prompt -> database_agent.messages (List[ChatMessage])

Set up Breakpoints

With our pipeline in place, we can now configure a breakpoint on the Agent. This allows us to pause the pipeline execution at a specific step—in this case, during the Agent’s operation—and save the intermediate pipeline snapshot to an external file for inspection or debugging.

We’ll first create a Breakpoint for the chat_generator and then wrap it using AgentBreakpoint, which explicitly targets the Agent component in the pipeline.

Set snapshot_file_path to the directory where the snapshot file should be saved.
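The traceback printed later in this section shows the Agent comparing its per-component visit counter with the breakpoint's visit_count just before the component runs. The following is a minimal pure-Python sketch of that check, not Haystack's implementation; BreakpointHit and run_agent_loop are hypothetical names used only for illustration.

```python
from collections import defaultdict


class BreakpointHit(Exception):
    """Hypothetical stand-in for the exception Haystack raises at a breakpoint."""


def run_agent_loop(steps, break_component, break_visit_count):
    """Walk through component names, breaking before the matching visit."""
    visits = defaultdict(int)  # how many times each component has already run
    executed = []
    for component in steps:
        # The check happens before the component runs, so visit_count=0
        # interrupts the very first call to that component.
        if component == break_component and visits[component] == break_visit_count:
            raise BreakpointHit(
                f"Breaking at {component} visit count {visits[component]}"
            )
        executed.append(component)
        visits[component] += 1
    return executed
```

Under these assumptions, a breakpoint on "chat_generator" with visit_count=0 stops before the first LLM call, while visit_count=1 would let one LLM call (and any tool call that follows it) complete first.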

from haystack.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, ToolBreakpoint

agent_generator_breakpoint = Breakpoint(component_name="chat_generator", visit_count=0, snapshot_file_path="snapshots/")
agent_breakpoint = AgentBreakpoint(break_point=agent_generator_breakpoint, agent_name='database_agent')
pipeline_with_agent.run(
    data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
    break_point=agent_breakpoint,
)
---------------------------------------------------------------------------

BreakpointException                       Traceback (most recent call last)

Cell In[3], line 5
      3 agent_generator_breakpoint = Breakpoint(component_name="chat_generator", visit_count=0, snapshot_file_path="snapshots/")
      4 agent_breakpoint = AgentBreakpoint(break_point=agent_generator_breakpoint, agent_name='database_agent')
----> 5 pipeline_with_agent.run(
      6     data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
      7     break_point=agent_breakpoint,
      8 )


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:382, in Pipeline.run(self, data, include_outputs_from, break_point, pipeline_snapshot)
    377         if should_trigger_breakpoint:
    378             _trigger_break_point(
    379                 pipeline_snapshot=new_pipeline_snapshot, pipeline_outputs=pipeline_outputs
    380             )
--> 382 component_outputs = self._run_component(
    383     component_name=component_name,
    384     component=component,
    385     inputs=component_inputs,  # the inputs to the current component
    386     component_visits=component_visits,
    387     parent_span=span,
    388 )
    390 # Updates global input state with component outputs and returns outputs that should go to
    391 # pipeline outputs.
    392 component_pipeline_outputs = self._write_component_outputs(
    393     component_name=component_name,
    394     component_outputs=component_outputs,
   (...)
    397     include_outputs_from=include_outputs_from,
    398 )


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:75, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
     70     component_output = instance.run(**inputs)
     71 except BreakpointException as error:
     72     # Re-raise BreakpointException to preserve the original exception context
     73     # This is important when Agent components internally use Pipeline._run_component
     74     # and trigger breakpoints that need to bubble up to the main pipeline
---> 75     raise error
     76 except Exception as error:
     77     raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:70, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
     67 logger.info("Running component {component_name}", component_name=component_name)
     69 try:
---> 70     component_output = instance.run(**inputs)
     71 except BreakpointException as error:
     72     # Re-raise BreakpointException to preserve the original exception context
     73     # This is important when Agent components internally use Pipeline._run_component
     74     # and trigger breakpoints that need to bubble up to the main pipeline
     75     raise error


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/components/agents/agent.py:350, in Agent.run(self, messages, streaming_callback, break_point, snapshot, **kwargs)
    337 if (
    338     break_point
    339     and break_point.break_point.component_name == "chat_generator"
    340     and component_visits["chat_generator"] == break_point.break_point.visit_count
    341 ):
    342     agent_snapshot = _create_agent_snapshot(
    343         component_visits=component_visits,
    344         agent_breakpoint=break_point,
   (...)
    348         },
    349     )
--> 350     _check_chat_generator_breakpoint(agent_snapshot=agent_snapshot, parent_snapshot=parent_snapshot)
    352 # 1. Call the ChatGenerator
    353 # We skip the chat generator when restarting from a snapshot where we restart at the ToolInvoker.
    354 if skip_chat_generator:


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/breakpoint.py:377, in _check_chat_generator_breakpoint(agent_snapshot, parent_snapshot)
    372 msg = (
    373     f"Breaking at {break_point.component_name} visit count "
    374     f"{agent_snapshot.component_visits[break_point.component_name]}"
    375 )
    376 logger.info(msg)
--> 377 raise BreakpointException(
    378     message=msg,
    379     component=break_point.component_name,
    380     inputs=agent_snapshot.component_inputs,
    381     results=agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]["state"],
    382 )


BreakpointException: Breaking at chat_generator visit count 0

This generates a JSON file in the “snapshots” directory, named after the agent and the component associated with the breakpoint. The file contains a snapshot of the Pipeline in which the Agent is running, as well as a snapshot of the Agent's state at the time of the breakpoint.

!ls snapshots/database_agent_chat*
snapshots/database_agent_chat_generator_2025_07_26_12_22_11.json
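The traceback above is expected: the breakpoint surfaces as a BreakpointException raised from Pipeline.run(). A minimal sketch of catching it, so that the cell finishes without a traceback, could look like the following. The helper name run_until_breakpoint is hypothetical, and the import path in the commented usage is an assumption that should be checked against the installed Haystack version.

```python
def run_until_breakpoint(run, breakpoint_error):
    """Call run() and return (result, None), or (None, error) if a breakpoint fired."""
    try:
        return run(), None
    except breakpoint_error as error:
        # The snapshot file has already been written when the exception is raised.
        return None, error


# Usage in this notebook (import path is an assumption):
# from haystack.core.errors import BreakpointException
# result, hit = run_until_breakpoint(
#     lambda: pipeline_with_agent.run(
#         data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
#         break_point=agent_breakpoint,
#     ),
#     BreakpointException,
# )
# if hit is not None:
#     print(hit)
```

Passing the exception class in as an argument keeps the helper independent of any particular library version.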

We can also place a breakpoint on the tool used by the Agent. This allows us to interrupt the pipeline execution at the point where the tool is invoked by the tool_invoker.

To achieve this, we initialize a ToolBreakpoint with the name of the target tool, wrap it with an AgentBreakpoint, and then run the pipeline with the configured breakpoint.

agent_tool_breakpoint = ToolBreakpoint(component_name="tool_invoker", visit_count=0, tool_name="add_database_tool", snapshot_file_path="snapshots")
agent_breakpoint = AgentBreakpoint(break_point=agent_tool_breakpoint, agent_name = 'database_agent')

pipeline_with_agent.run(
    data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
    break_point=agent_breakpoint,
)
---------------------------------------------------------------------------

BreakpointException                       Traceback (most recent call last)

Cell In[6], line 4
      1 agent_tool_breakpoint = ToolBreakpoint(component_name="tool_invoker", visit_count=0, tool_name="add_database_tool", snapshot_file_path="snapshots")
      2 agent_breakpoint = AgentBreakpoint(break_point=agent_tool_breakpoint, agent_name = 'database_agent')
----> 4 pipeline_with_agent.run(
      5     data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
      6     break_point=agent_breakpoint,
      7 )


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:382, in Pipeline.run(self, data, include_outputs_from, break_point, pipeline_snapshot)
    377         if should_trigger_breakpoint:
    378             _trigger_break_point(
    379                 pipeline_snapshot=new_pipeline_snapshot, pipeline_outputs=pipeline_outputs
    380             )
--> 382 component_outputs = self._run_component(
    383     component_name=component_name,
    384     component=component,
    385     inputs=component_inputs,  # the inputs to the current component
    386     component_visits=component_visits,
    387     parent_span=span,
    388 )
    390 # Updates global input state with component outputs and returns outputs that should go to
    391 # pipeline outputs.
    392 component_pipeline_outputs = self._write_component_outputs(
    393     component_name=component_name,
    394     component_outputs=component_outputs,
   (...)
    397     include_outputs_from=include_outputs_from,
    398 )


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:75, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
     70     component_output = instance.run(**inputs)
     71 except BreakpointException as error:
     72     # Re-raise BreakpointException to preserve the original exception context
     73     # This is important when Agent components internally use Pipeline._run_component
     74     # and trigger breakpoints that need to bubble up to the main pipeline
---> 75     raise error
     76 except Exception as error:
     77     raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:70, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
     67 logger.info("Running component {component_name}", component_name=component_name)
     69 try:
---> 70     component_output = instance.run(**inputs)
     71 except BreakpointException as error:
     72     # Re-raise BreakpointException to preserve the original exception context
     73     # This is important when Agent components internally use Pipeline._run_component
     74     # and trigger breakpoints that need to bubble up to the main pipeline
     75     raise error


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/components/agents/agent.py:392, in Agent.run(self, messages, streaming_callback, break_point, snapshot, **kwargs)
    375 if (
    376     break_point
    377     and break_point.break_point.component_name == "tool_invoker"
    378     and break_point.break_point.visit_count == component_visits["tool_invoker"]
    379 ):
    380     agent_snapshot = _create_agent_snapshot(
    381         component_visits=component_visits,
    382         agent_breakpoint=break_point,
   (...)
    390         },
    391     )
--> 392     _check_tool_invoker_breakpoint(
    393         llm_messages=llm_messages, agent_snapshot=agent_snapshot, parent_snapshot=parent_snapshot
    394     )
    396 # 3. Call the ToolInvoker
    397 # We only send the messages from the LLM to the tool invoker
    398 tool_invoker_result = Pipeline._run_component(
    399     component_name="tool_invoker",
    400     component={"instance": self._tool_invoker},
   (...)
    403     parent_span=span,
    404 )


File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/breakpoint.py:437, in _check_tool_invoker_breakpoint(llm_messages, agent_snapshot, parent_snapshot)
    434     msg += f" for tool {tool_breakpoint.tool_name}"
    435 logger.info(msg)
--> 437 raise BreakpointException(
    438     message=msg,
    439     component=tool_breakpoint.component_name,
    440     inputs=agent_snapshot.component_inputs,
    441     results=agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]["state"],
    442 )


BreakpointException: Breaking at tool_invoker visit count 0 for tool add_database_tool

Similarly, this generates a JSON file in the “snapshots” directory, named after the agent and the “tool_invoker” component, which handles the tools used by the Agent.

!ls snapshots/database_agent_tool_invoker*
snapshots/database_agent_tool_invoker_2025_07_26_12_43_03.json
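Because each run writes a new timestamped file, it can be handy to pick the most recent snapshot programmatically instead of copying the file name by hand. The sketch below assumes the naming pattern seen in the listings above (agent name, component name, then a zero-padded timestamp), so that sorting by name also sorts by time; latest_snapshot is a hypothetical helper, not part of Haystack.

```python
from pathlib import Path
from typing import Optional


def latest_snapshot(directory: str, prefix: str) -> Optional[Path]:
    """Return the newest '<prefix>_<timestamp>.json' file in directory, if any."""
    # Zero-padded timestamps sort chronologically when sorted as strings.
    candidates = sorted(Path(directory).glob(f"{prefix}_*.json"))
    return candidates[-1] if candidates else None


# Example: latest_snapshot("snapshots", "database_agent_tool_invoker")
```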

Resuming from a breakpoint

For debugging purposes, the snapshot files can be inspected and edited, then injected back into a pipeline to resume execution from the point where the breakpoint was triggered.
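Before editing a snapshot, it helps to get an overview of what it contains. The following sketch prints the nested keys of a snapshot file down to a chosen depth using only the standard library. It makes no assumption about the snapshot schema beyond the file being JSON; outline and outline_snapshot are hypothetical helper names.

```python
import json
from pathlib import Path


def outline(value, depth=2, indent=0):
    """Return 'key: type' lines for nested dicts, down to the given depth."""
    lines = []
    if isinstance(value, dict) and depth > 0:
        for key, child in value.items():
            lines.append(" " * indent + f"{key}: {type(child).__name__}")
            lines.extend(outline(child, depth - 1, indent + 2))
    return lines


def outline_snapshot(path, depth=2):
    """Load a snapshot JSON file and outline its structure."""
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    return outline(data, depth)


# Example:
# print("\n".join(outline_snapshot(
#     "snapshots/database_agent_chat_generator_2025_07_26_12_22_11.json")))
```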

Once a pipeline execution has been interrupted, we can resume the pipeline_with_agent from that saved state.

To do this:

  • Use load_pipeline_snapshot() to load the saved pipeline snapshot from disk. This function converts the stored JSON file back into a snapshot of the intermediate state that the pipeline can consume.
  • Pass this snapshot to the Pipeline.run() method through the pipeline_snapshot argument.

The pipeline will resume execution from where it left off and continue until completion.

from haystack.core.pipeline.breakpoint import load_pipeline_snapshot

# resume the pipeline from the saved state
snapshot = load_pipeline_snapshot("snapshots/database_agent_chat_generator_2025_07_26_12_22_11.json")

result = pipeline_with_agent.run(
    data={},
    pipeline_snapshot=snapshot
)
print(result['database_agent']['last_message'].text)
The following individuals have been added to the knowledge base along with their relevant information:

1. **Milos Rusic**
   - **Job Title:** Co-Founder
   - **Other:** Co-founded deepset in 2018 in Berlin.

2. **Malte Pietsch**
   - **Job Title:** Co-Founder
   - **Other:** Co-founded deepset in 2018 in Berlin.

3. **Timo Möller**
   - **Job Title:** Co-Founder
   - **Other:** Co-founded deepset in 2018 in Berlin.

4. **Alex Ratner**
   - **Job Title:** Founder
   - **Other:** Snorkel AI.

5. **Mustafa Suleyman**
   - **Job Title:** Co-Founder
   - **Other:** Deepmind.

6. **Spencer Kimball**
   - **Job Title:** Co-Founder
   - **Other:** Cockroach Labs.

7. **Jeff Hammerbacher**
   - **Job Title:** Co-Founder
   - **Other:** Cloudera.

8. **Emil Eifrem**
   - **Job Title:** Founder
   - **Other:** Neo4j. 

This information emphasizes their roles in the establishment and growth of deepset as well as their affiliations with other notable companies in the tech industry.