
Running Haystack Pipelines in Asynchronous Environments


Notebook by Madeeswaran Kannan & Mathis Lucka

In this notebook, you’ll learn how to use the AsyncPipeline and async-enabled components to build and execute a Haystack pipeline in an asynchronous environment. It’s based on this short Haystack tutorial, so it would be a good idea to familiarize yourself with it before we begin. A further prerequisite is working knowledge of cooperative scheduling and async programming in Python.

Motivation

By default, the Pipeline class in haystack is a regular Python object class that exposes non-async methods to add/connect components and execute the pipeline logic. It can currently be used in async environments, but it’s not optimal to do so since it executes its logic in a β€˜blocking’ fashion, i.e., once the Pipeline.run method is invoked, it must run to completion and return the outputs before the next statement of code can be executed.[1] In a typical async environment, this prevents the active async event loop from scheduling other async coroutines, thereby reducing throughput. To mitigate this bottleneck, we introduce the concept of async-enabled Haystack components and an AsyncPipeline class that cooperatively schedules the execution of both async and non-async components.

[1] This is a simplification, as the Python runtime can potentially schedule another thread, but it’s a detail that we can ignore in this case.

AsyncPipeline Benefits

  • Execute components concurrently to speed-up pipeline execution.
  • Execute components step by step to debug your pipelines.
  • Increase throughput in async environments, e.g. when executing pipelines behind a FastAPI-endpoint.
  • Allow individual components to opt into async support.
    • Not all components benefit from being async-enabled - I/O-bound components are the most suitable candidates.
  • Provide a backward-compatible way to execute Haystack pipelines containing both async and non-async components.

Let’s now go ahead and see what it takes to add async support to the original tutorial, starting with installing Haystack and the requisite dependencies.

Development Environment

%%bash

pip install -U haystack-ai -q
pip install datasets -q
pip install sentence-transformers -q
pip install nest_asyncio -q

Provide an OpenAI API key to ensure that the LLM generator can query the OpenAI API.

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
    os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")

# If you're running this notebook on Google Colab, you can do the following instead:
#
# from google.colab import userdata
# if "OPENAI_API_KEY" not in os.environ:
#     os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')

# The IPython environment is already running an event loop.
# We will later use a method that tries to create another event loop, which would fail without this snippet.
import nest_asyncio
nest_asyncio.apply()

Creating an AsyncPipeline

Fetching and Indexing Documents

Initialize a DocumentStore to index your documents.

from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()

Fetch the data and convert it into Haystack Documents.

from datasets import load_dataset
from haystack import Document

dataset = load_dataset("bilgeyucel/seven-wonders", split="train")
docs = [Document(content=doc["content"], meta=doc["meta"]) for doc in dataset]

To store your data in the DocumentStore with embeddings, initialize a SentenceTransformersDocumentEmbedder with the model name and call warm_up() to download the embedding model.

Then, we calculate the embeddings of the docs with the newly warmed-up embedder and write the documents to the document store.

from haystack.components.embedders import SentenceTransformersDocumentEmbedder

doc_embedder = SentenceTransformersDocumentEmbedder(
    model="sentence-transformers/all-MiniLM-L6-v2"
)
doc_embedder.warm_up()

docs_with_embeddings = doc_embedder.run(docs)
n_docs_written = document_store.write_documents(docs_with_embeddings["documents"])
print(f"Indexed {n_docs_written} documents")
Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 5/5 [00:00<00:00,  5.37it/s]

Indexed 151 documents

The next step is to build the RAG pipeline to generate answers for a user query. We build a RAG pipeline using hybrid retrieval. Hybrid retrieval uses two retrieval branches that can run concurrently.

Initialize a text embedder to create an embedding for the user query, along with an InMemoryEmbeddingRetriever and an InMemoryBM25Retriever to use with the InMemoryDocumentStore you initialized earlier. We feed the results of both retrievers into a DocumentJoiner and use reciprocal rank fusion to arrive at the final ranking of the documents.

from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever, InMemoryBM25Retriever
from haystack.components.joiners import DocumentJoiner

text_embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
embedding_retriever = InMemoryEmbeddingRetriever(document_store)
bm25_retriever = InMemoryBM25Retriever(document_store)
joiner = DocumentJoiner(join_mode="reciprocal_rank_fusion")

Create a custom prompt to use with the ChatPromptBuilder and initialize an OpenAIChatGenerator to consume the output of the former.

from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage

template = """
Given the following information, answer the question.

Context:
{% for document in documents %}
    {{ document.content }}
{% endfor %}

Question: {{question}}
Answer:
"""

prompt_builder = ChatPromptBuilder(template=[ChatMessage.from_user(template)])
generator = OpenAIChatGenerator(model="gpt-4o-mini")

We finally get to the creation of the pipeline instance. Instead of using the Pipeline class, we use the AsyncPipeline class.

The rest of the process, i.e., adding components and connecting them with each other, remains the same as with the original Pipeline class.

from haystack import AsyncPipeline

async_rag_pipeline = AsyncPipeline()
# Add components to your pipeline
async_rag_pipeline.add_component("text_embedder", text_embedder)
async_rag_pipeline.add_component("embedding_retriever", embedding_retriever)
async_rag_pipeline.add_component("bm25_retriever", bm25_retriever)
async_rag_pipeline.add_component("joiner", joiner)
async_rag_pipeline.add_component("prompt_builder", prompt_builder)
async_rag_pipeline.add_component("llm", generator)

# Now, connect the components to each other
async_rag_pipeline.connect("text_embedder.embedding", "embedding_retriever.query_embedding")
async_rag_pipeline.connect("bm25_retriever.documents", "joiner.documents")
async_rag_pipeline.connect("embedding_retriever.documents", "joiner.documents")
async_rag_pipeline.connect("joiner.documents", "prompt_builder.documents")
async_rag_pipeline.connect("prompt_builder.prompt", "llm.messages")

async_rag_pipeline.show()
# You can see from the visual pipeline representation that the embedding retriever and the BM25 retriever do not depend on each other; they can run concurrently

Now, we create a coroutine that queries the pipeline with a question.

We use the run_async_generator method to execute the AsyncPipeline. run_async_generator returns an AsyncIterator that we need to iterate over to make progress in the pipeline’s execution.

Essentially, this allows us to step through the pipeline execution component by component, which is useful for debugging a pipeline or when you want to run custom logic upon any component’s completion.

The AsyncPipeline also exposes:

  • a run_async method that executes the full pipeline before returning the final outputs
  • a run method that can be called from non-async environments but still executes components concurrently; the run method is a drop-in replacement for Pipeline.run

We iterate over the AsyncIterator and print intermediate outputs from the retrievers and the joiner.

question = "Where is Gardens of Babylon?"
inputs = {
    "text_embedder": {"text": question},
    "bm25_retriever": {"query": question},
    "prompt_builder": {"question": question},
}
include_outputs_from = ["embedding_retriever", "bm25_retriever", "joiner"]
consumed_outputs = []
async for output in async_rag_pipeline.run_async_generator(data=inputs, include_outputs_from=include_outputs_from):
    final_output = output
    for component, results in output.items():
        if component not in consumed_outputs:
            consumed_outputs.append(component)
            if "documents" in results:
                print(f"Outputs from `{component}`.")
                for doc in results["documents"][:1]:
                    print(f"Score: {doc.score}")
                    print(f"Content: {doc.content[:500]}...")
                    print("------------")


print("LLM Response:")
print(final_output["llm"]["replies"][0].text)
    
Outputs from `bm25_retriever`.
Score: 13.520340633994273
Content: [21] However, the gardens were said to still exist at the time that later writers described them, and some of these accounts are regarded as deriving from people who had visited Babylon.[2] Herodotus, who describes Babylon in his Histories, does not mention the Hanging Gardens,[22] although it could be that the gardens were not yet well known to the Greeks at the time of his visit.[2]
To date, no archaeological evidence has been found at Babylon for the Hanging Gardens.[6] It is possible that ev...
------------
Outputs from `embedding_retriever`.
Score: 0.6933103186685945
Content: The construction of the Hanging Gardens has also been attributed to the legendary queen Semiramis[4] and they have been called the Hanging Gardens of Semiramis as an alternative name.[5]
The Hanging Gardens are the only one of the Seven Wonders for which the location has not been definitively established.[6] There are no extant Babylonian texts that mention the gardens, and no definitive archaeological evidence has been found in Babylon.[7][8] Three theories have been suggested to account for th...
------------
Outputs from `joiner`.
Score: 0.9919354838709679
Content: [21] However, the gardens were said to still exist at the time that later writers described them, and some of these accounts are regarded as deriving from people who had visited Babylon.[2] Herodotus, who describes Babylon in his Histories, does not mention the Hanging Gardens,[22] although it could be that the gardens were not yet well known to the Greeks at the time of his visit.[2]
To date, no archaeological evidence has been found at Babylon for the Hanging Gardens.[6] It is possible that ev...
------------
LLM Response:
The Hanging Gardens of Babylon are said to have been located in the ancient city of Babylon, near present-day Hillah in the Babil province of Iraq. However, the exact location has not been definitively established, and there is no archaeological evidence confirming their existence in Babylon. Some theories suggest that they could have been confused with the gardens built by the Assyrian king Sennacherib in his capital city of Nineveh, near modern-day Mosul.

Sequential vs Concurrent Execution

Now, let’s compare sequential and concurrent execution of multiple queries. We create two utility functions that run a list of questions. Both use the AsyncPipeline, but only one of them runs each question as a coroutine.

import asyncio

def sequential_execution(pipeline: AsyncPipeline, questions: list[str]):
    results = []
    for question in questions:
        inputs = {
            "text_embedder": {"text": question},
            "bm25_retriever": {"query": question},
            "prompt_builder": {"question": question},
        }
        
        results.append(pipeline.run(data=inputs))
    
    return results

async def concurrent_execution(pipeline: AsyncPipeline, questions: list[str]):
    tasks = [pipeline.run_async(data={
        "text_embedder": {"text": question},
        "bm25_retriever": {"query": question},
        "prompt_builder": {"question": question},
    }) for question in questions]
    
    results = await asyncio.gather(*tasks)
    
    return results
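
Note that asyncio.gather schedules all of the run_async coroutines on the same event loop, so while one pipeline is waiting on an I/O-bound step such as the OpenAI request, another can make progress.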

We run the pipeline with 3 examples.

examples = [
    "Where is Gardens of Babylon?",
    "Why did people build Great Pyramid of Giza?",
    "What does Rhodes Statue look like?",
]

Let’s run the questions sequentially first.

import time

start = time.time()
results = sequential_execution(async_rag_pipeline, examples)
end = time.time()
total_time = end - start
print(f"All tasks completed in {total_time:.2f} seconds")
All tasks completed in 8.48 seconds

Let’s check how long it takes if we run questions concurrently.

start = time.time()
results = await concurrent_execution(async_rag_pipeline, examples)
end = time.time()
total_time = end - start
print(f"All tasks completed in {total_time:.2f} seconds")
Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Batches: 100%|β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ| 1/1 [00:00<00:00,  2.11it/s]


All tasks completed in 3.57 seconds

You can see that concurrent execution of the pipeline is more than twice as fast as the sequential execution.

Concurrent Component Execution

The example above runs the retriever components concurrently. Any components that can run concurrently, typically because they sit on parallel branches of the pipeline, are automatically scheduled to run concurrently by the AsyncPipeline’s run logic.

Let’s create a small example with a custom component to illustrate concurrent execution in more detail.

from haystack import component

@component
class WaitingComponent:
    """
    A test component that simulates async operations by waiting for a specified time
    before returning a message.

    ### Usage example
    ```python
    test_comp = AsyncTestComponent(name="TestComponent", wait_time=2)

    # Sync usage
    result = test_comp.run(user_msg="Hello")
    print(result["message"])  # prints after 2 seconds

    # Async usage
    result = await test_comp.run_async(user_msg="Hello")
    print(result["message"])  # prints after 2 seconds
    ```
    """

    def __init__(self, name: str, wait_time: int = 1):
        self.name = name
        self.wait_time = wait_time

    @component.output_types(message=str)
    def run(self, user_msg: str) -> dict:
        """
        Synchronous method that waits for the specified time and returns a message.

        :param user_msg: Input message from the user (unused in output but required for example)
        :return: Dictionary containing the output message
        """
        print(f"Component {self.name} starts running...")
        time.sleep(self.wait_time)
        print(f"Component {self.name} is done!")
        return {"message": f"Message from {self.name}"}


wait_1 = WaitingComponent(name="wait_1", wait_time=1)
wait_2 = WaitingComponent(name="wait_2", wait_time=2)
wait_3 = WaitingComponent(name="wait_3", wait_time=3)
wait_4 = WaitingComponent(name="wait_4", wait_time=4)
wait_5 = WaitingComponent(name="wait_5", wait_time=5)

pp = AsyncPipeline()


pp.add_component("wait_1", wait_1)
pp.add_component("wait_2", wait_2)
pp.add_component("wait_3", wait_3)
pp.add_component("wait_4", wait_4)
pp.add_component("wait_5", wait_5)

pp.connect("wait_1", "wait_2")
pp.connect("wait_3", "wait_4")

pp.show()

You can see that this pipeline has 3 parallel branches. Let’s run this pipeline to see how it executes components concurrently.

async for output in pp.run_async_generator({"user_msg": "Hello"}, include_outputs_from=["wait_1", "wait_2", "wait_3", "wait_4", "wait_5"]):
    if len(output.keys()) == 1:
        print(output)
{'wait_1': {'message': 'Message from wait_1'}}
{'wait_3': {'message': 'Message from wait_3'}}
{'wait_2': {'message': 'Message from wait_2'}}
{'wait_5': {'message': 'Message from wait_5'}}
{'wait_4': {'message': 'Message from wait_4'}}
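
The completion order follows the cumulative wait times on each branch: wait_1 finishes after 1 second, wait_3 and wait_2 after 3 seconds, wait_5 after 5 seconds, and wait_4, which has to wait for wait_3 to finish first, after 7 seconds. A fully sequential run would have taken 15 seconds.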

Custom Asynchronous Components

Individual components can opt into async by implementing a run_async coroutine that has the same signature, i.e., the same input parameters and outputs, as the run method. This constraint is placed to ensure that pipeline connections stay the same irrespective of whether a component supports async execution, allowing for plug-and-play backward compatibility with existing pipelines.

from typing import Dict, Any
from haystack import component

@component
class MyCustomComponent:
    def __init__(self, my_custom_param: str):
        self.my_custom_param = my_custom_param

    @component.output_types(original=str, concatenated=str)
    def run(self, input: str) -> Dict[str, Any]:
        return {
            "original": input,
            "concatenated": input + self.my_custom_param
        }

    async def do_io_bound_op(self, input: str) -> str:
        # Do some IO-bound operation here
        return input + self.my_custom_param

    @component.output_types(original=str, concatenated=str)
    async def run_async(self, input: str) -> Dict[str, Any]:
        return {
            "original": input,
            "concatenated": await self.do_io_bound_op(input)
        }
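
As a minimal usage sketch (the component name and parameter value below are just for illustration), such a component can be added to an AsyncPipeline like any other; the pipeline uses the run_async coroutine when executing asynchronously and falls back to run otherwise:

custom = MyCustomComponent(my_custom_param=" world")

custom_pipeline = AsyncPipeline()
custom_pipeline.add_component("custom", custom)

# run() is callable from non-async code; under the hood, the AsyncPipeline
# schedules the component's run_async coroutine where one is available.
result = custom_pipeline.run(data={"custom": {"input": "hello"}})
print(result["custom"]["concatenated"])  # hello world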