DocumentationAPI ReferenceπŸ““ TutorialsπŸ§‘β€πŸ³ Cookbook🀝 IntegrationsπŸ’œ Discord

Multiplexer

Use this component to facilitate connections in loops and other complex pipelines by letting many components send a value to the same input or distributing a single value to many components.

NameMultiplexer
Folder Path/others/
Most common Position in a PipelineFlexible: for example, at the start of the Pipeline, at the start of loops, and so on.
Mandatory Input variablesDefined at initialization
Output variablesThe same as the input

Overview

Multiplexer is a component that accepts any number of input connections and distributes the first value that it receives to all the components that are connected to its output. Multiplexer’s main role is to facilitate the connection of other components together in non-trivial Pipelines.

All the expected input connections of a Multiplexer must be of the same type that needs to be defined at initialization time. The output is of the same type as the input. Note that when used in isolation, due to its variadic nature, Multiplexer always expects a list to wrap the input value. As an example:

from haystack.components.others import Multiplexer

# an example where input and output are strings
mp = Multiplexer(str)
mp.run(value=["hello"])
>>> {"value" : "hello"}

# an example where input and output are integers
mp = Multiplexer(int)
mp.run(value=[3])
>>> {"value": 3}

This component won't handle several inputs at the same time: it always only expects one at any given time. If more than one input value is received when run is invoked, the component will raise an error:

from haystack.components.others import Multiplexer

mp = Multiplexer(int)
mp.run(value=[3, 4, 5])
>>> ValueError: Multiplexer expects only one input, but 3 were received.

mp = Multiplexer(Optional[int])
mp.run(value=[None, 4])
>>> ValueError: Multiplexer expects only one input, but 2 were received.

πŸ“˜

Multiplexer behaves differently than other Haystack components and will try to run as soon as any value is received.

Usage

Multiplexer is very flexible and covers several use cases, which we will explore further.

Distribute One Value to Many Components

As the Pipeline grows, it may happen that different components need the same value from the input, for example, query. This means that the pipeline.run() statement needs to distribute this value to many components, making the data parameter or the run() method very verbose.

For example, here is a hybrid retrieval generative Pipeline:

pipe = Pipeline()
pipe.add_component(instance=OpenAITextEmbedder(), name="query_embedder")
pipe.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
pipe.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
pipe.add_component(instance=DocumentJoiner(sort_by_score=False), name="doc_joiner")
pipe.add_component(instance=TransformersSimilarityRanker(model="intfloat/simlm-msmarco-reranker", top_k=10), name="ranker")
pipe.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
pipe.add_component(instance=OpenAIGenerator(), name="llm")
pipe.add_component(instance=AnswerBuilder(), name="answer_builder")
pipe.connect("query_embedder", "embedding_retriever.query_embedding")
pipe.connect("embedding_retriever", "doc_joiner.documents")
pipe.connect("bm25_retriever", "doc_joiner.documents")
pipe.connect("doc_joiner", "ranker.documents")
pipe.connect("ranker", "prompt_builder.documents")
pipe.connect("prompt_builder", "llm")
pipe.connect("llm.replies", "answer_builder.replies")
pipe.connect("llm.meta", "answer_builder.meta")
pipe.connect("doc_joiner", "answer_builder.documents")
Click to see the initial Pipeline graph A visual representation of a Pipeline where one value is distributed to many components.

This Pipeline includes:

  • A BM25 Retriever,
  • An Embedder,
  • A Ranker,
  • A Prompt Builder,
  • An Answer Builder.

All of these need the query in order to operate. This means that the pipeline.run() statement will be long and repetitive:

question = "Where does Mark live?"
result = pipe.run(
    {
        "query_embedder": {"text": question},
        "bm25_retriever": {"query": question},
        "ranker": {"query": question},
        "prompt_builder": {"question": question},
        "answer_builder": {"query": question},
    }
)

In such case, a Multiplexer can be added at the start of the Pipeline to simplify the pipe.run() call significantly. We can put a Multiplexer at the top and connect all components that need the query value to it. The Pipeline code will get a bit longer:

pipe = Pipeline()

# Add a Multiplexer to the pipeline
pipe.add_component(instance=Multiplexer(str), name="multiplexer")

pipe.add_component(instance=OpenAITextEmbedder(), name="query_embedder")
pipe.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
pipe.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
pipe.add_component(instance=DocumentJoiner(sort_by_score=False), name="doc_joiner")
pipe.add_component(instance=TransformersSimilarityRanker(model="intfloat/simlm-msmarco-reranker", top_k=10), name="ranker")
pipe.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
pipe.add_component(instance=OpenAIGenerator(), name="llm")
pipe.add_component(instance=AnswerBuilder(), name="answer_builder")

# Connect the Multiplexer to all the components that need the query
pipe.connect("multiplexer.value", "query_embedder.text")
pipe.connect("multiplexer.value", "bm25_retriever.query")
pipe.connect("multiplexer.value", "ranker.query")
pipe.connect("multiplexer.value", "prompt_builder.question")
pipe.connect("multiplexer.value", "answer_builder.query")

