
Pipeline Nodes

Nodes are the core components that process and route incoming text. Some perform steps like preprocessing, retrieving, or summarizing text, while others route queries through different branches of a Pipeline. Nodes are chained together in a Pipeline and function like building blocks that can easily be swapped for one another. A Node takes the output of the previous Node (or Nodes) as input.

Usage

All Nodes are designed to be useable within a Pipeline. When a Node has been added to a Pipeline, calling Pipeline.run() will in turn call each Node's run() method in the predefined sequence. For more information on this, see the Pipelines page.
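
To make the calling sequence concrete, here is a minimal, self-contained sketch that does not use Haystack at all. ToyPipeline, Lowercase, and Tokenize are hypothetical stand-ins invented for illustration; the real Pipeline class also handles graphs, parameters, and debugging, none of which this sketch attempts.

```python
# Hypothetical stand-ins, not Haystack classes: a toy linear pipeline that
# calls each node's run() in the order the nodes were added.


class Lowercase:
    def run(self, text):
        return {"text": text.lower()}, "output_1"


class Tokenize:
    def run(self, text):
        return {"tokens": text.split()}, "output_1"


class ToyPipeline:
    def __init__(self):
        self.nodes = []

    def add_node(self, component, name):
        self.nodes.append((name, component))

    def run(self, **data):
        # Feed each node's output dict to the next node as keyword arguments.
        for _name, component in self.nodes:
            data, _edge = component.run(**data)
        return data


pipe = ToyPipeline()
pipe.add_node(Lowercase(), name="Lowercase")
pipe.add_node(Tokenize(), name="Tokenize")
result = pipe.run(text="Nodes Are Building Blocks")
print(result)
```

Each toy node can also be called on its own, for example Tokenize().run(text="a b"), which mirrors the idea of using a Node outside of a Pipeline.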

You can also call Nodes outside of a Pipeline. See each individual Node's documentation page to learn more about its available methods.

Available Nodes

Node | Classes | Description
FileConverters | PDFToTextConverter, DocxToTextConverter, AzureConverter, ImageToTextConverter, MarkdownConverter | Converts files in various formats into text Documents
Crawler | Crawler | Scrapes websites and returns text
PreProcessor | PreProcessor | Performs cleaning and splitting on Documents
Retriever | BM25Retriever, ElasticsearchRetriever, DensePassageRetriever, TableTextRetriever, EmbeddingRetriever, TfidfRetriever, ElasticsearchFilterOnlyRetriever | Looks into a coupled Document Store and fetches Documents that are relevant to a given Query
Reader | FARMReader, TransformersReader | Finds an answer to a question by selecting a text span in the provided Documents
Generator | RAGenerator, Seq2SeqGenerator | Generates an answer to a question by reading through the provided Documents and composing an answer word by word
Summarizer | TransformersSummarizer | Creates a shorter overview of a given Document
Translator | TransformersTranslator | Translates text from one language into another
Ranker | SentenceTransformersRanker | Reorders a set of Documents based on their relevance to the Query
Query Classifier | TransformersQueryClassifier, SklearnQueryClassifier | Distinguishes between queries that are keywords, questions, or statements and routes them accordingly
Question Generator | QuestionGenerator | Takes a Document as input and generates questions that it believes can be answered by the Document
Document Classifier | TransformersDocumentClassifier | Performs classification on Documents and attaches the result as metadata
Entity Extractor | EntityExtractor | Extracts predefined entities from a piece of text
Route Documents | RouteDocuments | Routes Documents based on their content type or a metadata field
Join Documents | JoinDocuments | Takes Documents from multiple Nodes and joins them to form one list of Documents
Join Answers | JoinAnswers | Takes Answers from two or more Reader or Generator nodes and joins them to produce a single list of Answers
Docs2Answers | Docs2Answers | Converts retrieved Documents into the predicted Answers format

Decision Nodes

You can add decision nodes, after which only one "branch" is executed. This lets you, for example, classify an incoming query and route it to different modules depending on the result. For a ready-made example of a decision node, see the page about the QueryClassifier.


If you'd like to define your own, you'll need to create a class that looks something like this:

from haystack import Pipeline
from haystack.nodes import JoinDocuments
from haystack.nodes.base import BaseComponent

class QueryClassifier(BaseComponent):
    outgoing_edges = 2

    def run(self, query):
        # Queries containing a question mark take the first branch, all others the second
        if "?" in query:
            return {}, "output_1"
        else:
            return {}, "output_2"

# es_retriever, dpr_retriever and reader are assumed to be initialized elsewhere
pipe = Pipeline()
pipe.add_node(component=QueryClassifier(), name="QueryClassifier", inputs=["Query"])
pipe.add_node(component=es_retriever, name="ESRetriever", inputs=["QueryClassifier.output_1"])
pipe.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["QueryClassifier.output_2"])
pipe.add_node(component=JoinDocuments(join_mode="concatenate"), name="JoinResults",
              inputs=["ESRetriever", "DPRRetriever"])
pipe.add_node(component=reader, name="QAReader", inputs=["JoinResults"])
res = pipe.run(query="What did Einstein work on?", params={"ESRetriever": {"top_k": 1}, "DPRRetriever": {"top_k": 3}})
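
To see why only one branch runs, the routing can be pictured as a lookup from the returned edge name to the next step. The following is a minimal sketch in plain Python, not Haystack's dispatch code; decide, branches, and dispatch are hypothetical names, and the two handlers merely stand in for the retrievers.

```python
def decide(query: str):
    # Same rule as the run() method of the decision node above.
    return {}, ("output_1" if "?" in query else "output_2")


# Hypothetical branch handlers standing in for the two retrievers.
branches = {
    "output_1": lambda query: f"ESRetriever handled: {query}",
    "output_2": lambda query: f"DPRRetriever handled: {query}",
}


def dispatch(query: str) -> str:
    _, edge = decide(query)
    # Only the handler attached to the chosen edge is called.
    return branches[edge](query)


print(dispatch("What did Einstein work on?"))
print(dispatch("Einstein relativity"))
```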

Custom Nodes

In Haystack, you can create your own nodes and comfortably integrate them into your system. Here are the steps needed to create a custom node:

  • Create a new class that inherits from BaseComponent
  • Define the number of outgoing_edges as a class attribute
  • Define a run() method that will be executed when your node is called by the Pipeline. The input arguments should consist of all "config" params that you want to allow plus the "data" arguments that you will expect as input from a previous node (e.g. documents, query, file_paths).
  • Set run() to return a tuple, where the first element is an output dict of the data you want to pass to the next node and the second is the name of the outgoing edge (usually "output_1")
  • Optional: Add any custom debug information to output["_debug"]. This will be accessible in the pipeline output when the debug mode is enabled.

Template:

from typing import Optional

from haystack.nodes.base import BaseComponent

class NodeTemplate(BaseComponent):
    # If it's not a decision node, there will be only one outgoing edge
    outgoing_edges = 1

    def run(self, query: str, my_arg: Optional[int] = 10):
        # Insert code here to manipulate the input & produce an output dictionary
        ...
        output = {
            "documents": ...,
            "_debug": {"anything": "you want"}
        }
        return output, "output_1"
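
As a filled-in illustration of the template, the sketch below implements a small filter node. It is an assumption-laden example rather than a Haystack component: MinLengthFilter and min_chars are hypothetical, documents are plain strings instead of Document objects, and a stand-in BaseComponent is defined so the snippet runs without Haystack installed. In a real project you would import BaseComponent from haystack.nodes.base instead.

```python
from typing import List, Optional


class BaseComponent:
    """Stand-in for haystack.nodes.base.BaseComponent so the sketch runs on its own."""

    outgoing_edges = 1


class MinLengthFilter(BaseComponent):
    # Not a decision node, so there is a single outgoing edge.
    outgoing_edges = 1

    def run(self, documents: List[str], min_chars: Optional[int] = 20):
        # Keep only documents with at least min_chars characters.
        kept = [doc for doc in documents if len(doc) >= min_chars]
        output = {
            "documents": kept,
            # Custom debug information, as described in the optional step above.
            "_debug": {"dropped": len(documents) - len(kept)},
        }
        return output, "output_1"


node = MinLengthFilter()
output, edge = node.run(documents=["too short", "long enough to survive the filter"])
print(output["documents"], edge)
```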

Usually, your node will have one outgoing edge and thus one possible output name. However, a node can also have more than one outgoing edge, which is typical of a decision node. A decision node's run() method contains a decision function that determines which path in the graph its input is sent down. Such a function has more than one possible return value, and the outgoing edges are named accordingly: output_1, output_2, and so forth.
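
A decision node with three outgoing edges could look roughly like the sketch below. The routing heuristic (question mark, then word count) and the class name ThreeWayRouter are hypothetical and chosen only for illustration, and BaseComponent is again a local stand-in so the snippet runs without Haystack.

```python
class BaseComponent:
    """Stand-in for haystack.nodes.base.BaseComponent so the sketch runs on its own."""

    outgoing_edges = 1


class ThreeWayRouter(BaseComponent):
    # Three possible paths, so three outgoing edges.
    outgoing_edges = 3

    def run(self, query: str):
        words = query.split()
        if query.strip().endswith("?"):
            edge = "output_1"  # looks like a question
        elif len(words) <= 3:
            edge = "output_2"  # short keyword query
        else:
            edge = "output_3"  # longer statement
        return {"query": query}, edge


router = ThreeWayRouter()
queries = ("Who wrote Faust?", "Goethe Faust", "Tell me about the plot of Faust")
edges = [router.run(query)[1] for query in queries]
print(edges)
```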

Example: Creating a Custom Translation Node

Let's say that we wanted to add a special translation module to our pipeline. Instead of just translating into one predefined language, our node should be able to return a summary in any language we want (i.e., for which we have a trained model). To that end, we define a CustomTranslator class. Since there's no decision function involved, we set outgoing_edges = 1:

from haystack.nodes import TransformersTranslator
from haystack.nodes.base import BaseComponent

class CustomTranslator(BaseComponent):
    outgoing_edges = 1

    def __init__(self):
        super().__init__()
        # Translators are created lazily in run() and cached per target language
        self.translators = {}

Within a pipeline node, the run() function is where all the action happens. Our run function receives a language argument that tells the node which translation model to initialize:

    def run(self, language="fr", **kwargs):
        # Load the English-to-<language> model the first time it is requested
        if language not in self.translators:
            model = f"Helsinki-NLP/opus-mt-en-{language}"
            self.translators[language] = TransformersTranslator(model_name_or_path=model)
        return self.translators[language].run(documents=kwargs["documents"])

We initialize this node directly when adding it to the pipeline. As usual, we specify a name and the inputs for this node:

pipeline.add_node(component=CustomTranslator(), name='CustomTranslator', inputs=['Summarizer'])

We can now call the pipeline with any Helsinki-NLP translation model from HuggingFace with English as a source language. Pipeline arguments are simply propagated through the pipeline. This means that if we want to pass a language value to our custom node, we simply specify it in our call to the pipeline. Let's look at the French summary of a popular wizard sport:

query = "What's the history of Quidditch?"
result = pipeline.run(query=query, params={"retriever": {"top_k": 30}, "ranker": {"top_k": 20}, "language": "fr"})
result['documents'][0].text
>>> "''Quidditch'' a obtenu son nom du marais queerditch, l'emplacement du premier jeu enregistré. le jeu a été basé sur un jeu joué par une sorcière au 11ème siècle. un snitch d'or a été introduit à la suite d'un jeu 1269 joué en kent. on pense qu'une version balai du jeu peut avoir inspiré le mouvement du jeu moderne 'harlem shuffle'"

Now, how about Ukrainian?

result = pipeline.run(query=query, params={"retriever": {"top_k": 30}, "ranker": {"top_k": 30}, "language": "uk"})
result['documents'][0].text
>>> '" Quuiditch " отримала свою назву від дивного болота, місця першої в історії записаної гри. Гру було засновано на грі, яку грала відьма у XI столітті. Золотий стукач було введено у гру 1269 гри в кенті. Вважається, що версія мітла у грі, можливо, надихнула сучасну гру на " заплутування " move " гри'
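
The way the language value reaches the custom node in the calls above can be pictured with a rough mental model: entries keyed by a node name belong to that node, and any other entry is treated as a pipeline-wide value. The sketch below implements only that rule in plain Python. It is not Haystack's resolution logic, params_for_node is a hypothetical helper, and it does not model which arguments a node's run() method actually accepts.

```python
def params_for_node(params: dict, node_name: str, node_names: set) -> dict:
    # Entries keyed by a node name are treated as that node's own parameters;
    # every other entry is treated as a pipeline-wide value.
    resolved = {key: value for key, value in params.items() if key not in node_names}
    resolved.update(params.get(node_name, {}))
    return resolved


# Node names are assumed to match the keys used in the example calls above.
node_names = {"retriever", "ranker", "CustomTranslator"}
params = {"retriever": {"top_k": 30}, "ranker": {"top_k": 20}, "language": "fr"}

translator_params = params_for_node(params, "CustomTranslator", node_names)
retriever_params = params_for_node(params, "retriever", node_names)
print(translator_params, retriever_params)
```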