Serializing Pipelines

Save your pipelines into a custom format and explore the serialization options.

Serialization means converting a pipeline to a format that you can save to disk and load later.

Serialization formats

Haystack 2.0 currently supports only the YAML format out of the box. More formats will be rolled out gradually.

Converting a Pipeline to YAML

Use the dumps() method to convert a Pipeline object to YAML:

from haystack import Pipeline

pipe = Pipeline()
print(pipe.dumps())

# Prints:
#
# components: {}
# connections: []
# max_loops_allowed: 100
# metadata: {}

Converting a Pipeline YAML to Python

You can convert a YAML pipeline back into Python. Use the loads() method to convert a valid YAML representation of a pipeline into a corresponding Python object:

from haystack import Pipeline

from haystack.components.preprocessors import DocumentCleaner
from haystack.components.converters import TextFileToDocument

# This is the YAML you want to convert to Python:
pipeline_yaml = """
components:
  cleaner:
    init_parameters:
      remove_empty_lines: true
      remove_extra_whitespaces: true
      remove_regex: null
      remove_repeated_substrings: false
      remove_substrings: null
    type: DocumentCleaner
  converter:
    init_parameters:
      encoding: utf-8
      numeric_row_threshold: 0.4
      progress_bar: true
      remove_numeric_tables: false
      valid_languages: []
    type: TextFileToDocument
connections:
- receiver: cleaner.documents
  sender: converter.documents
max_loops_allowed: 100
metadata: {}
"""

pipe = Pipeline.loads(pipeline_yaml)

Performing Custom Serialization

Haystack pipelines can serialize simple components, including custom ones, out of the box. A component like this one needs no extra serialization code:

from haystack import component

@component
class RepeatWordComponent:
    def __init__(self, times: int):
        self.times = times

    @component.output_types(result=str)
    def run(self, word: str):
        # run() must return a dict keyed by the declared output names
        return {"result": word * self.times}

On the other hand, this code doesn't work if the final format is JSON, as the set type is not JSON-serializable:

from haystack import component

@component
class SetIntersector:
    def __init__(self, intersect_with: set):
        self.intersect_with = intersect_with

    @component.output_types(result=set)
    def run(self, data: set):
        # run() must return a dict keyed by the declared output names
        return {"result": data.intersection(self.intersect_with)}
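
To see the failure concretely, here is a minimal standalone sketch that uses only Python's standard json module, not Haystack itself. The init_parameters dictionary is a hypothetical stand-in for the values a serializer would need to store for SetIntersector:

```python
import json

# Hypothetical stand-in for the init parameters of SetIntersector.
init_parameters = {"intersect_with": {1, 2, 3}}

try:
    json.dumps(init_parameters)
    serializable = True
    error_message = ""
except TypeError as err:
    # The json module refuses to encode Python sets.
    serializable = False
    error_message = str(err)

# Converting the set to a list first makes the same data serializable.
as_json = json.dumps({"intersect_with": sorted(init_parameters["intersect_with"])})
```

The set has no JSON counterpart, so the conversion to a list (and back) has to happen somewhere before the dictionary reaches the final format.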

In such cases, you can provide your own implementations of the to_dict and from_dict methods for the component:

from haystack import component, default_from_dict, default_to_dict

@component
class SetIntersector:
    def __init__(self, intersect_with: set):
        self.intersect_with = intersect_with

    @component.output_types(result=set)
    def run(self, data: set):
        return {"result": data.intersection(self.intersect_with)}

    def to_dict(self):
        # convert the set into a list for the dict representation,
        # so it can be converted to JSON
        return default_to_dict(self, intersect_with=list(self.intersect_with))

    @classmethod
    def from_dict(cls, data):
        # convert the list back into a set before rebuilding the component;
        # default_to_dict stores init arguments under "init_parameters"
        init_params = data["init_parameters"]
        init_params["intersect_with"] = set(init_params["intersect_with"])
        return default_from_dict(cls, data)
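
The round trip can be checked without running a pipeline. The sketch below is a plain-Python stand-in, not Haystack code: the class PlainSetIntersector and its hand-built dictionary layout (a type string plus an init_parameters mapping) are assumptions made for illustration, and the standard json module plays the role of the final format.

```python
import json
from typing import Any, Dict


class PlainSetIntersector:
    """Hypothetical stand-in that mimics the to_dict/from_dict pattern."""

    def __init__(self, intersect_with: set):
        self.intersect_with = intersect_with

    def to_dict(self) -> Dict[str, Any]:
        # Store the set as a sorted list so the result is JSON-friendly
        # and deterministic.
        return {
            "type": "PlainSetIntersector",
            "init_parameters": {"intersect_with": sorted(self.intersect_with)},
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "PlainSetIntersector":
        # Turn the list back into a set before calling the constructor.
        params = dict(data["init_parameters"])
        params["intersect_with"] = set(params["intersect_with"])
        return cls(**params)


original = PlainSetIntersector({3, 1, 2})
text = json.dumps(original.to_dict())  # dictionary -> JSON text
restored = PlainSetIntersector.from_dict(json.loads(text))  # JSON text -> object
```

The key point is symmetry: whatever conversion to_dict applies on the way out, from_dict has to undo on the way in.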

Saving a Pipeline to a Custom Format

Once a pipeline is available in its dictionary format, the last step of serialization is to convert that dictionary into a format you can store or send over the wire. Haystack supports YAML out of the box, but if you need a different format, you can write a custom Marshaller.

A Marshaller is a Python class responsible for converting text to a dictionary and a dictionary to text according to a certain format. Marshallers must respect the Marshaller protocol, providing the methods marshal and unmarshal.

This is the code for a custom TOML marshaller that relies on the rtoml library:

# This code requires a `pip install rtoml`
from typing import Dict, Any, Union
import rtoml

class TomlMarshaller:
    def marshal(self, dict_: Dict[str, Any]) -> str:
        return rtoml.dumps(dict_)

    def unmarshal(self, data_: Union[str, bytes]) -> Dict[str, Any]:
        return dict(rtoml.loads(data_))
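
The same two-method shape works for other formats. As a minimal sketch, here is a JSON marshaller built on Python's standard json module. The class name JsonMarshaller is hypothetical and not part of Haystack, and the sketch assumes the pipeline dictionary contains only JSON-compatible values:

```python
import json
from typing import Any, Dict, Union


class JsonMarshaller:
    def marshal(self, dict_: Dict[str, Any]) -> str:
        # sort_keys keeps the output stable across runs
        return json.dumps(dict_, indent=2, sort_keys=True)

    def unmarshal(self, data_: Union[str, bytes]) -> Dict[str, Any]:
        # json.loads accepts both str and UTF-8 encoded bytes
        return json.loads(data_)


# Round trip on a dictionary shaped like the empty pipeline shown earlier.
marshaller = JsonMarshaller()
pipeline_dict = {
    "components": {},
    "connections": [],
    "max_loops_allowed": 100,
    "metadata": {},
}
text = marshaller.marshal(pipeline_dict)
round_tripped = marshaller.unmarshal(text)
```

Because JSON has no set type, a marshaller like this only works if every component in the pipeline produces JSON-compatible dictionaries, which is where custom to_dict and from_dict methods come in.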

You can then pass a Marshaller instance to the methods dump, dumps, load, and loads:

from haystack import Pipeline
from my_custom_marshallers import TomlMarshaller

pipe = Pipeline()
pipe.dumps(TomlMarshaller())
# returns:
# 'max_loops_allowed = 100\nconnections = []\n\n[metadata]\n\n[components]\n'