Breakpoint on Agent in a Pipeline
Last Updated: July 17, 2025
This notebook demonstrates how to set up breakpoints within an Agent component in a Haystack pipeline. Breakpoints can be placed either on the chat_generator or on any of the tools used by the Agent. This guide showcases both approaches.
The pipeline features an Agent acting as a database assistant, responsible for extracting relevant information and writing it to the database.
NOTE: this feature is a part of haystack-experimental
Install packages
!pip install "haystack-experimental==0.12.0"  # Agent breakpoints were added in 0.12.0
!pip install "transformers[torch,sentencepiece]"
!pip install "sentence-transformers>=3.0.0"
Set up the OpenAI API key for the chat_generator
import os
from getpass import getpass
if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")
Initializations
Now we initialize the components required to build an agentic pipeline. We will set up:
- A chat_generator for the Agent
- A custom tool that writes structured information to an InMemoryDocumentStore
- An Agent that uses these components to extract and store entities from user-supplied context
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack_experimental.components.agents.agent import Agent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import Document
from haystack.tools import tool
from typing import Optional

# Initialize a document store and a chat_generator
document_store = InMemoryDocumentStore()
chat_generator = OpenAIChatGenerator(
    model="gpt-4o-mini",
)

# Initialize a tool
@tool
def add_database_tool(name: str, surname: str, job_title: Optional[str], other: Optional[str]):
    # Store the person as one Document: name, surname and job title in the content, the rest in meta
    document_store.write_documents(
        [Document(content=name + " " + surname + " " + (job_title or ""), meta={"other": other})]
    )

# Create the Agent
database_assistant = Agent(
    chat_generator=chat_generator,
    tools=[add_database_tool],
    system_prompt="""
    You are a database assistant.
    Your task is to extract the names of people mentioned in the given context and add them to a knowledge base,
    along with additional relevant information about them that can be extracted from the context.
    Do not use your own knowledge, stay grounded to the given context.
    Do not ask the user for confirmation. Instead, automatically update the knowledge base and return a brief
    summary of the people added, including the information stored for each.
    """,
    exit_conditions=["text"],
    max_agent_steps=100,
    raise_on_tool_invocation_failure=False,
)
Initialize the Pipeline
In this step, we construct a Haystack pipeline that performs the following tasks:
- Fetches HTML content from a specified URL.
- Converts the HTML into Haystack Document objects.
- Builds a prompt from the extracted content.
- Passes the prompt to the previously defined Agent, which processes the context and writes relevant information to a document store.
from haystack_experimental.core.pipeline import Pipeline  # Note: we use the Pipeline from the experimental package
from haystack.components.converters import HTMLToDocument
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage
pipeline_with_agent = Pipeline()
pipeline_with_agent.add_component("fetcher", LinkContentFetcher())
pipeline_with_agent.add_component("converter", HTMLToDocument())
pipeline_with_agent.add_component("builder", ChatPromptBuilder(
    template=[ChatMessage.from_user("""
    {% for doc in docs %}
    {{ doc.content|default|truncate(25000) }}
    {% endfor %}
    """)],
    required_variables=["docs"]
))
pipeline_with_agent.add_component("database_agent", database_assistant)
pipeline_with_agent.connect("fetcher.streams", "converter.sources")
pipeline_with_agent.connect("converter.documents", "builder.docs")
pipeline_with_agent.connect("builder", "database_agent")
<haystack_experimental.core.pipeline.pipeline.Pipeline object at 0x116f2d400>
🚅 Components
- fetcher: LinkContentFetcher
- converter: HTMLToDocument
- builder: ChatPromptBuilder
- database_agent: Agent
🛤️ Connections
- fetcher.streams -> converter.sources (List[ByteStream])
- converter.documents -> builder.docs (List[Document])
- builder.prompt -> database_agent.messages (List[ChatMessage])
Set up Breakpoints
With our pipeline in place, we can now configure a breakpoint on the Agent. This allows us to pause the pipeline execution at a specific step—in this case, during the Agent’s operation—and save the intermediate pipeline snapshot to an external file for inspection or debugging.
We’ll first create a Breakpoint for the chat_generator and then wrap it using AgentBreakpoint, which explicitly targets the Agent component in the pipeline.
Note: Set debug_path to the directory where the snapshot file should be saved.
from haystack_experimental.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, ToolBreakpoint
agent_generator_breakpoint = Breakpoint("chat_generator", 0)
agent_breakpoint = AgentBreakpoint(break_point=agent_generator_breakpoint, agent_name='database_agent')
debug_path = "Your debug path"
pipeline_with_agent.run(
    data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
    break_point=agent_breakpoint,
    debug_path=debug_path,
)
---------------------------------------------------------------------------
BreakpointException Traceback (most recent call last)
File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack/core/pipeline/pipeline.py:60, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
59 try:
---> 60 component_output = instance.run(**inputs)
61 except Exception as error:
File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/components/agents/agent.py:458, in Agent.run(self, messages, streaming_callback, break_point, resume_state, debug_path, **kwargs)
456 while counter < self.max_agent_steps:
457 # check for breakpoint before ChatGenerator
--> 458 self._check_chat_generator_breakpoint(
459 break_point,
460 component_visits,
461 messages,
462 generator_inputs,
463 debug_path,
464 kwargs,
465 state,
466 )
468 # 1. Call the ChatGenerator
File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/components/agents/agent.py:287, in Agent._check_chat_generator_breakpoint(self, agent_breakpoint, component_visits, messages, generator_inputs, debug_path, kwargs, state)
286 logger.info(msg)
--> 287 raise BreakpointException(
288 message=msg,
289 component=break_point.component_name,
290 state=state_inputs,
291 results=state.data,
292 )
BreakpointException: Breaking at chat_generator visit count 0
The above exception was the direct cause of the following exception:
PipelineRuntimeError Traceback (most recent call last)
Cell In[7], line 4
2 agent_breakpoint = AgentBreakpoint(break_point=agent_generator_breakpoint, agent_name='database_agent')
3 debug_path = "saving_snapshots"
----> 4 pipeline_with_agent.run(
5 data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
6 break_point=agent_breakpoint,
7 debug_path=debug_path,
8 )
File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/core/pipeline/pipeline.py:339, in Pipeline.run(self, data, include_outputs_from, break_point, resume_state, debug_path)
336 component_inputs["resume_state"] = resume_state
337 component_inputs["break_point"] = None
--> 339 component_outputs = self._run_component(
340 component_name=component_name,
341 component=component,
342 inputs=component_inputs, # the inputs to the current component
343 component_visits=component_visits,
344 parent_span=span,
345 )
347 # Updates global input state with component outputs and returns outputs that should go to
348 # pipeline outputs.
349 component_pipeline_outputs = self._write_component_outputs(
350 component_name=component_name,
351 component_outputs=component_outputs,
(...) 354 include_outputs_from=include_outputs_from,
355 )
File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack/core/pipeline/pipeline.py:62, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
60 component_output = instance.run(**inputs)
61 except Exception as error:
---> 62 raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
63 component_visits[component_name] += 1
65 if not isinstance(component_output, Mapping):
PipelineRuntimeError: The following component failed to run:
Component name: 'database_agent'
Component type: 'Agent'
Error: Breaking at chat_generator visit count 0
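As the traceback shows, the breakpoint surfaces as a BreakpointException that the pipeline re-raises as a PipelineRuntimeError, so a cell that hits a breakpoint ends in an error by design. One way to tell this expected stop apart from a genuine failure is to walk the exception chain. The sketch below is library-independent: the two exception classes are local stand-ins that only mirror the names in the traceback, and `caused_by` and `run_with_breakpoint` are hypothetical helpers, not part of Haystack.

```python
class BreakpointException(Exception):
    """Local stand-in for the exception raised when a breakpoint is hit."""


class PipelineRuntimeError(Exception):
    """Local stand-in for the wrapper raised by the pipeline."""


def caused_by(error: BaseException, exc_type: type) -> bool:
    """Return True if `error`, or any exception in its cause chain, is an `exc_type`."""
    seen = set()
    current = error
    while current is not None and id(current) not in seen:
        if isinstance(current, exc_type):
            return True
        seen.add(id(current))  # guard against cyclic chains
        current = current.__cause__ or current.__context__
    return False


def run_with_breakpoint():
    """Simulate the chain from the traceback: a breakpoint wrapped in a runtime error."""
    try:
        raise BreakpointException("Breaking at chat_generator visit count 0")
    except Exception as error:
        raise PipelineRuntimeError("The following component failed to run") from error


try:
    run_with_breakpoint()
except PipelineRuntimeError as error:
    hit_breakpoint = caused_by(error, BreakpointException)
    print("Stopped at breakpoint" if hit_breakpoint else "Genuine failure")
```

In the notebook, the same check could wrap the call to pipeline_with_agent.run(), using the real exception classes from the installed packages in place of the stand-ins.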
We can also place a breakpoint on the tool used by the Agent. This allows us to interrupt the pipeline execution at the point where the tool is invoked by the tool_invoker.
To achieve this, we initialize a ToolBreakpoint with the name of the target tool, wrap it with an AgentBreakpoint, and then run the pipeline with the configured breakpoint.
agent_tool_breakpoint = ToolBreakpoint("tool_invoker", 0, "add_database_tool")
agent_breakpoint = AgentBreakpoint(break_point=agent_tool_breakpoint, agent_name='database_agent')
debug_path = "Your debug path"
pipeline_with_agent.run(
    data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
    break_point=agent_breakpoint,
    debug_path=debug_path,
)
---------------------------------------------------------------------------
BreakpointException Traceback (most recent call last)
File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack/core/pipeline/pipeline.py:60, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
59 try:
---> 60 component_output = instance.run(**inputs)
61 except Exception as error:
File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/components/agents/agent.py:458, in Agent.run(self, messages, streaming_callback, break_point, resume_state, debug_path, **kwargs)
456 while counter < self.max_agent_steps:
457 # check for breakpoint before ChatGenerator
--> 458 self._check_chat_generator_breakpoint(
459 break_point,
460 component_visits,
461 messages,
462 generator_inputs,
463 debug_path,
464 kwargs,
465 state,
466 )
468 # 1. Call the ChatGenerator
File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/components/agents/agent.py:287, in Agent._check_chat_generator_breakpoint(self, agent_breakpoint, component_visits, messages, generator_inputs, debug_path, kwargs, state)
286 logger.info(msg)
--> 287 raise BreakpointException(
288 message=msg,
289 component=break_point.component_name,
290 state=state_inputs,
291 results=state.data,
292 )
BreakpointException: Breaking at tool_invoker visit count 0
The above exception was the direct cause of the following exception:
PipelineRuntimeError Traceback (most recent call last)
Cell In[11], line 5
2 agent_breakpoint = AgentBreakpoint(break_point=agent_tool_breakpoint, agent_name = 'database_agent')
4 debug_path = "saving_snapshots"
----> 5 pipeline_with_agent.run(
6 data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
7 break_point=agent_breakpoint,
8 debug_path=debug_path,
9 )
File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/core/pipeline/pipeline.py:339, in Pipeline.run(self, data, include_outputs_from, break_point, resume_state, debug_path)
336 component_inputs["resume_state"] = resume_state
337 component_inputs["break_point"] = None
--> 339 component_outputs = self._run_component(
340 component_name=component_name,
341 component=component,
342 inputs=component_inputs, # the inputs to the current component
343 component_visits=component_visits,
344 parent_span=span,
345 )
347 # Updates global input state with component outputs and returns outputs that should go to
348 # pipeline outputs.
349 component_pipeline_outputs = self._write_component_outputs(
350 component_name=component_name,
351 component_outputs=component_outputs,
(...) 354 include_outputs_from=include_outputs_from,
355 )
File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack/core/pipeline/pipeline.py:62, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
60 component_output = instance.run(**inputs)
61 except Exception as error:
---> 62 raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
63 component_visits[component_name] += 1
65 if not isinstance(component_output, Mapping):
PipelineRuntimeError: The following component failed to run:
Component name: 'database_agent'
Component type: 'Agent'
Error: Breaking at tool_invoker visit count 0
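Both breakpoints in this guide are defined by a component name and a visit count of 0, and the tool breakpoint additionally names a tool. The toy model below illustrates one plausible reading of those parameters: execution pauses just before the named component would run for the given (0-based) visit, optionally only when a specific tool is about to be invoked. `ToyBreakpoint`, `should_break`, and `simulate` are hypothetical names, and the counting rule is an assumption made for illustration, not the library's actual implementation.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class ToyBreakpoint:
    """Hypothetical stand-in: pause before `component_name` runs for visit `visit_count`."""
    component_name: str
    visit_count: int = 0
    tool_name: Optional[str] = None  # only meaningful for the tool invoker


def should_break(bp: ToyBreakpoint, component: str, visits: Dict[str, int],
                 tool: Optional[str] = None) -> bool:
    """Check whether the breakpoint matches the component that is about to run."""
    if bp.component_name != component:
        return False
    if visits.get(component, 0) != bp.visit_count:
        return False
    return bp.tool_name is None or bp.tool_name == tool


def simulate(bp: ToyBreakpoint, steps: List[Tuple[str, Optional[str]]]) -> Optional[int]:
    """Return the index of the step at which execution would pause, or None."""
    visits: Dict[str, int] = {}
    for index, (component, tool) in enumerate(steps):
        if should_break(bp, component, visits, tool):
            return index
        visits[component] = visits.get(component, 0) + 1  # the component ran
    return None


# A simplified agent loop: LLM call, one tool call, final LLM call.
steps = [
    ("chat_generator", None),
    ("tool_invoker", "add_database_tool"),
    ("chat_generator", None),
]

print(simulate(ToyBreakpoint("chat_generator", 0), steps))                     # 0
print(simulate(ToyBreakpoint("chat_generator", 1), steps))                     # 2
print(simulate(ToyBreakpoint("tool_invoker", 0, "add_database_tool"), steps))  # 1
```

Under this reading, a visit count of 0 stops the Agent before its first LLM call, while a higher count would let earlier iterations of the agent loop complete first.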
Resuming from a breakpoint
Once a pipeline execution has been interrupted, we can resume the pipeline_with_agent from that saved state.
To do this:
- Use load_state() to load the saved pipeline state from disk. This function converts the stored JSON file back into a Python dictionary representing the intermediate state.
- Pass this state as an argument to the Pipeline.run() method.
The pipeline will resume execution from where it left off and continue until completion.
from haystack_experimental.core.pipeline.breakpoint import load_state
# resume the pipeline from the saved state
resume_state = load_state("saving_snapshots/database_agent_chat_generator_2025_07_15_14_34_37.json")
result = pipeline_with_agent.run(
    data={},
    resume_state=resume_state
)
print(result["database_agent"]["messages"])
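The snapshot file name in the example above embeds a timestamp, so it changes on every run. A small standard-library helper can locate the newest snapshot and peek at its contents before handing the path to load_state(). This is a sketch under two assumptions: snapshot names follow the `<agent>_<component>_<YYYY_MM_DD_HH_MM_SS>.json` pattern seen above (so they sort chronologically by name), and the file is plain JSON. `latest_snapshot` and `top_level_keys` are hypothetical helpers, and nothing is assumed about the snapshot's internal schema.

```python
import json
from pathlib import Path
from typing import List, Optional


def latest_snapshot(debug_dir: str, prefix: str) -> Optional[Path]:
    """Return the snapshot in `debug_dir` whose name starts with `prefix` and sorts last."""
    directory = Path(debug_dir)
    if not directory.is_dir():
        return None
    # The timestamp format sorts lexicographically, so the last name is the newest.
    candidates = sorted(directory.glob(f"{prefix}*.json"), key=lambda path: path.name)
    return candidates[-1] if candidates else None


def top_level_keys(snapshot_path: Path) -> List[str]:
    """Read the snapshot as plain JSON and list its top-level keys."""
    with snapshot_path.open(encoding="utf-8") as handle:
        data = json.load(handle)
    return sorted(data) if isinstance(data, dict) else []


snapshot = latest_snapshot("saving_snapshots", "database_agent_chat_generator")
if snapshot is None:
    print("No snapshot found; run the pipeline with a breakpoint first.")
else:
    print(snapshot.name, top_level_keys(snapshot))
```

The returned path can then be converted with str() and passed to load_state() in place of the hard-coded file name.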