Haystack docs home page


Flexibility powered by DAGs

To build modern search pipelines, you need two things: powerful building blocks and an easy way to stick them together. The Pipeline class is precisely built for this purpose and enables many search scenarios beyond QA. The core idea is to build a Directed Acyclic Graph (DAG) where each Node is one building block (Reader, Retriever, Generator). Here's a simple example for a standard Open-Domain QA Pipeline:

from haystack import Pipeline
p = Pipeline()
p.add_node(component=retriever, name="ESRetriever1", inputs=["Query"])
p.add_node(component=reader, name="QAReader", inputs=["ESRetriever1"])
res = p.run(query="What did Einstein work on?")

Initialize a Pipeline

To start building your custom pipeline, you’ll need to initialize an object of the base Pipeline class:

from haystack import Pipeline
pipeline = Pipeline()

By default, a new pipeline receives a root node called Query or File depending on whether it's a Query or Indexing Pipeline, as the entry point to the pipeline graph. You need to manually define how the information flows from one node to the next from that point on.

Add Nodes to a Pipeline

Use the add_node() method to add new components to the pipeline graph. You may either initialize the modules before or during the call to add_node(). When you add a node to the pipeline, give it a name and a list of inputs containing one or more items. Note how the default Query node acts as the input node to the first explicitly defined node.

pipeline.add_node(component=retriever, name='Retriever', inputs=['Query'])

Here's an example of a node with several input sources:

pipeline.add_node(component=JoinNode(), name='Joiner',
inputs=['Retriever1', 'Retriever2'])

If the predecessor node has more than one output, you’ll need to specify the output number in the inputs list. For example:

pipeline.add_node(component=Branch1(), name='Branch1',
pipeline.add_node(component=Branch2(), name='Branch2',

Under the hood, the nodes are placed in a queue and executed one by one when the run() method is invoked. The output of the last node in the queue is the output of the entire pipeline.

When you create a custom pipeline, you need to pay extra care that each node’s output is compatible with the input of the successive node in the chain. Otherwise, your system will throw an error at runtime.


Each node in a Pipeline defines the arguments the run() method accepts. The Pipeline class takes care of passing relevant arguments to the node. In addition to mandatory inputs like query, the run() accepts optional node parameters like top_k with the params argument. For instance, params={"top_k": 5} will set the top_k of all nodes as 5. To target params to a specific node, the node name can be explicitly specified as params={"Retriever": {"top_k": 5}}.

res = pipeline.run(
query="What did Einstein work on?",
params={"Retriever": {"top_k": 5}, "Reader": {"top_k": 3}}

Run a Pipeline

The run() function is the single command that triggers the execution of the entire pipeline:

query = "What's the history of Quidditch?"

Every node has its own run() method, and the pipeline run() call invokes each node, one after the other. When you run() a pipeline, all the function arguments are propagated to every node in the graph. To disambiguate, say, the top_k values of retriever and ranker, they have aliases that are automatically recognized by the respective modules. This lets you dynamically modify these parameters in each call to the pipeline:

pipeline.run(query=query, params={"retriever": {"top_k": 28}, "ranker": {"top_k": 9}})

Inspect a Pipeline

Using draw()

The pipeline.draw() method generates a sketch of your pipeline. By looking at a drawing of your pipeline, you may be able to confirm that the graph is indeed structured in the way that you intended. This is especially true for customized graphs that may branch out at some point.


Accessing Pipeline Nodes

If your custom pipeline is not working as intended, try running your nodes in isolation. You may access any pipeline node by using the get_node() method and specifying the component's name:

retriever_node = pipeline.get_node('Retriever')

Returning Debugging Information

Nodes in a Pipeline can provide debug information that gets propagated to the final output of a Pipeline. For instance, a QueryClassifier node can provide the decision it made on a given query. A Retriever can provide the input query and arguments it received as well as the candidate documents it retrieved even if it is not the final node in your pipeline.

In Haystack, we provide the option to see the input and output of the node via a debug argument. If you are creating your own custom node, the node will have this debugging functionality as long as you inherit from BaseComponent

In all of our nodes, you can enable debugging a few different ways. You can set the debugging attribute of a node.

es_retriever = ElasticsearchRetriever(document_store)
es_retriever.debug = True

Alternatively, you can provide it as a parameter when running your pipeline.

result = pipeline.run(
query="Who is the father of Arya Stark?",
"ESRetriever": {
"debug": True
"DPRRetriever": {
"debug": True
"Reader": {
"debug": True

One final option is to provide debug as a global parameter when running your pipeline

result = p_classifier.run(
query="Who is the father of Arya Stark?",
"debug": True

When enabled, the debugging information will be stored in the _debug key of the dictionary returned by the pipeline.

result = pipeline.run(...)

You can expect a top level key that states the name of the node. Under that, you can expect an input and output key.

{'ESRetriever': {'input': {'debug': True,
'query': 'Who is the father of Arya Stark?',
'root_node': 'Query',
'top_k': 1},
'output': {'documents': [<Document: {'content': "\n===In the Riverlands===\nThe Stark army reaches the Twins, a bridge strong", ...}>]

Custom Debugging Functionality

You can also get a node to return custom debugging information. To return debug data from a Node, add a _debug key in the output dict. The value should be a dictionary. For instance,

def run(self, query: str):
if "?" in query:
return {"_debug": "The query contained a question mark", "output_1"
return {"_debug": "The query did not contain a question mark", "output_2"

This _debug gets inserted into the "global" _debug dictionary storing debug data for each Node, under the runtime key. The final output may look like:

"answers": ...,
"_debug": {
"node_a": {
"input": { ... },
"output": { ... },
"runtime": "The query contained a question mark"
"node_b": {
"input": { ... },
"output": { ... },
"runtime": "Some other debug info"

A Node in a Pipeline can access the global _debug from preceding nodes by adding a parameter called _debug in its run() method:

def run(self, query: str, _debug: dict):
debug_info = _debug["PrecedingNodeA"]

Running a Node in Isolation

To learn about how to call a single Node outside of a Pipeline, see the Nodes Usage page.

YAML File Definitions

For your convenience, there is also the option of defining and loading pipelines in YAML files. Having your pipeline available in a YAML is particularly useful when you move between experimentation and production environments. Just export the YAML from your notebook / IDE and import it into your production environment. It also helps with version control of pipelines, allows you to share your pipeline easily with colleagues, and simplifies the configuration of pipeline parameters in production.

For example, you can define and save a simple Retriever Reader pipeline by saving the following to a file:

version: "0.9"
components: # define all the building-blocks for Pipeline
- name: MyReader # custom-name for the component; helpful for visualization & debugging
type: FARMReader # Haystack Class name for the component
no_ans_boost: -10
model_name_or_path: deepset/roberta-base-squad2
- name: MyESRetriever
type: ElasticsearchRetriever
document_store: MyDocumentStore # params can reference other components defined in the YAML
custom_query: null
- name: MyDocumentStore
type: ElasticsearchDocumentStore
index: haystack_test
pipelines: # multiple Pipelines can be defined using the components from above
- name: my_query_pipeline # a simple extractive-qa Pipeline
- name: MyESRetriever
inputs: [Query]
- name: MyReader
inputs: [MyESRetriever]

The components section is used to define the initialization of your Nodes. The pipelines section defines how these Nodes are added into the pipeline. The above example is the equivalent of the following python code.

pipeline = Pipeline()
pipeline.add_node(component=MyRetriever, name="MyESRetriever", inputs=["Query"])
pipeline.add_node(component=MyReader, name='MyReader', inputs=['MyESRetriever'])

To load, simply call:


You can also define indexing pipelines via YAML. Nodes such as the PreProcessor, DocumentClassifier, EntityExtractor, FileTypeClassifier and the Converters are all compatible.

- name: indexing
- inputs:
- File
name: FileTypeClassifier
- inputs:
- FileTypeClassifier
name: PdfConverter
- inputs:
- PdfConverter
name: Preprocessor
- inputs:
- Preprocessor
name: DocumentClassifier
- inputs:
- DocumentClassifier
name: DocumentStore

For another example YAML config, check out this file.

Ready-Made Pipelines

Last but not least, we added some ready-made pipelines that allow you to run standard patterns with very few lines of code. See the ready-made pipelines page and pipelines API documentation to learn more about these.


from haystack.pipelines import DocumentSearchPipeline, ExtractiveQAPipeline
from haystack.nodes import JoinDocuments
from haystack import Pipeline
# Extractive QA
qa_pipe = ExtractiveQAPipeline(reader=reader, retriever=retriever)
res = qa_pipe.run(query="When was Kant born?", params={"retriever": {"top_k": 3}, "reader": {"top_k": 5}})
# Document Search
doc_pipe = DocumentSearchPipeline(retriever=retriever)
res = doc_pipe.run(query="Physics Einstein", params={"retriever": {"top_k": 3}})
# Generative QA
doc_pipe = GenerativeQAPipeline(generator=rag_generator, retriever=retriever)
res = doc_pipe.run(query="Physics Einstein", params={"retriever": {"top_k": 3}})
# FAQ based QA
doc_pipe = FAQPipeline(retriever=retriever)
res = doc_pipe.run(query="How can I change my address?", params={"retriever": {"top_k": 3}})

Example: Multiple retrievers

You can now also use multiple Retrievers and join their results:

from haystack import Pipeline
p = Pipeline()
p.add_node(component=es_retriever, name="ESRetriever", inputs=["Query"])
p.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["Query"])
p.add_node(component=JoinDocuments(join_mode="concatenate"), name="JoinResults", inputs=["ESRetriever", "DPRRetriever"])
p.add_node(component=reader, name="QAReader", inputs=["JoinResults"])
res = p.run(query="What did Einstein work on?", params={"ESRetriever": {"top_k": 1}, "DPRRetriever": {"top_k": 3}})


Example: Creating a Retriever-Ranker-Summarizer Pipeline

In this example, we'll look at how to establish a custom Retriever-Ranker-Summarizer pipeline. It's useful to add a Ranker to a summarization pipeline because the output of the Summarizer depends on the order of the documents that it receives.

from haystack import Pipeline
pipeline = Pipeline()

To create new pipeline nodes, we initialize the modules first. For our use case, we need a retriever, a ranker, and a summarizer. We tell the summarizer to return a single summary per query (instead of one summary for each document), and that its length should be somewhere between ten and 300 words:

from haystack.nodes import ElasticsearchRetriever, SentenceTransformersRanker, TransformersSummarizer
retriever = ElasticsearchRetriever(document_store, top_k=10)
ranker = SentenceTransformersRanker(model_name_or_path="cross-encoder/ms-marco-MiniLM-L-12-v2", top_k=10)
summarizer = TransformersSummarizer(model_name_or_path='t5-large', min_length=10, max_length=300, generate_single_summary=True)

We add the nodes to the pipeline:

pipeline.add_node(component=retriever, name='Retriever', inputs=['Query'])
pipeline.add_node(component=ranker, name='Ranker', inputs=['Retriever'])
pipeline.add_node(component=summarizer, name='Summarizer', inputs=['Ranker'])

Let's now run our custom pipeline on the Harry Potter Wiki dataset. A typical application for this pipeline would be a situation where we want some high-level information about our corpus that is not necessarily contained within one document. We therefore retrieve multiple documents, rank them, and let the summarizer return a single summary of all the texts.

query = "What's the history of Quidditch?"
result = pipeline.run(query=query)

The pipeline returns a dictionary that contains the query, the name of the last node, and a list of documents:

>>> dict_keys(['documents', 'query', 'node_id'])

Since we requested a single summary of all the texts we inputted to the summarizer, the list of documents contains only one item. We access the summary through the text attribute:

>>> "the first record of a primitive form of Quidditch (''Kwidditch'') dates to c. 1050. the first known reference to wizards using broomsticks as a means of conveyance dates to A.D. 963. a variant of the game, Quodpot, was invented in the eighteenth century. in the middle of the 14th century it was made a protected species by the wizards council."

Distributed Pipelines with Ray

Ray (https://ray.io) is a framework for distributed computing.

Ray allows distributing a Pipeline's components across a cluster of machines. The individual components of a Pipeline can be independently scaled. For instance, an extractive QA Pipeline deployment can have three replicas of the Reader and a single replica for the Retriever. It enables efficient resource utilization by horizontally scaling Components.

To set the number of replicas, add replicas in the YAML config for the node in a pipeline:

- name: ray_query_pipeline
type: RayPipeline
- name: ESRetriever
replicas: 2 # number of replicas to create on the Ray cluster
inputs: [ Query ]

A RayPipeline can only be created with a YAML Pipeline config:

from haystack.pipelines import RayPipeline
pipeline = RayPipeline.load_from_yaml(path="my_pipelines.yaml", pipeline_name="my_query_pipeline")
pipeline.run(query="What is the capital of Germany?")

By default, RayPipelines creates an instance of RayServe locally. To connect to an existing Ray instance, set the address parameter when creating the RayPipeline instance.