Nodes

Nodes are the core components that process incoming text. Some perform preprocessing steps like cleaning or splitting of text while others engage transformer models to retrieve documents, summarize passages or generate questions.

Nodes are chained together using a Pipeline. In Haystack, there are many Nodes that you can already start using. For example:

  • PreProcessor
  • FARMReader
  • ESRetriever
  • EvalDocuments
  • TransformersQueryClassifier
  • SentenceTransformersRanker

This is just a small selection of the Nodes available in Haystack. For more information on the individual Nodes, have a look at their respective pages in the Components section on the left.

Usage

All Nodes are designed to be usable within a Pipeline. When a Node has been added to a Pipeline, calling Pipeline.run() calls each Node's run() method in the predefined sequence. For more information on this, see the Pipelines page.
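To make the calling sequence concrete, here is a minimal sketch in plain Python. ToyPipeline, UppercaseNode and ExclaimNode are hypothetical stand-ins written for this illustration; they are not Haystack classes, and the real Pipeline does considerably more (graph handling, parameter dispatch, debugging). The sketch only assumes that each node's run() returns an output dict and an edge name.

```python
from typing import Any, Dict, List, Tuple


class UppercaseNode:
    """Hypothetical node: upper-cases the query."""
    outgoing_edges = 1

    def run(self, query: str) -> Tuple[Dict[str, Any], str]:
        return {"query": query.upper()}, "output_1"


class ExclaimNode:
    """Hypothetical node: appends an exclamation mark to the query."""
    outgoing_edges = 1

    def run(self, query: str) -> Tuple[Dict[str, Any], str]:
        return {"query": query + "!"}, "output_1"


class ToyPipeline:
    """Simplified stand-in for a linear pipeline; not Haystack's implementation."""

    def __init__(self) -> None:
        self.nodes: List[Tuple[str, Any]] = []

    def add_node(self, component: Any, name: str) -> None:
        self.nodes.append((name, component))

    def run(self, **data: Any) -> Dict[str, Any]:
        # Call each node's run() in the order the nodes were added and
        # merge each node's output dict into the data handed to the next node.
        for _name, node in self.nodes:
            output, _edge = node.run(**data)
            data.update(output)
        return data


pipe = ToyPipeline()
pipe.add_node(UppercaseNode(), name="Upper")
pipe.add_node(ExclaimNode(), name="Exclaim")
result = pipe.run(query="hello")
print(result)  # {'query': 'HELLO!'}
```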

You can also call the Nodes outside of a Pipeline. What happens during an individual run depends entirely on the given Node's definition. For example, the Retriever's run() method calls run_query(), which in turn calls retrieve() and a few other methods.

retriever_node.run_query(query=query)
retriever_node.retrieve(query=query)

If you want to find out which class methods are called by a component's run() function, we recommend that you take a look at the definitions (e.g. the Retriever source code).

Custom Nodes

In Haystack, you can create your own nodes and comfortably integrate them into your system. Here are the steps needed to create a custom node:

  • Create a new class that inherits from BaseComponent
  • Define the number of outgoing_edges as a class attribute
  • Pass all init params to self.set_config() to enable saving/loading via YAML
  • Define a run() method that will be executed when your node is called by the Pipeline. The input arguments should consist of all "config" params that you want to allow plus the "data" arguments that you will expect as input from a previous node (e.g. documents, query, file_paths).
  • Set run() to return a tuple, where the first element is an output dict of the data you want to pass to the next node and the second is the name of the outgoing edge (usually "output_1")
  • Optional: Add any custom debug information to output["_debug"]. This will be accessible in the pipeline output when the debug mode is enabled.

Template:

from typing import Optional

from haystack.nodes.base import BaseComponent


class NodeTemplate(BaseComponent):
    # If it's not a decision node, there will be only one outgoing edge
    outgoing_edges = 1

    def __init__(self, some_param):
        # store all init params in node's config so that we can easily save/load via YAML
        self.set_config(some_param=some_param)

    def run(self, query: str, my_arg: Optional[int] = 10):
        # Insert code here to manipulate the input & produce an output dictionary
        ...
        output = {
            "documents": ...,
            "_debug": {"anything": "you want"},
        }
        return output, "output_1"
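The reason for handing init params to set_config() can be sketched without Haystack. In the example below, ConfigurableNode, its config attribute and load_node() are hypothetical names invented for this illustration, and a plain dict stands in for the YAML file. It assumes only the general idea: if a node records its class name and init params, it can be rebuilt from that record later.

```python
from typing import Any, Dict


class ConfigurableNode:
    """Hypothetical stand-in for a component that records its init params."""

    def __init__(self, some_param: int = 3) -> None:
        self.some_param = some_param
        self.set_config(some_param=some_param)

    def set_config(self, **kwargs: Any) -> None:
        # Record the class name and init params so the node can be rebuilt later.
        self.config: Dict[str, Any] = {
            "type": type(self).__name__,
            "params": dict(kwargs),
        }


def load_node(config: Dict[str, Any], registry: Dict[str, type]) -> Any:
    """Rebuild a node from its saved config (a dict stands in for YAML here)."""
    node_class = registry[config["type"]]
    return node_class(**config["params"])


original = ConfigurableNode(some_param=7)
restored = load_node(original.config, {"ConfigurableNode": ConfigurableNode})
print(original.config)
print(restored.some_param)
```

An init param that is not passed to set_config() would be missing from the saved record, so the restored node would fall back to its default value.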

Usually, your node will have one outgoing edge, so run() always returns the same edge name. However, it's also possible to have more than one outgoing edge, typically in a decision node. A decision node's run() method contains a decision function that determines which path in the graph its input is sent down. Such a function has more than one possible return value, and the outgoing edges are named accordingly: output_1, output_2, and so forth.

Decision Nodes

You can add decision nodes where only one "branch" is executed afterwards. This allows you, for example, to classify an incoming query and route it to different modules depending on the result. To find a ready-made example of a decision node, have a look at the page about the QueryClassifier.

If you'd like to define your own, you'll need to create a class that looks something like this:

from haystack.nodes import JoinDocuments
from haystack.nodes.base import BaseComponent
from haystack.pipelines import Pipeline


class QueryClassifier(BaseComponent):
    outgoing_edges = 2

    def run(self, query):
        # route questions to output_1 and all other queries to output_2
        if "?" in query:
            return {}, "output_1"
        else:
            return {}, "output_2"


# es_retriever, dpr_retriever and reader are assumed to be initialized already
pipe = Pipeline()
pipe.add_node(component=QueryClassifier(), name="QueryClassifier", inputs=["Query"])
pipe.add_node(component=es_retriever, name="ESRetriever", inputs=["QueryClassifier.output_1"])
pipe.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["QueryClassifier.output_2"])
pipe.add_node(component=JoinDocuments(join_mode="concatenate"), name="JoinResults",
              inputs=["ESRetriever", "DPRRetriever"])
pipe.add_node(component=reader, name="QAReader", inputs=["JoinResults"])
res = pipe.run(query="What did Einstein work on?", params={"ESRetriever": {"top_k": 1}, "DPRRetriever": {"top_k": 3}})
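The routing behaviour can also be tried without any retrievers or a document store. The sketch below reuses the same decision rule in a plain Python class. ToyQueryClassifier, route() and the two lambda branches are hypothetical stand-ins for illustration only; they merely report which branch would have run and do not reflect how Haystack resolves edges internally.

```python
from typing import Any, Callable, Dict, Tuple


class ToyQueryClassifier:
    """Same decision rule as the QueryClassifier, without the Haystack base class."""
    outgoing_edges = 2

    def run(self, query: str) -> Tuple[Dict[str, Any], str]:
        if "?" in query:
            return {}, "output_1"
        return {}, "output_2"


def route(query: str, branches: Dict[str, Callable[[str], str]]) -> str:
    """Hypothetical helper: run only the branch attached to the returned edge."""
    _output, edge = ToyQueryClassifier().run(query=query)
    return branches[edge](query)


# Stand-ins for the two retrievers; they only report which branch ran.
branches = {
    "output_1": lambda q: "ESRetriever handled: " + q,
    "output_2": lambda q: "DPRRetriever handled: " + q,
}

question_result = route("What did Einstein work on?", branches)
keyword_result = route("Einstein work", branches)
print(question_result)  # ESRetriever handled: What did Einstein work on?
print(keyword_result)   # DPRRetriever handled: Einstein work
```

Only the branch whose edge name was returned is executed, which is why the decision node must declare as many outgoing_edges as it has possible return values.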

Evaluation Nodes

In Haystack, there are Nodes, such as EvalDocuments, that evaluate the performance of readers, retrievers and combined systems.

To get hands on with this kind of node, have a look at the evaluation tutorial.

Example: Creating a Custom Translation Node

Let's say that we wanted to add a special translation module to our pipeline. Instead of just translating into one predefined language, our node should be able to return a summary in any language we want (i.e., for which we have a trained model). To that end, we define a CustomTranslator class. Since there's no decision function involved, we set outgoing_edges = 1:

from haystack.nodes import TransformersTranslator
from haystack.nodes.base import BaseComponent


class CustomTranslator(BaseComponent):
    outgoing_edges = 1

    def __init__(self):
        # this node has no init params, so there is nothing to store for saving/loading via YAML
        self.set_config()

Within a pipeline node, the run() function is where all the action happens. Our run() function receives the documents from the previous node and a language argument that tells the translator which translation model to initialize:

    def run(self, documents, language="fr"):
        # load the English-to-<language> model requested for this call
        translator = TransformersTranslator(model_name_or_path=f"Helsinki-NLP/opus-mt-en-{language}")
        return translator.run(documents=documents)

We initialize this node directly when adding it to the pipeline. As usual, we specify a name and the inputs for this node:

pipeline.add_node(component=CustomTranslator(), name='CustomTranslator', inputs=['Summarizer'])

We can now call the pipeline with any Helsinki-NLP translation model from HuggingFace with English as a source language. Pipeline arguments are simply propagated through the pipeline. This means that if we want to pass a language value to our custom node, we simply specify it in our call to the pipeline. Let's look at the French summary of a popular wizard sport:

query = "What's the history of Quidditch?"
result = pipeline.run(query=query, params={"retriever": {"top_k": 30}, "ranker": {"top_k": 20}, "language": "fr"})
result['documents'][0].text
>>> "''Quidditch'' a obtenu son nom du marais queerditch, l'emplacement du premier jeu enregistré. le jeu a été basé sur un jeu joué par une sorcière au 11ème siècle. un snitch d'or a été introduit à la suite d'un jeu 1269 joué en kent. on pense qu'une version balai du jeu peut avoir inspiré le mouvement du jeu moderne 'harlem shuffle'"

Now, how about Ukrainian?

result = pipeline.run(query=query, params={"retriever": {"top_k": 30}, "ranker": {"top_k": 30}, "language": "uk"})
result['documents'][0].text
>>> '" Quuiditch " отримала свою назву від дивного болота, місця першої в історії записаної гри. Гру було засновано на грі, яку грала відьма у XI столітті. Золотий стукач було введено у гру 1269 гри в кенті. Вважається, що версія мітла у грі, можливо, надихнула сучасну гру на " заплутування " move " гри'
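How a pipeline-level argument such as language can reach only the node that asks for it is sketched below. This is an assumption about the general mechanism, not Haystack's code: the hypothetical dispatch() helper inspects each node's run() signature and passes on just the values that signature accepts. ToySummarizer and ToyTranslator are stand-ins that do no real summarizing or translating.

```python
import inspect
from typing import Any, Dict, List, Tuple


class ToySummarizer:
    """Stand-in node whose run() does not accept a language argument."""

    def run(self, documents: List[str]) -> Tuple[Dict[str, Any], str]:
        return {"documents": documents}, "output_1"


class ToyTranslator:
    """Stand-in node that tags each document with the requested language."""

    def run(self, documents: List[str], language: str = "fr") -> Tuple[Dict[str, Any], str]:
        tagged = [f"[{language}] {doc}" for doc in documents]
        return {"documents": tagged}, "output_1"


def dispatch(node: Any, data: Dict[str, Any], params: Dict[str, Any]) -> Tuple[Dict[str, Any], str]:
    """Pass on only those values that the node's run() signature accepts."""
    accepted = inspect.signature(node.run).parameters
    candidates = {**data, **params}
    run_inputs = {key: value for key, value in candidates.items() if key in accepted}
    return node.run(**run_inputs)


data: Dict[str, Any] = {"documents": ["Quidditch history"]}
params = {"language": "uk"}
for node in (ToySummarizer(), ToyTranslator()):
    output, _edge = dispatch(node, data, params)
    data.update(output)
print(data["documents"])  # ['[uk] Quidditch history']
```

The summarizer stand-in never sees language, while the translator stand-in picks it up, which mirrors how the French and Ukrainian calls differ only in the params passed to the pipeline.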