Breakpoint on Agent in a Pipeline
Last Updated: August 4, 2025
This notebook demonstrates how to set up breakpoints within an Agent component in a Haystack pipeline. Breakpoints can be placed either on the chat_generator or on any of the tools used by the Agent. This guide showcases both approaches.
The pipeline features an Agent acting as a database assistant, responsible for extracting relevant information and writing it to the database.
Install packages
%%bash
pip install "haystack-ai>=2.16.1"
pip install "transformers[torch,sentencepiece]"
pip install "sentence-transformers>=3.0.0"
Set up the OpenAI API key for the chat_generator
import os
from getpass import getpass
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")
Initializations
Now we initialize the components required to build an agentic pipeline. We will set up:
- A chat_generator for the Agent
- A custom tool that writes structured information to an InMemoryDocumentStore
- An Agent that uses these components to extract and store entities from user-supplied context
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.agents.agent import Agent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import Document
from haystack.tools import tool
from typing import Optional
# Initialize a document store and a chat_generator
document_store = InMemoryDocumentStore()
chat_generator = OpenAIChatGenerator(
    model="gpt-4o-mini",
)
# Initialize a tool
@tool
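def add_database_tool(name: str, surname: str, job_title: Optional[str], other: Optional[str]):
    """Add a person to the database along with the information available about them."""
    # The content holds the name fields; any extra details go into the metadata
    document_store.write_documents(
        [Document(content=name + " " + surname + " " + (job_title or ""), meta={"other": other})]
    )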
# Create the Agent
database_assistant = Agent(
    chat_generator=chat_generator,
    tools=[add_database_tool],
    system_prompt="""
    You are a database assistant.
    Your task is to extract the names of people mentioned in the given context and add them to a knowledge base,
    along with additional relevant information about them that can be extracted from the context.
    Do not use your own knowledge, stay grounded to the given context.
    Do not ask the user for confirmation. Instead, automatically update the knowledge base and return a brief
    summary of the people added, including the information stored for each.
    """,
    exit_conditions=["text"],
    max_agent_steps=100,
    raise_on_tool_invocation_failure=False
)
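The tool above builds the document content by plain string concatenation, so a missing job_title leaves a trailing space in the stored text. As a minimal sketch, the hypothetical helper below (`format_person_content` is not part of Haystack or of the original notebook) shows one way to build the same string while skipping empty fields:

```python
from typing import Optional


def format_person_content(name: str, surname: str, job_title: Optional[str] = None) -> str:
    """Join the non-empty fields with single spaces (hypothetical helper)."""
    parts = [name, surname, job_title]
    # Skip None and blank values so no stray spaces end up in the content
    return " ".join(part.strip() for part in parts if part and part.strip())


print(format_person_content("Malte", "Pietsch", "Co-Founder"))
print(format_person_content("Malte", "Pietsch", None))
```

Such a helper could be called inside the tool body in place of the concatenation; whether the cleaner content matters depends on how the documents are used later.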
Initialize the Pipeline
In this step, we construct a Haystack pipeline that performs the following tasks:
- Fetches HTML content from a specified URL.
- Converts the HTML into Haystack Document objects.
- Builds a prompt from the extracted content.
- Passes the prompt to the previously defined Agent, which processes the context and writes relevant information to a document store.
from haystack import Pipeline
from haystack.components.converters import HTMLToDocument
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage
pipeline_with_agent = Pipeline()
pipeline_with_agent.add_component("fetcher", LinkContentFetcher())
pipeline_with_agent.add_component("converter", HTMLToDocument())
pipeline_with_agent.add_component("builder", ChatPromptBuilder(
    template=[ChatMessage.from_user("""
    {% for doc in docs %}
    {{ doc.content|default|truncate(25000) }}
    {% endfor %}
    """)],
    required_variables=["docs"]
))
pipeline_with_agent.add_component("database_agent", database_assistant)
pipeline_with_agent.connect("fetcher.streams", "converter.sources")
pipeline_with_agent.connect("converter.documents", "builder.docs")
pipeline_with_agent.connect("builder", "database_agent")
<haystack.core.pipeline.pipeline.Pipeline object at 0x107b24da0>
🚅 Components
- fetcher: LinkContentFetcher
- converter: HTMLToDocument
- builder: ChatPromptBuilder
- database_agent: Agent
🛤️ Connections
- fetcher.streams -> converter.sources (List[ByteStream])
- converter.documents -> builder.docs (List[Document])
- builder.prompt -> database_agent.messages (List[ChatMessage])
Set up Breakpoints
With our pipeline in place, we can now configure a breakpoint on the Agent. This allows us to pause the pipeline execution at a specific step (in this case, during the Agent’s operation) and save the intermediate pipeline snapshot to an external file for inspection or debugging.
We’ll first create a Breakpoint for the chat_generator and then wrap it using AgentBreakpoint, which explicitly targets the Agent component in the pipeline. With visit_count=0, the breakpoint triggers before the chat_generator runs for the first time.
Set the snapshot_file_path to the directory where you want to save the snapshot file.
from haystack.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, ToolBreakpoint
agent_generator_breakpoint = Breakpoint(component_name="chat_generator", visit_count=0, snapshot_file_path="snapshots/")
agent_breakpoint = AgentBreakpoint(break_point=agent_generator_breakpoint, agent_name='database_agent')
pipeline_with_agent.run(
    data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
    break_point=agent_breakpoint,
)
---------------------------------------------------------------------------
BreakpointException Traceback (most recent call last)
Cell In[3], line 5
3 agent_generator_breakpoint = Breakpoint(component_name="chat_generator", visit_count=0, snapshot_file_path="snapshots/")
4 agent_breakpoint = AgentBreakpoint(break_point=agent_generator_breakpoint, agent_name='database_agent')
----> 5 pipeline_with_agent.run(
6 data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
7 break_point=agent_breakpoint,
8 )
File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:382, in Pipeline.run(self, data, include_outputs_from, break_point, pipeline_snapshot)
377 if should_trigger_breakpoint:
378 _trigger_break_point(
379 pipeline_snapshot=new_pipeline_snapshot, pipeline_outputs=pipeline_outputs
380 )
--> 382 component_outputs = self._run_component(
383 component_name=component_name,
384 component=component,
385 inputs=component_inputs, # the inputs to the current component
386 component_visits=component_visits,
387 parent_span=span,
388 )
390 # Updates global input state with component outputs and returns outputs that should go to
391 # pipeline outputs.
392 component_pipeline_outputs = self._write_component_outputs(
393 component_name=component_name,
394 component_outputs=component_outputs,
(...)
397 include_outputs_from=include_outputs_from,
398 )
File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:75, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
70 component_output = instance.run(**inputs)
71 except BreakpointException as error:
72 # Re-raise BreakpointException to preserve the original exception context
73 # This is important when Agent components internally use Pipeline._run_component
74 # and trigger breakpoints that need to bubble up to the main pipeline
---> 75 raise error
76 except Exception as error:
77 raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:70, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
67 logger.info("Running component {component_name}", component_name=component_name)
69 try:
---> 70 component_output = instance.run(**inputs)
71 except BreakpointException as error:
72 # Re-raise BreakpointException to preserve the original exception context
73 # This is important when Agent components internally use Pipeline._run_component
74 # and trigger breakpoints that need to bubble up to the main pipeline
75 raise error
File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/components/agents/agent.py:350, in Agent.run(self, messages, streaming_callback, break_point, snapshot, **kwargs)
337 if (
338 break_point
339 and break_point.break_point.component_name == "chat_generator"
340 and component_visits["chat_generator"] == break_point.break_point.visit_count
341 ):
342 agent_snapshot = _create_agent_snapshot(
343 component_visits=component_visits,
344 agent_breakpoint=break_point,
(...)
348 },
349 )
--> 350 _check_chat_generator_breakpoint(agent_snapshot=agent_snapshot, parent_snapshot=parent_snapshot)
352 # 1. Call the ChatGenerator
353 # We skip the chat generator when restarting from a snapshot where we restart at the ToolInvoker.
354 if skip_chat_generator:
File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/breakpoint.py:377, in _check_chat_generator_breakpoint(agent_snapshot, parent_snapshot)
372 msg = (
373 f"Breaking at {break_point.component_name} visit count "
374 f"{agent_snapshot.component_visits[break_point.component_name]}"
375 )
376 logger.info(msg)
--> 377 raise BreakpointException(
378 message=msg,
379 component=break_point.component_name,
380 inputs=agent_snapshot.component_inputs,
381 results=agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]["state"],
382 )
BreakpointException: Breaking at chat_generator visit count 0
The breakpoint interrupts the run by raising a BreakpointException, so the traceback above is expected. It also generates a JSON file in the “snapshots” directory, named after the agent and the component associated with the breakpoint. The file contains a snapshot of the Pipeline in which the Agent is running, as well as a snapshot of the Agent state at the time of the breakpoint.
!ls snapshots/database_agent_chat*
snapshots/database_agent_chat_generator_2025_07_26_12_22_11.json
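Because each snapshot file name ends with a timestamp, the exact name changes on every run. Below is a minimal sketch for picking the most recent snapshot, assuming the `<agent>_<component>_<timestamp>.json` naming seen in the listing above. `latest_snapshot` is a hypothetical helper, not a Haystack function:

```python
from pathlib import Path
from typing import Optional


def latest_snapshot(directory: str, prefix: str) -> Optional[Path]:
    """Return the newest snapshot file whose name starts with `prefix`, or None.

    Assumes zero-padded timestamps in the file name, so that sorting the
    names alphabetically also sorts them chronologically.
    """
    candidates = sorted(Path(directory).glob(f"{prefix}*.json"), key=lambda p: p.name)
    return candidates[-1] if candidates else None


# Prints None if no matching snapshot exists yet
print(latest_snapshot("snapshots", "database_agent_chat_generator"))
```

Sorting by name avoids relying on file modification times, which can change when a snapshot is edited by hand.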
We can also place a breakpoint on the tool used by the Agent. This allows us to interrupt the pipeline execution at the point where the tool is invoked by the tool_invoker.
To achieve this, we initialize a ToolBreakpoint with the name of the target tool, wrap it with an AgentBreakpoint, and then run the pipeline with the configured breakpoint.
agent_tool_breakpoint = ToolBreakpoint(component_name="tool_invoker", visit_count=0, tool_name="add_database_tool", snapshot_file_path="snapshots")
agent_breakpoint = AgentBreakpoint(break_point=agent_tool_breakpoint, agent_name = 'database_agent')
pipeline_with_agent.run(
    data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
    break_point=agent_breakpoint,
)
---------------------------------------------------------------------------
BreakpointException Traceback (most recent call last)
Cell In[6], line 4
1 agent_tool_breakpoint = ToolBreakpoint(component_name="tool_invoker", visit_count=0, tool_name="add_database_tool", snapshot_file_path="snapshots")
2 agent_breakpoint = AgentBreakpoint(break_point=agent_tool_breakpoint, agent_name = 'database_agent')
----> 4 pipeline_with_agent.run(
5 data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
6 break_point=agent_breakpoint,
7 )
File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:382, in Pipeline.run(self, data, include_outputs_from, break_point, pipeline_snapshot)
377 if should_trigger_breakpoint:
378 _trigger_break_point(
379 pipeline_snapshot=new_pipeline_snapshot, pipeline_outputs=pipeline_outputs
380 )
--> 382 component_outputs = self._run_component(
383 component_name=component_name,
384 component=component,
385 inputs=component_inputs, # the inputs to the current component
386 component_visits=component_visits,
387 parent_span=span,
388 )
390 # Updates global input state with component outputs and returns outputs that should go to
391 # pipeline outputs.
392 component_pipeline_outputs = self._write_component_outputs(
393 component_name=component_name,
394 component_outputs=component_outputs,
(...)
397 include_outputs_from=include_outputs_from,
398 )
File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:75, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
70 component_output = instance.run(**inputs)
71 except BreakpointException as error:
72 # Re-raise BreakpointException to preserve the original exception context
73 # This is important when Agent components internally use Pipeline._run_component
74 # and trigger breakpoints that need to bubble up to the main pipeline
---> 75 raise error
76 except Exception as error:
77 raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/pipeline.py:70, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
67 logger.info("Running component {component_name}", component_name=component_name)
69 try:
---> 70 component_output = instance.run(**inputs)
71 except BreakpointException as error:
72 # Re-raise BreakpointException to preserve the original exception context
73 # This is important when Agent components internally use Pipeline._run_component
74 # and trigger breakpoints that need to bubble up to the main pipeline
75 raise error
File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/components/agents/agent.py:392, in Agent.run(self, messages, streaming_callback, break_point, snapshot, **kwargs)
375 if (
376 break_point
377 and break_point.break_point.component_name == "tool_invoker"
378 and break_point.break_point.visit_count == component_visits["tool_invoker"]
379 ):
380 agent_snapshot = _create_agent_snapshot(
381 component_visits=component_visits,
382 agent_breakpoint=break_point,
(...)
390 },
391 )
--> 392 _check_tool_invoker_breakpoint(
393 llm_messages=llm_messages, agent_snapshot=agent_snapshot, parent_snapshot=parent_snapshot
394 )
396 # 3. Call the ToolInvoker
397 # We only send the messages from the LLM to the tool invoker
398 tool_invoker_result = Pipeline._run_component(
399 component_name="tool_invoker",
400 component={"instance": self._tool_invoker},
(...)
403 parent_span=span,
404 )
File ~/haystack-cookbook/.venv/lib/python3.12/site-packages/haystack/core/pipeline/breakpoint.py:437, in _check_tool_invoker_breakpoint(llm_messages, agent_snapshot, parent_snapshot)
434 msg += f" for tool {tool_breakpoint.tool_name}"
435 logger.info(msg)
--> 437 raise BreakpointException(
438 message=msg,
439 component=tool_breakpoint.component_name,
440 inputs=agent_snapshot.component_inputs,
441 results=agent_snapshot.component_inputs["tool_invoker"]["serialized_data"]["state"],
442 )
BreakpointException: Breaking at tool_invoker visit count 0 for tool add_database_tool
Similarly, this generates a JSON file in the “snapshots” directory, named after the agent and the “tool_invoker” component, which handles the tools used by the Agent.
!ls snapshots/database_agent_tool_invoker*
snapshots/database_agent_tool_invoker_2025_07_26_12_43_03.json
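Both tracebacks show the same mechanism: before running the chat_generator or the tool_invoker, the Agent compares the component's visit counter with the breakpoint's visit_count and raises an exception on a match. The toy model below sketches that idea in plain Python. All names (`ToyBreakpoint`, `ToyBreakpointHit`, `run_toy_agent`) are hypothetical, and the loop is a simplification rather than Haystack's actual implementation:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyBreakpoint:
    component_name: str  # "chat_generator" or "tool_invoker"
    visit_count: int     # break before this (0-based) visit


class ToyBreakpointHit(Exception):
    """Raised to interrupt the toy agent loop."""


def run_toy_agent(steps: int, break_point: Optional[ToyBreakpoint] = None) -> dict:
    """Alternate between the two components, counting visits to each."""
    visits = {"chat_generator": 0, "tool_invoker": 0}
    for _ in range(steps):
        for component in ("chat_generator", "tool_invoker"):
            # Check the breakpoint before "running" the component
            if (
                break_point
                and break_point.component_name == component
                and break_point.visit_count == visits[component]
            ):
                raise ToyBreakpointHit(
                    f"Breaking at {component} visit count {visits[component]}"
                )
            visits[component] += 1  # stand-in for actually running the component
    return visits


try:
    run_toy_agent(steps=3, break_point=ToyBreakpoint("tool_invoker", visit_count=1))
except ToyBreakpointHit as hit:
    print(hit)
```

In this sketch, a visit_count of 1 lets the first tool invocation complete and interrupts the loop just before the second one, which is how a higher visit_count can be used to skip past early steps.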
Resuming from a breakpoint
For debugging purposes, the snapshot files can be inspected and edited, then injected back into a pipeline to resume execution from the point where the breakpoint was triggered.
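Since the snapshots are plain JSON, the standard library is enough for a first look. The sketch below lists the top-level keys of a snapshot without assuming anything about its schema. `describe_snapshot` is a hypothetical helper, and the path is the file generated earlier in this notebook:

```python
import json
from pathlib import Path


def describe_snapshot(path: str) -> dict:
    """Map each top-level key of a JSON file to the type name of its value."""
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object at the top level")
    return {key: type(value).__name__ for key, value in data.items()}


snapshot_path = "snapshots/database_agent_chat_generator_2025_07_26_12_22_11.json"
# Only inspect the file if it exists, since the timestamp differs between runs
if Path(snapshot_path).exists():
    for key, type_name in describe_snapshot(snapshot_path).items():
        print(f"{key}: {type_name}")
```

When editing a snapshot by hand, working on a copy of the file keeps the original available for a clean re-run.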
Once a pipeline execution has been interrupted, we can resume the pipeline_with_agent from that saved state.
To do this:
- Use load_pipeline_snapshot() to load the saved pipeline snapshot from disk. This function reads the stored JSON file back into a snapshot object representing the intermediate state.
- Pass this snapshot to the Pipeline.run() method through the pipeline_snapshot argument.
The pipeline will resume execution from where it left off and continue until completion.
from haystack.core.pipeline.breakpoint import load_pipeline_snapshot
# resume the pipeline from the saved state
snapshot = load_pipeline_snapshot("snapshots/database_agent_chat_generator_2025_07_26_12_22_11.json")
result = pipeline_with_agent.run(
    data={},
    pipeline_snapshot=snapshot
)
print(result['database_agent']['last_message'].text)
The following individuals have been added to the knowledge base along with their relevant information:
1. **Milos Rusic**
- **Job Title:** Co-Founder
- **Other:** Co-founded deepset in 2018 in Berlin.
2. **Malte Pietsch**
- **Job Title:** Co-Founder
- **Other:** Co-founded deepset in 2018 in Berlin.
3. **Timo Möller**
- **Job Title:** Co-Founder
- **Other:** Co-founded deepset in 2018 in Berlin.
4. **Alex Ratner**
- **Job Title:** Founder
- **Other:** Snorkel AI.
5. **Mustafa Suleyman**
- **Job Title:** Co-Founder
- **Other:** Deepmind.
6. **Spencer Kimball**
- **Job Title:** Co-Founder
- **Other:** Cockroach Labs.
7. **Jeff Hammerbacher**
- **Job Title:** Co-Founder
- **Other:** Cloudera.
8. **Emil Eifrem**
- **Job Title:** Founder
- **Other:** Neo4j.
This information emphasizes their roles in the establishment and growth of deepset as well as their affiliations with other notable companies in the tech industry.