Pipeline Nodes
Nodes are the core components that process and route incoming text. Some perform steps like preprocessing, retrieving, or summarizing text, while others route queries through different branches of a Pipeline. Nodes are chained together using a Pipeline, and they function like building blocks that can be easily switched out for each other. A Node takes the output of the previous Node (or Nodes) as input.
Usage
All Nodes are designed to be useable within a Pipeline.
When a Node has been added to a Pipeline, calling Pipeline.run() will in turn call each Node's run() method in the predefined sequence.
For more information on this, see the Pipelines page.
You can also call Nodes outside of a Pipeline. See each individual Node's documentation page to learn more about its available methods.
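To make the call sequence concrete, here is a minimal sketch that does not use Haystack at all. `ToyPipeline`, `Lowercase`, and `Tokenize` are hypothetical stand-ins written for this illustration; the real Pipeline builds a graph and supports branching, which this linear version leaves out.

```python
from typing import Any, Dict, List, Tuple


class Lowercase:
    """Toy node: lowercases the incoming query."""

    def run(self, query: str) -> Tuple[Dict[str, Any], str]:
        return {"query": query.lower()}, "output_1"


class Tokenize:
    """Toy node: splits the query into whitespace-separated tokens."""

    def run(self, query: str) -> Tuple[Dict[str, Any], str]:
        return {"query": query, "tokens": query.split()}, "output_1"


class ToyPipeline:
    """Hypothetical linear pipeline; not Haystack's implementation."""

    def __init__(self) -> None:
        self.nodes: List[Tuple[str, Any]] = []

    def add_node(self, component: Any, name: str) -> None:
        self.nodes.append((name, component))

    def run(self, query: str) -> Dict[str, Any]:
        data: Dict[str, Any] = {"query": query}
        for _name, component in self.nodes:
            # Each node receives the query produced by the previous node
            output, _edge = component.run(query=data["query"])
            data.update(output)
        return data


pipe = ToyPipeline()
pipe.add_node(Lowercase(), name="Lowercase")
pipe.add_node(Tokenize(), name="Tokenize")
result = pipe.run(query="What Did Einstein Work On?")

# A node can also be called on its own, outside any pipeline
standalone_output, edge = Tokenize().run(query="hello world")
```

Calling `run()` directly on a single node, as in the last line, is also a convenient way to test a node in isolation before wiring it into a pipeline.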
Available Nodes
Node | Classes | Description |
---|---|---|
FileConverters | PDFToTextConverter, DocxToTextConverter, AzureConverter, ImageToTextConverter, MarkdownConverter | Extracts text from files in different formats and converts it into Documents |
Crawler | Crawler | Scrapes websites and returns text |
PreProcessor | PreProcessor | Performs cleaning and splitting on Documents |
Retriever | BM25Retriever, ElasticsearchRetriever, DensePassageRetriever, TableTextRetriever, EmbeddingRetriever, TfidfRetriever, ElasticsearchFilterOnlyRetriever | Looks into a coupled Document Store and fetches Documents that are relevant to a given Query |
Reader | FARMReader, TransformersReader | Finds an answer to a question by selecting a text span in the provided Documents |
Generator | RAGenerator, Seq2SeqGenerator | Generates an answer to a question by reading through the provided documents and composing an answer word-by-word |
Summarizer | TransformersSummarizer | Creates a shorter overview of a given Document |
Translator | TransformersTranslator | Translates text from one language into another |
Ranker | SentenceTransformersRanker | Reorders a set of Documents based on their relevance to the Query |
Query Classifier | TransformersQueryClassifier, SklearnQueryClassifier | Distinguishes between queries that are keywords, questions or statements and routes accordingly |
Question Generator | QuestionGenerator | Takes a Document as input and generates questions which it believes can be answered by the Document |
Document Classifier | TransformersDocumentClassifier | Performs classification on Documents and attaches the result as metadata |
Entity Extractor | EntityExtractor | Extracts predefined entities out of a piece of text |
Route Documents | RouteDocuments | Routes documents based on their content type or a metadata field |
Join Documents | JoinDocuments | Takes Documents from multiple Nodes and joins them to form one list of Documents. |
Join Answers | JoinAnswers | Takes Answers from two or more Reader or Generator nodes and joins them to produce a single list of Answers |
Docs2Answers | Docs2Answers | Converts retrieved Documents into predicted Answers format. |
Decision Nodes
You can add decision nodes where only one "branch" is executed afterwards.
This lets you, for example, classify an incoming query and route it to different modules depending on the result.
To find a ready-made example of a decision node, have a look at the page about the QueryClassifier.
If you'd like to define your own, you'll need to create a class that looks something like this:
from haystack.nodes import JoinDocuments
from haystack.nodes.base import BaseComponent
from haystack.pipelines import Pipeline


class QueryClassifier(BaseComponent):
    outgoing_edges = 2

    def run(self, query):
        # Queries containing a question mark take the first branch, all others the second
        if "?" in query:
            return {}, "output_1"
        else:
            return {}, "output_2"


# es_retriever, dpr_retriever, and reader are assumed to be initialized beforehand
pipe = Pipeline()
pipe.add_node(component=QueryClassifier(), name="QueryClassifier", inputs=["Query"])
pipe.add_node(component=es_retriever, name="ESRetriever", inputs=["QueryClassifier.output_1"])
pipe.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["QueryClassifier.output_2"])
pipe.add_node(component=JoinDocuments(join_mode="concatenate"), name="JoinResults", inputs=["ESRetriever", "DPRRetriever"])
pipe.add_node(component=reader, name="QAReader", inputs=["JoinResults"])
res = pipe.run(query="What did Einstein work on?", params={"ESRetriever": {"top_k": 1}, "DPRRetriever": {"top_k": 3}})
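The "only one branch is executed" behaviour can be imitated in plain Python. The following is a rough sketch, not Haystack code: `ToyQueryClassifier` copies the decision rule from the example, while `run_one_branch` and the two lambda branches are hypothetical stand-ins for the pipeline's routing and for the retrievers attached to each edge.

```python
from typing import Callable, Dict, Tuple


class ToyQueryClassifier:
    """Toy decision node: same rule as the QueryClassifier example, minus the Haystack base class."""

    outgoing_edges = 2

    def run(self, query: str) -> Tuple[dict, str]:
        if "?" in query:
            return {}, "output_1"
        return {}, "output_2"


def run_one_branch(query: str, classifier: ToyQueryClassifier,
                   branches: Dict[str, Callable[[str], str]]) -> str:
    """Execute only the branch attached to the edge the classifier selected."""
    _output, edge = classifier.run(query=query)
    return branches[edge](query)


# Hypothetical stand-ins for the nodes attached to each outgoing edge
branches = {
    "output_1": lambda q: f"ESRetriever handled: {q}",
    "output_2": lambda q: f"DPRRetriever handled: {q}",
}

classifier = ToyQueryClassifier()
with_question_mark = run_one_branch("What did Einstein work on?", classifier, branches)
without_question_mark = run_one_branch("Einstein relativity", classifier, branches)
```

The branch that is not selected is never called, which is the property that distinguishes a decision node from a node that simply fans out to several successors.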
Custom Nodes
In Haystack, you can create your own nodes and comfortably integrate them into your system. Here are the steps needed to create a custom node:
- Create a new class that inherits from BaseComponent.
- Define the number of outgoing_edges as a class attribute.
- Define a run() method that will be executed when your node is called by the Pipeline. The input arguments should consist of all "config" params that you want to allow plus the "data" arguments that you will expect as input from a previous node (e.g. documents, query, file_paths).
- Set run() to return a tuple, where the first element is an output dict of the data you want to pass to the next node and the second is the name of the outgoing edge (usually "output_1").
- Optional: Add any custom debug information to output["_debug"]. This will be accessible in the pipeline output when the debug mode is enabled.
Template:
from typing import Optional

from haystack.nodes.base import BaseComponent


class NodeTemplate(BaseComponent):
    # If it's not a decision node, there will be only one outgoing edge
    outgoing_edges = 1

    def run(self, query: str, my_arg: Optional[int] = 10):
        # Insert code here to manipulate the input & produce an output dictionary
        ...
        output = {
            "documents": ...,
            "_debug": {"anything": "you want"},
        }
        return output, "output_1"
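As a filled-in illustration of the same shape, the sketch below implements a small filtering node in plain Python. `ToyLengthFilter` and its `min_length` parameter are hypothetical and deliberately do not inherit from BaseComponent, so the snippet runs without Haystack installed; in a real pipeline the class would subclass BaseComponent as in the template.

```python
from typing import Any, Dict, List, Tuple


class ToyLengthFilter:
    """Hypothetical node that keeps only documents with at least `min_length` characters."""

    outgoing_edges = 1

    def run(self, documents: List[str], min_length: int = 10) -> Tuple[Dict[str, Any], str]:
        # `documents` is the "data" argument, `min_length` the "config" parameter
        kept = [doc for doc in documents if len(doc) >= min_length]
        output = {
            "documents": kept,
            # Debug details travel alongside the data under the "_debug" key
            "_debug": {"dropped": len(documents) - len(kept)},
        }
        return output, "output_1"


output, edge = ToyLengthFilter().run(
    documents=["short", "a considerably longer document"], min_length=10
)
```

Here `output["documents"]` holds the data for the next node, `output["_debug"]` carries the optional diagnostics, and `edge` names the single outgoing edge.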
Usually, your node will have one outgoing edge and thus one return value.
However, it's also possible to have more than one outgoing edge, typically in a decision node.
A decision node's run() method consists of a decision function that determines which path in the graph its input is sent down.
Such a function has more than one possible return value, and these are named accordingly: output_1, output_2, and so forth.
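The naming scheme can be sketched with a node that has three outgoing edges. Everything here is hypothetical and independent of Haystack: `ToyThreeWayRouter` uses arbitrary rules (a trailing question mark, a three-word threshold) purely for illustration, and `edge_names` is a helper invented for this example to list the names a node may return.

```python
from typing import Dict, List, Tuple


class ToyThreeWayRouter:
    """Hypothetical decision node with three outgoing edges."""

    outgoing_edges = 3

    def run(self, query: str) -> Tuple[Dict[str, str], str]:
        text = query.strip()
        if text.endswith("?"):
            edge = "output_1"  # questions
        elif len(text.split()) <= 3:
            edge = "output_2"  # short keyword queries
        else:
            edge = "output_3"  # longer statements
        return {"query": query}, edge


def edge_names(node) -> List[str]:
    """List the edge names a node may return, derived from outgoing_edges."""
    return [f"output_{i}" for i in range(1, node.outgoing_edges + 1)]


router = ToyThreeWayRouter()
queries = (
    "Who wrote Faust?",
    "goethe faust",
    "Goethe wrote Faust over several decades",
)
chosen = [router.run(query=q)[1] for q in queries]
```

Each returned name must be one of the names implied by `outgoing_edges`, so downstream nodes can refer to a branch as, for example, `ToyThreeWayRouter.output_3` in their inputs.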
Example: Creating a Custom Translation Node
Let's say that we wanted to add a special translation module to our pipeline.
Instead of just translating into one predefined language, our node should be able to return a summary in any language we want (i.e., for which we have a trained model).
To that end, we define a CustomTranslator class. Since there's no decision function involved, we set outgoing_edges = 1:
from haystack.nodes import TransformersTranslator
from haystack.nodes.base import BaseComponent


class CustomTranslator(BaseComponent):
    outgoing_edges = 1

    def __init__(self, language: str = "fr"):
        # store all init params in node's config so that we can easily save/load via YAML
        self.translator = TransformersTranslator(model_name_or_path=f"Helsinki-NLP/opus-mt-en-{language}")
Within a pipeline node, the run() function is where all the action happens. Our run() function receives a language argument that tells the translator which translation model to initialize:
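    def run(self, language="fr", **kwargs):
        # Load the model for the requested target language, then translate the incoming Documents
        self.translator = TransformersTranslator(model_name_or_path=f"Helsinki-NLP/opus-mt-en-{language}")
        return self.translator.run(documents=kwargs["documents"])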
We initialize this node directly when adding it to the pipeline. As usual, we specify a name and the inputs for this node:
pipeline.add_node(component=CustomTranslator(), name='CustomTranslator', inputs=['Summarizer'])
We can now call the pipeline with any Helsinki-NLP translation model from HuggingFace with English as a source language. Pipeline arguments are simply propagated through the pipeline. This means that if we want to pass a language value to our custom node, we simply specify it in our call to the pipeline. Let's look at the French summary of a popular wizard sport:
query = "What's the history of Quidditch?"
result = pipeline.run(query=query, params={"retriever": {"top_k": 30}, "ranker": {"top_k": 20}, "language": "fr"})
result['documents'][0].text
>>> "''Quidditch'' a obtenu son nom du marais queerditch, l'emplacement du premier jeu enregistré. le jeu a été basé sur un jeu joué par une sorcière au 11ème siècle. un snitch d'or a été introduit à la suite d'un jeu 1269 joué en kent. on pense qu'une version balai du jeu peut avoir inspiré le mouvement du jeu moderne 'harlem shuffle'"
Now, how about Ukrainian?
result = pipeline.run(query=query, params={"retriever": {"top_k": 30}, "ranker": {"top_k": 30}, "language": "uk"})
result['documents'][0].text
>>> '" Quuiditch " отримала свою назву від дивного болота, місця першої в історії записаної гри. Гру було засновано на грі, яку грала відьма у XI столітті. Золотий стукач було введено у гру 1269 гри в кенті. Вважається, що версія мітла у грі, можливо, надихнула сучасну гру на " заплутування " move " гри'
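The params dictionary in these calls mixes node-specific entries (such as "ranker") with a top-level "language" value. The sketch below shows one way such a dictionary could be split up per node. It assumes a simplified dispatch rule and is not Haystack's actual code: `select_params`, `ranker_run`, and `translator_run` are hypothetical names introduced for this illustration.

```python
import inspect
from typing import Any, Callable, Dict, Set


def select_params(run_method: Callable[..., Any], node_name: str,
                  node_names: Set[str], params: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: choose the entries of `params` that apply to one node."""
    accepted = set(inspect.signature(run_method).parameters)
    # Top-level keys that are not node names act as global values; they reach
    # a node only if its run() signature accepts them.
    global_params = {
        key: value for key, value in params.items()
        if key not in node_names and key in accepted
    }
    # A key that matches the node's name holds values for that node alone;
    # these take precedence over global values.
    node_params = dict(params.get(node_name, {})) if node_name in node_names else {}
    return {**global_params, **node_params}


def ranker_run(query: str, top_k: int = 10) -> None:
    """Stand-in for a ranker's run() signature."""
    return None


def translator_run(documents: list, language: str = "fr") -> None:
    """Stand-in for the custom translator's run() signature."""
    return None


params = {"retriever": {"top_k": 30}, "ranker": {"top_k": 20}, "language": "uk"}
names = {"retriever", "ranker", "translator"}

ranker_kwargs = select_params(ranker_run, "ranker", names, params)
translator_kwargs = select_params(translator_run, "translator", names, params)
```

Under this rule the ranker sees only its own top_k, while the translator picks up language because its run() signature accepts that argument.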