Breakpoint on Agent in a Pipeline


This notebook demonstrates how to set up breakpoints within an Agent component in a Haystack pipeline. Breakpoints can be placed either on the chat_generator or on any of the tools used by the Agent. This guide showcases both approaches.

The pipeline features an Agent acting as a database assistant, responsible for extracting relevant information and writing it to the database.

NOTE: This feature is part of haystack-experimental.

Install packages

!pip install "haystack-experimental==0.12.0"  # Agent breakpoints were added in 0.12.0
!pip install "transformers[torch,sentencepiece]"
!pip install "sentence-transformers>=3.0.0"

Set up the OpenAI API key for the chat_generator

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")

Initializations

Now we initialize the components required to build an agentic pipeline. We will set up:

  • A chat_generator for the Agent
  • A custom tool that writes structured information to an InMemoryDocumentStore
  • An Agent that uses these components to extract and store entities from user-supplied context

from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack_experimental.components.agents.agent import Agent
from haystack.components.generators.chat import OpenAIChatGenerator


from haystack.dataclasses import Document
from haystack.tools import tool
from typing import Optional

# Initialize a document store and a chat_generator
document_store = InMemoryDocumentStore()
chat_generator = OpenAIChatGenerator(
    model="gpt-4o-mini",
)

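# Initialize a tool
@tool
def add_database_tool(name: str, surname: str, job_title: Optional[str], other: Optional[str]):
    """Add a person to the knowledge base, with an optional job title and other details."""
    # Store the person as a single Document; extra details go into the metadata
    document_store.write_documents(
        [Document(content=name + " " + surname + " " + (job_title or ""), meta={"other": other})]
    )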

# Create the Agent
database_assistant = Agent(
    chat_generator=chat_generator,
    tools=[add_database_tool],
    system_prompt="""
        You are a database assistant.
        Your task is to extract the names of people mentioned in the given context and add them to a knowledge base,
        along with additional relevant information about them that can be extracted from the context.
        Do not use your own knowledge, stay grounded to the given context.
        Do not ask the user for confirmation. Instead, automatically update the knowledge base and return a brief
        summary of the people added, including the information stored for each.
        """,
    exit_conditions=["text"],
    max_agent_steps=100,
    raise_on_tool_invocation_failure=False,
)
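
To make the tool's effect concrete, the sketch below reproduces only the string assembly from add_database_tool in plain Python, without Haystack. The helper names build_content and build_content_joined are hypothetical and not part of the notebook, and the sample person is just illustrative input. It shows that a missing job_title leaves a trailing space in the stored content, and how a join-based variant would avoid that.

```python
from typing import Optional


def build_content(name: str, surname: str, job_title: Optional[str]) -> str:
    """Mirror the concatenation used in add_database_tool (hypothetical helper)."""
    return name + " " + surname + " " + (job_title or "")


def build_content_joined(name: str, surname: str, job_title: Optional[str]) -> str:
    """Alternative: join only the non-empty parts, so no trailing space remains."""
    return " ".join(part for part in (name, surname, job_title) if part)


# Illustrative inputs only
with_title = build_content("Ada", "Lovelace", "Mathematician")
without_title = build_content("Ada", "Lovelace", None)
joined = build_content_joined("Ada", "Lovelace", None)

print(repr(with_title))
print(repr(without_title))
print(repr(joined))
```

Whether the trailing space matters depends on how the stored documents are used later; for exact-match lookups, the join-based variant may be the safer choice.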

Initialize the Pipeline

In this step, we construct a Haystack pipeline that performs the following tasks:

  • Fetches HTML content from a specified URL.
  • Converts the HTML into Haystack Document objects.
  • Builds a prompt from the extracted content.
  • Passes the prompt to the previously defined Agent, which processes the context and writes relevant information to a document store.

from haystack_experimental.core.pipeline import Pipeline    # Note: we use the Pipeline from the experimental package
from haystack.components.converters import HTMLToDocument
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.builders import ChatPromptBuilder
from haystack.dataclasses import ChatMessage


pipeline_with_agent = Pipeline()
pipeline_with_agent.add_component("fetcher", LinkContentFetcher())
pipeline_with_agent.add_component("converter", HTMLToDocument())
pipeline_with_agent.add_component("builder", ChatPromptBuilder(
    template=[ChatMessage.from_user("""
    {% for doc in docs %}
    {{ doc.content|default|truncate(25000) }}
    {% endfor %}
    """)],
    required_variables=["docs"]
))
pipeline_with_agent.add_component("database_agent", database_assistant)

pipeline_with_agent.connect("fetcher.streams", "converter.sources")
pipeline_with_agent.connect("converter.documents", "builder.docs")
pipeline_with_agent.connect("builder", "database_agent")
<haystack_experimental.core.pipeline.pipeline.Pipeline object at 0x116f2d400>
🚅 Components
  - fetcher: LinkContentFetcher
  - converter: HTMLToDocument
  - builder: ChatPromptBuilder
  - database_agent: Agent
🛤️ Connections
  - fetcher.streams -> converter.sources (List[ByteStream])
  - converter.documents -> builder.docs (List[Document])
  - builder.prompt -> database_agent.messages (List[ChatMessage])

Set up Breakpoints

With our pipeline in place, we can now configure a breakpoint on the Agent. This lets us pause pipeline execution at a specific step (here, during the Agent's operation) and save the intermediate pipeline snapshot to an external file for inspection or debugging.
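
As a rough mental model of the mechanism, the toy sketch below uses only the standard library: a runner counts how often each step has been visited, and when the configured step reaches the configured visit count it writes the current state to a JSON file and raises an exception. ToyBreakpoint, ToyBreakpointHit, run_steps, and the snapshot file name are all hypothetical; this is not how haystack-experimental is implemented, only an illustration of the idea.

```python
import json
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class ToyBreakpoint:
    """Hypothetical stand-in: pause when `component_name` reaches `visit_count`."""
    component_name: str
    visit_count: int = 0


class ToyBreakpointHit(Exception):
    """Raised by the toy runner when the breakpoint condition is met."""


def run_steps(steps, break_point, debug_dir):
    """Run named steps in order; snapshot the state and stop at the breakpoint."""
    visits = {}
    state = {"completed": []}
    for name in steps:
        seen = visits.get(name, 0)
        if name == break_point.component_name and seen == break_point.visit_count:
            # Save what has happened so far, then interrupt the run
            snapshot = Path(debug_dir) / f"{name}_visit_{seen}.json"
            snapshot.write_text(json.dumps(state))
            raise ToyBreakpointHit(f"Breaking at {name} visit count {seen}")
        state["completed"].append(name)
        visits[name] = seen + 1
    return state


with tempfile.TemporaryDirectory() as debug_dir:
    try:
        run_steps(
            ["chat_generator", "tool_invoker", "chat_generator"],
            ToyBreakpoint("chat_generator", 1),
            debug_dir,
        )
        message = None
    except ToyBreakpointHit as hit:
        message = str(hit)
    saved_files = sorted(p.name for p in Path(debug_dir).iterdir())
    saved_state = json.loads((Path(debug_dir) / saved_files[0]).read_text())

print(message)
print(saved_files, saved_state)
```

In this toy run, the visit count of 1 means the first chat_generator step runs normally and the second one triggers the break, which is the same role the second argument plays in the notebook's Breakpoint.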

We’ll first create a Breakpoint for the chat_generator and then wrap it using AgentBreakpoint, which explicitly targets the Agent component in the pipeline.

Note: Set debug_path to the directory where the snapshot file should be saved.

from haystack_experimental.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, ToolBreakpoint

agent_generator_breakpoint = Breakpoint("chat_generator", 0)
agent_breakpoint = AgentBreakpoint(break_point=agent_generator_breakpoint, agent_name='database_agent')
debug_path = "saving_snapshots"  # directory for the snapshot file; change as needed
pipeline_with_agent.run(
    data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
    break_point=agent_breakpoint,
    debug_path=debug_path,
)
---------------------------------------------------------------------------

BreakpointException                       Traceback (most recent call last)

File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack/core/pipeline/pipeline.py:60, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
     59 try:
---> 60     component_output = instance.run(**inputs)
     61 except Exception as error:


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/components/agents/agent.py:458, in Agent.run(self, messages, streaming_callback, break_point, resume_state, debug_path, **kwargs)
    456 while counter < self.max_agent_steps:
    457     # check for breakpoint before ChatGenerator
--> 458     self._check_chat_generator_breakpoint(
    459         break_point,
    460         component_visits,
    461         messages,
    462         generator_inputs,
    463         debug_path,
    464         kwargs,
    465         state,
    466     )
    468     # 1. Call the ChatGenerator


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/components/agents/agent.py:287, in Agent._check_chat_generator_breakpoint(self, agent_breakpoint, component_visits, messages, generator_inputs, debug_path, kwargs, state)
    286 logger.info(msg)
--> 287 raise BreakpointException(
    288     message=msg,
    289     component=break_point.component_name,
    290     state=state_inputs,
    291     results=state.data,
    292 )


BreakpointException: Breaking at chat_generator visit count 0


The above exception was the direct cause of the following exception:


PipelineRuntimeError                      Traceback (most recent call last)

Cell In[7], line 4
      2 agent_breakpoint = AgentBreakpoint(break_point=agent_generator_breakpoint, agent_name='database_agent')
      3 debug_path = "saving_snapshots"
----> 4 pipeline_with_agent.run(
      5     data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
      6     break_point=agent_breakpoint,
      7     debug_path=debug_path,
      8 )


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/core/pipeline/pipeline.py:339, in Pipeline.run(self, data, include_outputs_from, break_point, resume_state, debug_path)
    336     component_inputs["resume_state"] = resume_state
    337     component_inputs["break_point"] = None
--> 339 component_outputs = self._run_component(
    340     component_name=component_name,
    341     component=component,
    342     inputs=component_inputs,  # the inputs to the current component
    343     component_visits=component_visits,
    344     parent_span=span,
    345 )
    347 # Updates global input state with component outputs and returns outputs that should go to
    348 # pipeline outputs.
    349 component_pipeline_outputs = self._write_component_outputs(
    350     component_name=component_name,
    351     component_outputs=component_outputs,
   (...)    354     include_outputs_from=include_outputs_from,
    355 )


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack/core/pipeline/pipeline.py:62, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
     60     component_output = instance.run(**inputs)
     61 except Exception as error:
---> 62     raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
     63 component_visits[component_name] += 1
     65 if not isinstance(component_output, Mapping):


PipelineRuntimeError: The following component failed to run:
Component name: 'database_agent'
Component type: 'Agent'
Error: Breaking at chat_generator visit count 0
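
The traceback above shows the breakpoint exception being re-raised as a PipelineRuntimeError, with the original exception as its direct cause. The sketch below illustrates that Python pattern with stand-in classes so it runs without Haystack. The Toy* names and stopped_at_breakpoint are hypothetical, and whether the real exceptions should be handled the same way depends on the installed haystack-experimental version.

```python
class ToyBreakpointException(Exception):
    """Stand-in for the exception raised when a breakpoint is reached."""


class ToyPipelineRuntimeError(Exception):
    """Stand-in for the wrapper error raised by the pipeline runner."""


def run_component():
    raise ToyBreakpointException("Breaking at chat_generator visit count 0")


def run_pipeline():
    try:
        run_component()
    except Exception as error:
        # `from error` records the original exception as __cause__
        raise ToyPipelineRuntimeError("The following component failed to run") from error


def stopped_at_breakpoint(func) -> bool:
    """Return True if func() failed because a breakpoint was reached."""
    try:
        func()
    except ToyPipelineRuntimeError as wrapper:
        return isinstance(wrapper.__cause__, ToyBreakpointException)
    return False


hit = stopped_at_breakpoint(run_pipeline)
completed = stopped_at_breakpoint(lambda: None)
print(hit, completed)
```

Checking `__cause__` distinguishes an intentional pause from a genuine component failure, which can be useful when breakpoints are triggered from a script rather than a notebook cell.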

We can also place a breakpoint on the tool used by the Agent. This allows us to interrupt the pipeline execution at the point where the tool is invoked by the tool_invoker.

To achieve this, we initialize a ToolBreakpoint with the name of the target tool, wrap it with an AgentBreakpoint, and then run the pipeline with the configured breakpoint.

agent_tool_breakpoint = ToolBreakpoint("tool_invoker", 0, "add_database_tool")
agent_breakpoint = AgentBreakpoint(break_point=agent_tool_breakpoint, agent_name="database_agent")

debug_path = "saving_snapshots"  # directory for the snapshot file; change as needed
pipeline_with_agent.run(
    data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
    break_point=agent_breakpoint,
    debug_path=debug_path,
)
---------------------------------------------------------------------------

BreakpointException                       Traceback (most recent call last)

File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack/core/pipeline/pipeline.py:60, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
     59 try:
---> 60     component_output = instance.run(**inputs)
     61 except Exception as error:


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/components/agents/agent.py:458, in Agent.run(self, messages, streaming_callback, break_point, resume_state, debug_path, **kwargs)
    456 while counter < self.max_agent_steps:
    457     # check for breakpoint before ChatGenerator
--> 458     self._check_chat_generator_breakpoint(
    459         break_point,
    460         component_visits,
    461         messages,
    462         generator_inputs,
    463         debug_path,
    464         kwargs,
    465         state,
    466     )
    468     # 1. Call the ChatGenerator


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/components/agents/agent.py:287, in Agent._check_chat_generator_breakpoint(self, agent_breakpoint, component_visits, messages, generator_inputs, debug_path, kwargs, state)
    286 logger.info(msg)
--> 287 raise BreakpointException(
    288     message=msg,
    289     component=break_point.component_name,
    290     state=state_inputs,
    291     results=state.data,
    292 )


BreakpointException: Breaking at tool_invoker visit count 0


The above exception was the direct cause of the following exception:


PipelineRuntimeError                      Traceback (most recent call last)

Cell In[11], line 5
      2 agent_breakpoint = AgentBreakpoint(break_point=agent_tool_breakpoint, agent_name = 'database_agent')
      4 debug_path = "saving_snapshots"
----> 5 pipeline_with_agent.run(
      6     data={"fetcher": {"urls": ["https://en.wikipedia.org/wiki/Deepset"]}},
      7     break_point=agent_breakpoint,
      8     debug_path=debug_path,
      9 )


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack_experimental/core/pipeline/pipeline.py:339, in Pipeline.run(self, data, include_outputs_from, break_point, resume_state, debug_path)
    336     component_inputs["resume_state"] = resume_state
    337     component_inputs["break_point"] = None
--> 339 component_outputs = self._run_component(
    340     component_name=component_name,
    341     component=component,
    342     inputs=component_inputs,  # the inputs to the current component
    343     component_visits=component_visits,
    344     parent_span=span,
    345 )
    347 # Updates global input state with component outputs and returns outputs that should go to
    348 # pipeline outputs.
    349 component_pipeline_outputs = self._write_component_outputs(
    350     component_name=component_name,
    351     component_outputs=component_outputs,
   (...)    354     include_outputs_from=include_outputs_from,
    355 )


File ~/haystack-cookbook/.venv/lib/python3.13/site-packages/haystack/core/pipeline/pipeline.py:62, in Pipeline._run_component(component_name, component, inputs, component_visits, parent_span)
     60     component_output = instance.run(**inputs)
     61 except Exception as error:
---> 62     raise PipelineRuntimeError.from_exception(component_name, instance.__class__, error) from error
     63 component_visits[component_name] += 1
     65 if not isinstance(component_output, Mapping):


PipelineRuntimeError: The following component failed to run:
Component name: 'database_agent'
Component type: 'Agent'
Error: Breaking at tool_invoker visit count 0

Resuming from a Breakpoint

Once a pipeline execution has been interrupted, we can resume the pipeline_with_agent from that saved state.

To do this:

  • Use load_state() to load the saved pipeline state from disk. This function converts the stored JSON file back into a Python dictionary representing the intermediate state.
  • Pass this state as an argument to the Pipeline.run() method.

The pipeline will resume execution from where it left off and continue until completion.

from haystack_experimental.core.pipeline.breakpoint import load_state

# resume the pipeline from the saved state
resume_state = load_state("saving_snapshots/database_agent_chat_generator_2025_07_15_14_34_37.json")

result = pipeline_with_agent.run(
    data={},
    resume_state=resume_state
)
print(result["database_agent"]["messages"])
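
The snapshot path used above contains a timestamp, so it changes on every run. As a convenience, the sketch below picks the newest snapshot in a directory by file name. The helper latest_snapshot is hypothetical, and it assumes the zero-padded timestamp format seen in the file name above, so that sorting by name is also chronological. The demonstration creates empty placeholder files in a temporary directory rather than real snapshots.

```python
import tempfile
from pathlib import Path
from typing import Optional


def latest_snapshot(debug_dir: str, prefix: str) -> Optional[Path]:
    """Return the newest `<prefix>_*.json` file in debug_dir, or None if there is none.

    Assumes zero-padded timestamps (YYYY_MM_DD_HH_MM_SS) in the file name,
    so sorting by name matches chronological order.
    """
    candidates = sorted(Path(debug_dir).glob(f"{prefix}_*.json"))
    return candidates[-1] if candidates else None


# Demonstration with placeholder files; the second timestamp is made up
with tempfile.TemporaryDirectory() as tmp:
    for stamp in ("2025_07_15_14_34_37", "2025_07_15_14_40_02"):
        (Path(tmp) / f"database_agent_chat_generator_{stamp}.json").write_text("{}")
    newest = latest_snapshot(tmp, "database_agent_chat_generator")
    newest_name = newest.name
    missing = latest_snapshot(tmp, "no_such_prefix")

print(newest_name, missing)
```

With such a helper, the path passed to load_state() could be str(latest_snapshot("saving_snapshots", "database_agent_chat_generator")), after first checking that the result is not None.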