pipe.connect("query_embedder", "embedding_retriever.query_embedding")
pipe.connect("embedding_retriever", "doc_joiner.documents")
pipe.connect("bm25_retriever", "doc_joiner.documents")
pipe.connect("doc_joiner", "ranker.documents")
pipe.connect("ranker", "prompt_builder.documents")
pipe.connect("prompt_builder", "llm")
pipe.connect("llm.replies", "answer_builder.replies")
pipe.connect("llm.meta", "answer_builder.meta")
pipe.connect("doc_joiner", "answer_builder.documents")
Click to see a Pipeline graph with the Multiplexer A visual representation of a Pipeline where a value is given to Multiplexer component, which then distributes it to several other components.

However, the user only needs to provide the query parameter once to the Multiplexer, which will then take care of passing it along to everywhere that needs it, according to the Pipeline connections.

result = pipe.run({"multiplexer": {"value": "Where does Mark live?"}})

In this case, it is not important for the Multiplexer to accept variadic input. Instead, you can think of it as a redistributor of input values.

If you have multiple values to redistribute in this manner, you can use multiple Multiplexers.

Enabling loops

Multiplexer can be used to transform a regular input connection into a variadic one, which is often necessary to close a loop.

For instance, when building a Pipeline with an error correction loop between a Generator and a custom validation component, the multiplexer enables the validation component to ask the LLM to correct its own mistakes if the Generator provides an incorrect answer.

Click to see an example graph A visual graph of a complex looped Pipeline.

Writing code for this Pipeline is not obvious: the LLM might be getting its prompt either from a PromptBuilder or the validation component. However, the Generator has a single prompt input, which makes it impossible to connect both components.

pipe.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component("llm", OpenAIGenerator())
pipe.add_component("unwrapper", PromptBuilder("{% for reply in replies %}{{ reply }} {% endfor %}"))
pipe.add_component("checker", HallucinationChecker())
pipe.add_component("correction_prompt_builder", PromptBuilder(template=correction_template))

pipe.connect("retriever", "prompt_builder")
pipe.connect("prompt_builder", "llm")
pipe.connect("llm.replies", "unwrapper.replies")
pipe.connect("unwrapper.prompt", "checker.statement")
pipe.connect("retriever", "checker.documents")
pipe.connect("checker.hallucination", "correction_prompt_builder.hallucination")
pipe.connect("checker.contraddicting_documents", "correction_prompt_builder.documents")

# This connection will fail!
pipe.connect("correction_prompt_builder", "llm")  

# >> PipelineConnectError: Cannot connect 'correction_prompt_builder.prompt' 
#    with 'llm.prompt': llm.prompt is already connected to ['prompt_builder'].

Here is where the variadic input of the Multiplexer comes to the rescue. By placing a Multiplexer on the prompt input, it's now possible to connect both the PromptBuilder and the validation node, and the Multiplexer will forward the value to the Generator in the way the Generator expects.

pipe = Pipeline(max_loops_allowed=5)

pipe.add_component("retriever", InMemoryBM25Retriever(document_store=document_store, top_k=3))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component("llm", OpenAIGenerator())
pipe.add_component("unwrapper", PromptBuilder("{% for reply in replies %}{{ reply }} {% endfor %}"))
pipe.add_component("checker", HallucinationChecker())
pipe.add_component("correction_prompt_builder", PromptBuilder(template=correction_template))
pipe.add_component("multiplexer", Multiplexer(str))

pipe.connect("retriever", "prompt_builder")
pipe.connect("prompt_builder", "multiplexer")
pipe.connect("multiplexer", "llm")
pipe.connect("llm.replies", "unwrapper.replies")
pipe.connect("unwrapper.prompt", "checker.statement")
pipe.connect("retriever", "checker.documents")
pipe.connect("checker.hallucination", "correction_prompt_builder.hallucination")
pipe.connect("checker.contraddicting_documents", "correction_prompt_builder.documents")
pipe.connect("correction_prompt_builder", "multiplexer")
Click to see the resulting Pipeline graph A visual graph of a looped Pipeline where Multiplexer acts as a forwarder.

Common Pitfall

In any of these Pipelines, it is impossible for Multiplexer ever to receive more than one value at a time. However, if your Pipeline gets more complex, you have to make sure this assumption is correct, as Multiplexer will throw an exception if it receives multiple values at the same time.

For example, a pipeline like this will fail:

pipeline = Pipeline()

pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=InMemoryDocumentStore()))
pipeline.add_component("prompt_builder_a", PromptBuilder("Docs A: {{ docs }}"))
pipeline.add_component("prompt_builder_b", PromptBuilder("Docs B: {{ docs }}"))
pipeline.add_component("multiplexer", Multiplexer(str))

pipeline.connect("retriever", "prompt_builder_a")
pipeline.connect("retriever", "prompt_builder_b")
pipeline.connect("prompt_builder_a", "multiplexer")
pipeline.connect("prompt_builder_b", "multiplexer")

results = pipeline.run({
    "prompt_builder_a": {"question": "a?"},
    "prompt_builder_b": {"question": "b?"},
})

# >> ValueError: Multiplexer expects only one input, but 2 were received.
Click to see a graph of this Pipeline A visual graph of a Pipeline where Multiplexer receives multiple values and therefore will fail.

Related Links

See the parameters details in our API reference: