Flexibility powered by DAGs
In order to build modern search pipelines, you need two things: powerful building blocks and an easy way to stick them together.
The Pipeline class is built exactly for this purpose and enables many search scenarios beyond QA.
The core idea is that you can build a Directed Acyclic Graph (DAG) where each node is one building block (Reader, Retriever, Generator ...).
Here's a simple example for a standard Open-Domain QA Pipeline:
```python
from haystack import Pipeline

p = Pipeline()
p.add_node(component=retriever, name="ESRetriever1", inputs=["Query"])
p.add_node(component=reader, name="QAReader", inputs=["ESRetriever1"])
res = p.run(query="What did Einstein work on?", top_k_retriever=1)
```
You can draw the DAG to better inspect what you are building:
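A minimal sketch of drawing the pipeline, assuming Haystack's `Pipeline.draw()` method (which requires graphviz/pydot to be installed; the output filename is illustrative):

```python
# render the pipeline graph to a PNG file for inspection
p.draw(path="custom_pipe.png")
```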
Whatever keyword arguments are passed into the Pipeline.run() method will be passed on to each node in the pipeline. For example, in the code snippet below, all nodes will receive top_k_retriever and top_k_reader as arguments, even if they don't use those arguments. It is therefore very important when defining custom nodes that their keyword argument names do not clash with those of the other nodes in your pipeline.
```python
res = pipeline.run(query="What did Einstein work on?", top_k_retriever=1, top_k_reader=5)
```
YAML File Definitions
For your convenience, there is also the option of defining and loading pipelines in YAML files. Having your pipeline available in a YAML is particularly useful when you move between experimentation and production environments. Just export the YAML from your notebook / IDE and import it into your production environment. It also helps with version control of pipelines, allows you to share your pipeline easily with colleagues, and simplifies the configuration of pipeline parameters in production.
For example, you can define and save a simple Retriever-Reader pipeline by saving the following to a file:
```yaml
version: "0.7"

components:    # define all the building-blocks for Pipeline
  - name: MyReader       # custom-name for the component; helpful for visualization & debugging
    type: FARMReader     # Haystack Class name for the component
    params:
      no_ans_boost: -10
      model_name_or_path: deepset/roberta-base-squad2
  - name: MyESRetriever
    type: ElasticsearchRetriever
    params:
      document_store: MyDocumentStore    # params can reference other components defined in the YAML
      custom_query: null
  - name: MyDocumentStore
    type: ElasticsearchDocumentStore
    params:
      index: haystack_test

pipelines:    # multiple Pipelines can be defined using the components from above
  - name: my_query_pipeline    # a simple extractive-qa Pipeline
    nodes:
      - name: MyESRetriever
        inputs: [Query]
      - name: MyReader
        inputs: [MyESRetriever]
```
To load, simply call:
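A minimal sketch of the load call, assuming Haystack's `Pipeline.load_from_yaml()` API and that the YAML above was saved as `sample.yaml` (the filename is illustrative):

```python
from pathlib import Path

from haystack import Pipeline

# load the pipeline named "my_query_pipeline" from the YAML definition above
pipeline = Pipeline.load_from_yaml(Path("sample.yaml"), pipeline_name="my_query_pipeline")
```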
For another example YAML config, check out this file.
You can now also use multiple Retrievers and join their results:
```python
from haystack import Pipeline

p = Pipeline()
p.add_node(component=es_retriever, name="ESRetriever", inputs=["Query"])
p.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["Query"])
p.add_node(component=JoinDocuments(join_mode="concatenate"), name="JoinResults", inputs=["ESRetriever", "DPRRetriever"])
p.add_node(component=reader, name="QAReader", inputs=["JoinResults"])
res = p.run(query="What did Einstein work on?", top_k_retriever=1)
```
You can easily build your own custom nodes. Just respect the following requirements:

- Add a method `run(self, **kwargs)` to your class. `**kwargs` will contain the output from the previous node in your graph.
- Do whatever you want within `run()` (e.g. reformatting the query).
- Return a tuple that contains your output data (for the next node) and the name of the outgoing edge.
- Add a class attribute `outgoing_edges = 1` that defines the number of output options from your node. You only need a higher number here if you have a decision node (see below).
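Putting these requirements together, a minimal sketch of a custom node might look like the following (the `QueryNormalizer` class and its lowercasing behavior are purely illustrative):

```python
class QueryNormalizer:
    """Hypothetical custom node that lowercases the incoming query."""

    outgoing_edges = 1  # a plain (non-decision) node has a single outgoing edge

    def run(self, **kwargs):
        # kwargs contains the output of the previous node, including "query"
        kwargs["query"] = kwargs["query"].lower()
        # return the data for the next node plus the name of the outgoing edge
        return kwargs, "output_1"


node = QueryNormalizer()
output, edge = node.run(query="What did EINSTEIN work on?")
```

In a pipeline, this node would be registered with `add_node` like any built-in component.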
Or you can add decision nodes where only one "branch" is executed afterwards. This allows you, for example, to classify an incoming query and, depending on the result, route it to different modules:
```python
class QueryClassifier:
    outgoing_edges = 2

    def run(self, **kwargs):
        if "?" in kwargs["query"]:
            return (kwargs, "output_1")
        else:
            return (kwargs, "output_2")

pipe = Pipeline()
pipe.add_node(component=QueryClassifier(), name="QueryClassifier", inputs=["Query"])
pipe.add_node(component=es_retriever, name="ESRetriever", inputs=["QueryClassifier.output_1"])
pipe.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["QueryClassifier.output_2"])
pipe.add_node(component=JoinDocuments(join_mode="concatenate"), name="JoinResults", inputs=["ESRetriever", "DPRRetriever"])
pipe.add_node(component=reader, name="QAReader", inputs=["JoinResults"])
res = pipe.run(query="What did Einstein work on?", top_k_retriever=1)
```
There are also nodes in Haystack that are used to evaluate the performance of readers, retrievers, and combined systems. To get hands-on with this kind of node, have a look at the evaluation tutorial.
Default Pipelines (replacing the "Finder")
Last but not least, we added some "Default Pipelines" that allow you to run standard patterns with very few lines of code.
This replaces the `Finder` class, which is now deprecated.
```python
from haystack.pipeline import (
    DocumentSearchPipeline,
    ExtractiveQAPipeline,
    FAQPipeline,
    GenerativeQAPipeline,
)

# Extractive QA
qa_pipe = ExtractiveQAPipeline(reader=reader, retriever=retriever)
res = qa_pipe.run(query="When was Kant born?", top_k_retriever=3, top_k_reader=5)

# Document Search
doc_pipe = DocumentSearchPipeline(retriever=retriever)
res = doc_pipe.run(query="Physics Einstein", top_k_retriever=1)

# Generative QA
doc_pipe = GenerativeQAPipeline(generator=rag_generator, retriever=retriever)
res = doc_pipe.run(query="Physics Einstein", top_k_retriever=1)

# FAQ based QA
doc_pipe = FAQPipeline(retriever=retriever)
res = doc_pipe.run(query="How can I change my address?", top_k_retriever=3)
```
See also the Pipelines API documentation for more details.
We plan many more features around the new pipelines, including parallelized execution, distributed execution, and dry runs - so stay tuned ...