Tutorial: Conversational RAG Agent using InMemoryChatMessageStore
Last Updated: January 2, 2026
- Level: Advanced
- Time to complete: 20 minutes
- Components Used: Agent, ChatPromptBuilder, InMemoryBM25Retriever, OpenAIChatGenerator, OutputAdapter
- Experimental Components Used: ChatMessageStore, ChatMessageRetriever, ChatMessageWriter
- Prerequisites: You need an OpenAI API key
- Goal: After completing this tutorial, you’ll have learned how to incorporate conversational history into a RAG pipeline to enable multi-turn conversations grounded in documents.
Overview
In this tutorial, you’ll first build a simple conversational pipeline using chat components and an LLM. You’ll then extend this setup into a conversational RAG pipeline using the Agent component, capable of handling multi-turn interactions over documents.
Along the way, you’ll learn how to store, retrieve, and reuse chat history so the system can preserve context across user questions.
Why is this useful? By incorporating conversational memory, a RAG system can remember earlier parts of a conversation. This allows users to ask natural follow-up questions without restating context, while the pipeline continues to retrieve relevant information and generate accurate responses based on the full conversation history.
🧪 Beta Feature Notice:
The ChatMessageStore feature is currently in beta, available in the haystack-experimental repository. We’d love your feedback: join the conversation in this GitHub discussion and help us shape this feature!
Installation
Install Haystack, haystack-experimental and datasets with pip:
%%bash
pip install -q haystack-ai "haystack-experimental>=0.15.0" datasets
Enter OpenAI API key
import os
from getpass import getpass
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")
Conversational Pipeline
In this section, we build a simple conversational pipeline using the new chat components. The example focuses on enabling multi-turn conversations and shows how session-level chat history is managed using a chat_history_id.
Create a ChatMessage Store
Conversation history is stored as ChatMessage objects in an InMemoryChatMessageStore. The ChatMessageRetriever reads past messages from the store, while the ChatMessageWriter appends new messages after each interaction. Together, these components allow the pipeline to persist and reuse chat history across multiple turns.
Import these components from the haystack-experimental package:
from haystack_experimental.chat_message_stores.in_memory import InMemoryChatMessageStore
from haystack_experimental.components.retrievers import ChatMessageRetriever
from haystack_experimental.components.writers import ChatMessageWriter
# Chat History components
message_store = InMemoryChatMessageStore()
message_retriever = ChatMessageRetriever(message_store)
message_writer = ChatMessageWriter(message_store)
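Before wiring these components into a pipeline, here is a minimal standalone sketch of the write/read cycle. The parameter names mirror the pipeline wiring used below; the exact run() signatures are an assumption.
from haystack.dataclasses import ChatMessage

# Sketch: write one message under a throwaway session ID, then read it back.
# The chat_history_id keeps this separate from the sessions used later;
# parameter names are assumed to match the pipeline connections below.
message_writer.run(messages=[ChatMessage.from_user("Hello!")], chat_history_id="demo_session")
retrieved = message_retriever.run(current_messages=[], chat_history_id="demo_session")
print(retrieved["messages"])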
Build the Pipeline
Now let’s build the conversational pipeline. To understand how conversation history flows through the system, it helps to look at how the components are connected and how messages move between them.
First, we add ChatPromptBuilder, OpenAIChatGenerator, ChatMessageRetriever, ChatMessageWriter and OutputAdapter components to the pipeline.
The prompt generated by the prompt_builder is first sent to the message_retriever as current_messages. This allows the new user message to be combined with any previously stored chat history inside the message_retriever. If a system message is provided to the prompt_builder, the message_retriever places it at the beginning, followed by the retrieved history and the current user message. The resulting sequence of messages is then passed to the LLM as input.
The same prompt is also forwarded to the message_joiner. After the LLM generates a response, the message_joiner combines the original user message with the LLM’s reply, and this merged set of messages is sent to the message_writer to be stored in the chat history.
from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
pipeline = Pipeline()
# components to communicate with an LLM
pipeline.add_component(
    "prompt_builder",
    ChatPromptBuilder(
        template=[
            ChatMessage.from_system("You are a helpful AI assistant that answers users' questions."),
            ChatMessage.from_user("{{query}}"),
        ],
        required_variables="*",
    ),
)
pipeline.add_component("llm", OpenAIChatGenerator(model="gpt-4o"))
# components for chat history retrieval and storage
pipeline.add_component("message_retriever", ChatMessageRetriever(message_store))
pipeline.add_component("message_writer", ChatMessageWriter(message_store))
pipeline.add_component(
    "message_joiner", OutputAdapter(template="{{ prompt + replies }}", output_type=list[ChatMessage], unsafe=True)
)
# connections
pipeline.connect("prompt_builder.prompt", "message_retriever.current_messages")
pipeline.connect("prompt_builder.prompt", "message_joiner.prompt")
pipeline.connect("message_retriever.messages", "llm.messages")
pipeline.connect("llm.replies", "message_joiner.replies")
pipeline.connect("message_joiner", "message_writer.messages")
Visualize the pipeline
Visualize the pipeline with the show() method to confirm the connections are correct.
# pipeline.show()
Run the Pipeline
- Test the pipeline with some queries.
- Ensure that every request includes a chat_history_id parameter so that we know which conversational history we’d like to retrieve from and write to.
Here are example queries you can try:
- Describe retrieval augmented generation in a few words.
- What do people use it for?
chat_history_id = "user_123_session_1"
while True:
question = input("Enter your question or Q to exit.\nπ§ ")
if question == "Q":
break
res = pipeline.run(
data={
"prompt_builder": {"query": question},
"message_retriever": {"chat_history_id": chat_history_id},
"message_writer": {"chat_history_id": chat_history_id},
},
include_outputs_from={"llm"},
)
print(f'π€ {res["llm"]["replies"][0].text}')
We can also inspect the stored chat history in the InMemoryChatMessageStore with the retrieve_messages method.
By default, ChatMessage objects with the “system” role are not stored in the ChatMessageStore. To include system messages, set skip_system_messages=False when initializing the InMemoryChatMessageStore.
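For example, a store that also keeps system messages would be initialized like this (a minimal sketch; only the constructor argument changes):
# Keep system messages in the store as well
message_store_with_system = InMemoryChatMessageStore(skip_system_messages=False)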
chat_history = message_store.retrieve_messages(chat_history_id=chat_history_id, last_k=None)
for msg in chat_history:
    print(f"ROLE: {msg.role.value}")
    print(f"{msg.text}\n")
Switching to a New Chat Session
- Now we can update the chat_history_id to switch to a new chat session with an empty chat history.
Here are example queries you can try:
- Who founded deepset?
# Update the chat history ID
chat_history_id = "user_123_session_2"
while True:
    question = input("Enter your question or Q to exit.\n🧑 ")
    if question == "Q":
        break
    res = pipeline.run(
        data={
            "prompt_builder": {"query": question},
            "message_retriever": {"chat_history_id": chat_history_id},
            "message_writer": {"chat_history_id": chat_history_id},
        },
        include_outputs_from={"llm"},
    )
    print(f'🤖 {res["llm"]["replies"][0].text}')
Now we can fetch the chat history stored under the "user_123_session_2" ID.
chat_history = message_store.retrieve_messages(chat_history_id="user_123_session_2", last_k=None)
for msg in chat_history:
    print(f"ROLE: {msg.role.value}")
    print(f"{msg.text}\n")
Other Utility Methods
We have also included other utility methods in InMemoryChatMessageStore, including:
- count_messages
- delete_messages
- delete_all_messages
For more information, check out the ChatMessageStore docs at https://docs.haystack.deepset.ai/reference/experimental-chatmessage-store-api
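As a quick sketch, assuming these methods accept the same chat_history_id keyword as retrieve_messages:
# Hypothetical usage; the chat_history_id keyword is assumed to
# match retrieve_messages — check the docs linked above for the exact signatures
print(message_store.count_messages(chat_history_id="user_123_session_1"))

# Delete one session's history, or wipe the store entirely
message_store.delete_messages(chat_history_id="user_123_session_1")
message_store.delete_all_messages()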
Conversational RAG Agent
In conversational systems, simply prepending the chat history to a new user message is often not sufficient for effective RAG. To retrieve relevant documents, the system needs a way to reinterpret the user’s query in the context of the ongoing conversation.
For example, if the user first asks “What’s the first name of Einstein?” and then follows up with “Where was he born?”, the system must understand that “he” refers to Einstein. To retrieve the correct documents, the follow-up query should be reformulated as “Where was Einstein born?”.
This is where the Agent component comes in. An agent can use its retrieval tool with a rephrased version of the user’s query, ensuring that document retrieval remains accurate across multiple turns.
Create a Document Store and Index Documents
Create an index with the seven-wonders dataset:
from haystack import Document
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from datasets import load_dataset
dataset = load_dataset("bilgeyucel/seven-wonders", split="train")
docs = [Document(content=doc["content"], meta=doc["meta"]) for doc in dataset]
document_store = InMemoryDocumentStore()
document_store.write_documents(documents=docs)
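You can confirm the write with a quick count; count_documents is part of the standard document store interface:
# Sanity check: number of indexed documents
print(document_store.count_documents())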
Build the Retrieval Tool
To expose retrieval to the Agent component, we first create a retrieval pipeline and wrap it with the PipelineTool. The answer generation step is handled by the Agent.
💡 For a complete walkthrough on building RAG and retrieval pipelines, see the Creating Your First QA Pipeline with Retrieval-Augmentation tutorial.
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.tools import PipelineTool
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("doc_retriever", InMemoryBM25Retriever(document_store=document_store, top_k=3))
retrieval_pipeline.add_component(
    "builder",
    PromptBuilder(
        template="""
Supporting documents:
{%- if documents|length > 0 %}
{%- for doc in documents %}
Document [{{ loop.index }}] :
{{ doc.content }}
{% endfor -%}
{%- else %}
No relevant documents found.
{% endif %}
""",
        required_variables="*",
    ),
)
retrieval_pipeline.connect("doc_retriever.documents", "builder.documents")
retrieval_tool = PipelineTool(
    pipeline=retrieval_pipeline,
    name="retrieval_tool",
    description="A tool for fetching information on the seven wonders of the ancient world.",
    input_mapping={"query": ["doc_retriever.query"]},
    output_mapping={"builder.prompt": "retrieval_output"},
)
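Before handing the tool to an agent, you can try it on its own. This is a sketch: it assumes PipelineTool exposes the standard Tool.invoke interface and that output_mapping surfaces the result under the retrieval_output key.
# Sketch: call the tool directly with a sample query
result = retrieval_tool.invoke(query="Colossus of Rhodes")
print(result)  # expected to contain the mapped "retrieval_output" value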
Build the Conversational RAG Agent
After creating the retrieval tool, we initialize the Agent component with this tool and build a new pipeline. With this setup, the agent can reinterpret incoming user queries based on the conversation history, retrieve the most relevant documents, and generate answers grounded in that retrieved information.
To incorporate conversational context, we add the ChatMessageRetriever and ChatMessageWriter components to the pipeline alongside the Agent like we did in the first pipeline example.
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.agents import Agent
from haystack.components.generators.utils import print_streaming_chunk
conversational_rag_agent = Pipeline()
conversational_rag_agent.add_component(
    "agent",
    Agent(
        system_prompt="""You are a helpful AI assistant that answers users' questions grounded in a set of supporting documents.
If any questions are asked about the seven wonders, always use the `retrieval_tool` to fetch supporting documents.
Stay concise in your answers.
""",
        chat_generator=OpenAIChatGenerator(model="gpt-4o"),
        tools=[retrieval_tool],
        streaming_callback=print_streaming_chunk,
    ),
)
# components for chat history storage and retrieval
conversational_rag_agent.add_component("message_retriever", ChatMessageRetriever(message_store))
conversational_rag_agent.add_component("message_writer", ChatMessageWriter(message_store))
# connections for the pipeline
conversational_rag_agent.connect("message_retriever.messages", "agent.messages")
conversational_rag_agent.connect("agent.messages", "message_writer")
Let’s have a conversation 💬
Now, run the pipeline with the relevant inputs.
Here are some example queries and follow-ups you can try:
- What does Rhodes Statue look like?
  - Who built it?
  - Did he destroy it?
- Where is Gardens of Babylon?
  - When was it built?
chat_history_id = "user_123_session_3"
while True:
question = input("Enter your question or Q to exit.\nπ§ ")
if question == "Q":
break
conversational_rag_agent.run(
data={
"message_retriever": {
"current_messages": [ChatMessage.from_user(question)],
"chat_history_id": chat_history_id,
},
"message_writer": {"chat_history_id": chat_history_id},
}
)
# No need to print the output since we are streaming it
✅ Notice that this time, with the help of query rephrasing, we’ve built a conversational RAG pipeline that can handle follow-up queries and retrieve the relevant documents.
(Notebook by Sebastian Husch Lee and Vladimir Blagojevic)
