Run Tasks Concurrently Within a Custom Component


The execution logic of Haystack Pipelines is synchronous. Components, even if they belong to parallel branches, run one after the other.

This has several advantages, including ease of debugging and the ability to handle complex workflows. For many applications, this execution logic works well.

Sometimes you may want to run some components concurrently. This can be useful for components that perform I/O-bound tasks, where most of the time is spent waiting for input/output operations: consider, for example, waiting for a response from an LLM API client or a database client.
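
To see why threads help here, consider this illustrative sketch (not part of the pipeline code that follows): when blocking calls mostly wait, running them in threads brings the total wall-clock time close to that of the slowest call, rather than the sum of all calls.

import time
from concurrent.futures import ThreadPoolExecutor

def fake_io_call(seconds: float) -> float:
    # stands in for an API or database call that mostly waits
    time.sleep(seconds)
    return seconds

start = time.time()
for s in [1.0, 1.0, 1.0]:
    fake_io_call(s)
print(f"sequential: {time.time() - start:.1f}s")  # ~3s

start = time.time()
with ThreadPoolExecutor() as executor:
    list(executor.map(fake_io_call, [1.0, 1.0, 1.0]))
print(f"threads: {time.time() - start:.1f}s")  # ~1s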

In this cookbook, we show how you can wrap multiple components into one that will run them concurrently in different threads.

Preparation

! pip install haystack-ai cohere-haystack elasticsearch-haystack
import os
from rich import print
import time
# this is only needed in Jupyter, where an asyncio event loop is already running
import nest_asyncio
nest_asyncio.apply()

Concurrent Generators

Use case: we want to send the same prompt to different generators and aggregate the results.

os.environ["OPENAI_API_KEY"]="your OpenAI API key"
os.environ["COHERE_API_KEY"]="your Cohere API key"
# an example Document to summarize

from haystack import Document


text="""
The giant panda (Ailuropoda melanoleuca), also known as the panda bear or simply panda, is a bear species endemic to China. It is characterised by its black-and-white coat and rotund body. The name "giant panda" is sometimes used to distinguish it from the red panda, a neighboring musteloid. Adult individuals average 100 to 115 kg (220 to 254 lb), and are typically 1.2 to 1.9 m (3 ft 11 in to 6 ft 3 in) long. The species is sexually dimorphic, as males are typically 10 to 20% larger. The fur is white, with black patches around the eyes, ears, legs and shoulders. A thumb is visible on the bear's forepaw, which helps in holding bamboo in place for feeding. Giant pandas have adapted larger molars and expanded temporal fossa to meet their dietary requirements.

The giant panda is exclusively found in six mountainous regions in a few provinces. It is also found in elevations of up to 3,000 m (9,800 ft). Its diet consists almost entirely of bamboo, making the bear mostly herbivorous, despite being classified in the order Carnivora. The shoot is an important energy source, as it contains starch and is 32% protein, hence pandas evolved the ability to effectively digest starch. They are solitary, only gathering in times of mating. Females rear cubs for an average of 18 to 24 months. Potential predators of sub-adult pandas include leopards. Giant pandas heavily rely on olfactory communication to communicate with one another; scent marks are used as chemical cues and on landmarks like rocks or trees. Giant pandas live long lives, with the oldest known individual dying at 38.

As a result of farming, deforestation, and other development, the giant panda has been driven out of the lowland areas where it once lived, and it is a conservation-reliant vulnerable species. A 2007 report showed 239 pandas living in captivity inside China and another 27 outside the country. Some reports also show that the number of giant pandas in the wild is on the rise. By March 2015, the wild giant panda population had increased to 1,864 individuals. In 2016, it was reclassified on the IUCN Red List from "endangered" to "vulnerable", affirming decade-long efforts to save the panda. In July 2021, Chinese authorities also reclassified the giant panda as vulnerable. The giant panda has often served as China's national symbol, appeared on Chinese Gold Panda coins since 1982 and as one of the five Fuwa mascots of the 2008 Summer Olympics held in Beijing.
"""

documents = [Document(content=text)]

Baseline: Pipeline with Generators running one after another

from haystack import Pipeline

from haystack.components.generators import OpenAIGenerator
from haystack_integrations.components.generators.cohere import CohereGenerator
from haystack.components.builders import PromptBuilder


template = """Write a short summary of the following text.
Text:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}
"""

pipeline = Pipeline()
pipeline.add_component("prompt_builder", PromptBuilder(template))
pipeline.add_component("first_generator", OpenAIGenerator())
pipeline.add_component("second_generator", CohereGenerator())

pipeline.connect("prompt_builder", "first_generator")
pipeline.connect("prompt_builder", "second_generator")

start = time.time()
n = 5
for i in range(n):
    result = pipeline.run({"prompt_builder": {"documents": documents}})
end = time.time()
print(result)
print(f"Time taken for {n} non-concurrent calls: {end - start} seconds")

Optimisation: Pipeline with Generators running concurrently

We wrap multiple generators into a single component that internally runs them concurrently in different threads. To simplify thread orchestration, we use the to_thread function from asyncio. The component itself stays synchronous, exposing the usual run method.
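
In isolation, the orchestration pattern looks like this. This is a minimal sketch with a placeholder blocking function; the actual component below applies the same pattern to generators.

import asyncio
import time

def blocking_task(name: str) -> str:
    # placeholder for a synchronous run method that waits on I/O
    time.sleep(1)
    return f"result from {name}"

async def run_concurrently():
    # each blocking call runs in its own thread; gather waits for all of them
    return await asyncio.gather(
        asyncio.to_thread(blocking_task, "first"),
        asyncio.to_thread(blocking_task, "second"),
    )

print(asyncio.run(run_concurrently()))  # takes ~1 second, not ~2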

ConcurrentGenerators component

  • This component expects a list of generators and names as initialization parameters.

  • The _arun method is an async method, where concurrent execution takes place. It creates a thread for each generator and waits for all threads to finish. Then the results are collected, aggregated, and returned.

  • The run method is synchronous (as expected by the Haystack pipeline) and calls the _arun method using asyncio.run.

Learn about creating custom components in our documentation.

from haystack.core.component import Component
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder
from typing import List, Dict, Any, Optional
from haystack import component, default_from_dict, default_to_dict
from haystack.utils.type_serialization import deserialize_type
import asyncio


@component
class ConcurrentGenerators:
    def __init__(self, generators: List[Component], names: Optional[List[str]] = None):
        self.generators = generators
        if names is None:
            names = [f"generator_{i}" for i in range(len(generators))]
        self.names = names

        # we set the output types here so that the results are not too nested
        output_types = {k: Dict[str, Any] for k in names}
        component.set_output_types(self, **output_types)

    def warm_up(self):
        """Warm up the generators."""
        for generator in self.generators:
            if hasattr(generator, "warm_up"):
                generator.warm_up()

    async def _arun(self, **kwargs):
        """
        Asynchronous method to run the generators concurrently.
        """

        # the generators run in separate threads
        results = await asyncio.gather(
            *[asyncio.to_thread(generator.run, **kwargs) for generator in self.generators]
        )

        organized_results = {}
        for generator_name, res_ in zip(self.names, results):
            organized_results[generator_name] = res_
        return organized_results

    def run(self, prompt: str):
        """
        Synchronous run method that can be integrated into a classic synchronous pipeline.
        """
        # nest_asyncio (applied above) lets asyncio.run work inside Jupyter;
        # returning the organized results directly matches the output types
        # declared in __init__
        return asyncio.run(self._arun(prompt=prompt))


    def to_dict(self):
        generators = [generator.to_dict() for generator in self.generators]
        return default_to_dict(self, generators=generators, names=self.names)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ConcurrentGenerators":
        init_params = data.get("init_parameters", {})

        # Deserialize the generators
        generators = []
        serialized_generators = init_params["generators"]
        for serialized_generator in serialized_generators:
            generator_class = deserialize_type(serialized_generator["type"])
            generator = generator_class.from_dict(serialized_generator)
            generators.append(generator)

        data["init_parameters"]["generators"] = generators
        return default_from_dict(cls, data)
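
You can also smoke-test the component in isolation before putting it in a pipeline. The snippet below assumes the API keys set earlier and relies on the fact that Haystack generators return their outputs under a "replies" key.

concurrent_generators = ConcurrentGenerators(
    generators=[OpenAIGenerator(), CohereGenerator()],
    names=["openai", "cohere"],
)
concurrent_generators.warm_up()
out = concurrent_generators.run(prompt="In one sentence, what is a giant panda?")
print(out["openai"]["replies"])
print(out["cohere"]["replies"])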

Pipeline with ConcurrentGenerators component

pipeline = Pipeline()
pipeline.add_component("prompt_builder", PromptBuilder(template))
pipeline.add_component("concurrent_generators", ConcurrentGenerators(
    generators=[OpenAIGenerator(), CohereGenerator()],
    names=["openai", "cohere"]))

pipeline.connect("prompt_builder", "concurrent_generators")

start = time.time()
n = 5
for i in range(n):
    result = pipeline.run({"prompt_builder": {"documents": documents}})
end = time.time()
print(result)
print(f"Time taken for {n} concurrent calls: {end - start} seconds")

Nice! Our approach does the trick.

Concurrent Retrievers

Use case: we want to send a user query to different retrievers concurrently. For example, we could send a user query to a keyword-based retriever and a semantic one, and join both sets of results.

⚠️ Some Document Stores have Hybrid Retrievers that internally send a single batch query to the DB. This solution is generally more efficient than the following approach.

We first generate some Documents with random content and embeddings. The documents are written to an ElasticsearchDocumentStore.

from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

from haystack import Document
import numpy as np
import random

contents = ["The capital of Germany is Berlin", "The capital of France is Paris", "The capital of Spain is Madrid", "The capital of Italy is Rome"]

documents = []
for i in range(1_000):
    doc = Document(
        content=random.choice(contents) + f" {i}",
        embedding=np.random.rand(768).tolist(),
    )
    documents.append(doc)


document_store = ElasticsearchDocumentStore(cloud_id="your Elastic Cloud ID",
                                            api_key="your Elastic API key")
document_store.write_documents(documents)

print(document_store.count_documents())
print(document_store.filter_documents()[:6])

Baseline: Pipeline with Retrievers running one after another

from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchBM25Retriever, ElasticsearchEmbeddingRetriever
from haystack import Pipeline
from haystack.components.joiners import DocumentJoiner



embedding_retriever = ElasticsearchEmbeddingRetriever(document_store=document_store)
bm25_retriever = ElasticsearchBM25Retriever(document_store=document_store)
document_joiner = DocumentJoiner(join_mode="merge")

hybrid_retrieval = Pipeline()
hybrid_retrieval.add_component("embedding_retriever", embedding_retriever)
hybrid_retrieval.add_component("bm25_retriever", bm25_retriever)
hybrid_retrieval.add_component("document_joiner", document_joiner)

hybrid_retrieval.connect("bm25_retriever", "document_joiner")
hybrid_retrieval.connect("embedding_retriever", "document_joiner")

query = "Madrid"
query_embedding = [0.1] * 768


start = time.time()
n = 100
for i in range(n):
    result = hybrid_retrieval.run(
        {"bm25_retriever": {"query": query}, "embedding_retriever": {"query_embedding": query_embedding}}
    )
end = time.time()
print(result)
print(f"Time taken for {n} non-concurrent calls: {end - start} seconds")

Optimisation: Pipeline with Retrievers running concurrently

We wrap multiple retrievers into a single component that internally runs them concurrently in different threads. To simplify thread orchestration, we again use the to_thread function from asyncio. The component itself stays synchronous, exposing the usual run method.

ConcurrentRetrievers component

  • This component expects a list of retrievers and names as initialization parameters.

  • The _arun method is an async method, where concurrent execution takes place. It creates a thread for each retriever and waits for all threads to finish. Then the results are collected, aggregated, and returned.

  • The run method is synchronous (as expected by the Haystack pipeline) and calls the _arun method using asyncio.run.

  • Note: since different types of retrievers accept different query parameters in their run method (query, query_embedding, sparse_query_embedding), inspect.signature is used to determine whether a retriever accepts a specific parameter; see the short check after this list.
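
To make the parameter inspection concrete, here is a quick standalone check on the retrievers used in this cookbook (the expected output is noted in the comments):

import inspect

from haystack_integrations.components.retrievers.elasticsearch import ElasticsearchBM25Retriever, ElasticsearchEmbeddingRetriever

bm25_params = inspect.signature(ElasticsearchBM25Retriever.run).parameters
embedding_params = inspect.signature(ElasticsearchEmbeddingRetriever.run).parameters

print("query" in bm25_params)                 # True
print("query_embedding" in bm25_params)       # False
print("query_embedding" in embedding_params)  # True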

from haystack import Pipeline
from haystack.components.joiners import DocumentJoiner
from haystack.dataclasses import SparseEmbedding
from haystack import component, Document
from haystack.core.component import Component
from typing import List, Optional, Dict, Any
import asyncio
import inspect


@component
class ConcurrentRetrievers:
    def __init__(self, retrievers: List[Component], names: Optional[List[str]] = None):
        self.retrievers = retrievers
        if names is None:
            names = [f"retriever_{i}" for i in range(len(retrievers))]
        self.names = names

        output_types = {k: List[Document] for k in names}
        component.set_output_types(self, **output_types)

    async def _arun(self, **kwargs):
        """
        Asynchronous method to run the retrievers concurrently.
        """

        coroutines = []
        for retriever in self.retrievers:
            retriever_params = inspect.signature(retriever.run).parameters
            selected_params = {"top_k": kwargs.get("top_k"), "filters": kwargs.get("filters")}
            # each retriever accepts different parameters (keyword/BM25 retriever, embedding retriever, hybrid retriever)
            for query_param in ["query", "query_embedding", "sparse_query_embedding"]:
                if query_param in retriever_params:
                    selected_params[query_param] = kwargs.get(query_param)
            # the retrievers run in separate threads
            coroutines.append(asyncio.to_thread(retriever.run, **selected_params))

        results = await asyncio.gather(*coroutines)

        organized_results = {}
        for retriever_name, res_ in zip(self.names, results):
            organized_results[retriever_name] = res_["documents"]
        return organized_results

    def run(
        self,
        query: Optional[str] = None,
        query_embedding: Optional[List[float]] = None,
        sparse_query_embedding: Optional[SparseEmbedding] = None,
        filters: Optional[Dict[str, Any]] = None,
        top_k: Optional[int] = None,
    ):
        """
        Synchronous run method that can be integrated into a classic synchronous pipeline.
        """
        results = asyncio.run(
            self._arun(
                query=query,
                query_embedding=query_embedding,
                sparse_query_embedding=sparse_query_embedding,
                filters=filters,
                top_k=top_k,
            )
        )
        return results

# serialization/deserialization methods can be implemented as in the
# ConcurrentGenerators component; a sketch follows below
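
For reference, here is a sketch of what those methods could look like inside the ConcurrentRetrievers class body, mirroring ConcurrentGenerators above (untested here, provided as a starting point; it requires default_to_dict, default_from_dict, and deserialize_type, imported earlier for ConcurrentGenerators):

    def to_dict(self):
        retrievers = [retriever.to_dict() for retriever in self.retrievers]
        return default_to_dict(self, retrievers=retrievers, names=self.names)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ConcurrentRetrievers":
        init_params = data.get("init_parameters", {})
        # deserialize each retriever from its type name, then rebuild the component
        retrievers = []
        for serialized_retriever in init_params["retrievers"]:
            retriever_class = deserialize_type(serialized_retriever["type"])
            retrievers.append(retriever_class.from_dict(serialized_retriever))
        data["init_parameters"]["retrievers"] = retrievers
        return default_from_dict(cls, data)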

Pipeline with ConcurrentRetrievers component

embedding_retriever = ElasticsearchEmbeddingRetriever(document_store=document_store)
bm25_retriever = ElasticsearchBM25Retriever(document_store=document_store)
document_joiner = DocumentJoiner(join_mode="merge")

hybrid_retrieval = Pipeline()
hybrid_retrieval.add_component(
    "concurrent_retrievers",
    ConcurrentRetrievers(
        retrievers=[embedding_retriever, bm25_retriever],
        names=["embedding_retriever", "bm25_retriever"],
    ),
)
hybrid_retrieval.add_component("document_joiner", document_joiner)

hybrid_retrieval.connect("concurrent_retrievers.embedding_retriever", "document_joiner")
hybrid_retrieval.connect("concurrent_retrievers.bm25_retriever", "document_joiner")

query = "Madrid"
query_embedding = [0.1] * 768

start = time.time()
n = 100
for i in range(n):
    result = hybrid_retrieval.run(
        {"concurrent_retrievers": {"query": query, "query_embedding": query_embedding}}
    )

end = time.time()
print(result)
print(f"Time taken for {n} concurrent calls: {end - start} seconds")

Conclusions

  • The proposed approach has proved effective for I/O-bound tasks.
  • A similar approach can be implemented using multiprocessing for CPU-bound tasks, such as local ML inference; a minimal sketch follows this list.
  • When implementing components like these, special attention must be paid to their inputs and outputs to make them truly usable in pipelines.
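
As a hypothetical sketch of the multiprocessing variant mentioned above (worker functions must be picklable and importable, so this is best run as a script rather than in a notebook):

import time
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(n: int) -> int:
    # placeholder for CPU-bound work, such as local model inference
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    start = time.time()
    # each task runs in its own process, sidestepping the GIL
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_heavy, [10_000_000] * 4))
    print(f"4 CPU-bound tasks in {time.time() - start:.1f} seconds")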

Notebook by Stefano Fiorucci