Evaluating RAG Pipelines with EvaluationHarness


In this notebook, you’ll learn how to use the EvaluationHarness from the haystack-experimental repository to assess the performance of Retrieval-Augmented Generation (RAG) pipelines on the SQuAD dataset. Learn more about haystack-experimental in Experimental Package.

The EvaluationHarness acts as an evaluation orchestrator, streamlining the assessment of pipeline performance and making the evaluation process simpler and more efficient.

Setup Development Environment

To start, install haystack-ai, haystack-experimental and other dependencies:

%%bash

pip install -U haystack-ai
pip install -U haystack-experimental==0.1.1
pip install datasets
pip install sentence-transformers

Provide an OpenAI API key to ensure that LLM-based evaluators can query the OpenAI API:

import os
from getpass import getpass

if "OPENAI_API_KEY" not in os.environ:
  os.environ["OPENAI_API_KEY"] = getpass("Enter OpenAI API key:")

# If you're running this notebook on Google Colab, you might need to do the following instead:
#
# from google.colab import userdata
# if "OPENAI_API_KEY" not in os.environ:
#  os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')

Next, add the imports. You’ll need them to create the following:

  • An indexing pipeline that stores documents from our chosen dataset in a document store.
  • A retrieval pipeline that uses a query to retrieve relevant documents from the document store.

import json
from typing import List, Dict
from collections import defaultdict
from pathlib import Path
import random
from datasets import load_dataset, Dataset
from tqdm import tqdm

from haystack import Document, Pipeline
from haystack.components.builders import AnswerBuilder, PromptBuilder
from haystack.components.embedders import (
    SentenceTransformersDocumentEmbedder,
    SentenceTransformersTextEmbedder,
)
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers import (
    InMemoryEmbeddingRetriever,
    InMemoryBM25Retriever,
)
from haystack.components.writers import DocumentWriter

from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy
from haystack_experimental.evaluation.harness.rag import (
    DefaultRAGArchitecture,
    RAGEvaluationHarness,
    RAGEvaluationMetric,
    RAGEvaluationInput,
    RAGEvaluationOverrides,
)

Dataset Preparation

The following steps load the SQuAD dataset, preprocess it for the indexing pipeline, and store the result in a local folder in the current working directory.

# Helper functions to load the SQuAD dataset.
def aggregate_wiki_title(data: Dataset, agg_wiki_title: Dict[str, Dict[str, List[str]]]):
    # Group contexts and question/answer pairs by Wikipedia article title.
    for x in data.iter(batch_size=1):
        if x["context"] not in agg_wiki_title[x["title"][0]]["context"]:
            agg_wiki_title[x["title"][0]]["context"].append(x["context"])
        agg_wiki_title[x["title"][0]]["question_answers"].append(
            {"question": x["question"], "answers": x["answers"]}
        )

def load_transformed_squad():
    with open("transformed_squad/questions.jsonl", "r") as f:
        questions = [json.loads(x) for x in f.readlines()]
    for idx, question in enumerate(questions):
        question["query_id"] = f"query_{idx}"

    def create_document(text: str, name: str):
        return Document(content=text, meta={"name": name})

    # walk through the files in the directory and transform each text file into a Document
    documents = []
    for root, dirs, files in os.walk("transformed_squad/articles/"):
        for article in files:
            with open(f"{root}/{article}", "r") as f:
                raw_texts = f.read().split("\n")
                for text in raw_texts:
                    documents.append(
                        create_document(text, article.replace(".txt", ""))
                    )

    return questions, documents

data_train = load_dataset("squad", split="train")
data_validation = load_dataset("squad", split="validation")
agg_wiki_title = defaultdict(
    lambda: {"context": [], "question_answers": [], "text": ""}
)
aggregate_wiki_title(data_train, agg_wiki_title)
aggregate_wiki_title(data_validation, agg_wiki_title)

# merge each article's contexts into a single text (each stored context is a one-element batch)
for article in tqdm(agg_wiki_title.keys()):
    agg_wiki_title[article]["text"] = "\n".join(
        [x[0] for x in agg_wiki_title[article]["context"]]
    )

# write each article's text to a file
for article in agg_wiki_title.keys():
    out_path = Path("transformed_squad/articles/")
    out_path.mkdir(parents=True, exist_ok=True)
    with open(f"{str(out_path)}/{article}.txt", "w") as f:
        f.write(agg_wiki_title[article]["text"])

# write the question/answer pairs to a JSONL file
questions_dir = Path("transformed_squad/")
questions_dir.mkdir(parents=True, exist_ok=True)
with open(f"{str(questions_dir)}/questions.jsonl", "w") as f:
    for article in agg_wiki_title.keys():
        for entry in agg_wiki_title[article]["question_answers"]:
            f.write(
                json.dumps(
                    {
                        "question": entry["question"][0],
                        "document_name": article,
                        "answers": entry["answers"][0],
                    }
                )
                + "\n"
            )

questions, documents = load_transformed_squad()

After processing the SQuAD dataset, each datapoint in questions will include question, document_name, answers, and query_id fields.

Examples from questions:

[{'question': 'To whom did the Virgin Mary allegedly appear in 1858 in Lourdes France?',
  'document_name': 'University_of_Notre_Dame',
  'answers': {'text': ['Saint Bernadette Soubirous'], 'answer_start': [515]},
  'query_id': 'query_0'},
 {'question': 'What is in front of the Notre Dame Main Building?',
  'document_name': 'University_of_Notre_Dame',
  'answers': {'text': ['a copper statue of Christ'], 'answer_start': [188]},
  'query_id': 'query_1'}
 ...  
]

Example from documents:

Document(id=8c7ed44d52b4bdafc7990688a0ca63c3c8765a87362888349aedf67cb926f648,
  content: 'Hoover began using wiretapping in the 1920s during Prohibition to arrest bootleggers. In the 1927 case Olmstead v. United States, in which a bootlegger was caught through telephone tapping, the United States Supreme Court ruled that FBI wiretaps did not violate the Fourth Amendment as unlawful search and seizure, as long as the FBI did not break into a person's home to complete the tapping. After Prohibition's repeal, Congress passed the Communications Act of 1934, which outlawed non-consensual phone tapping, but allowed bugging. In the 1939 case Nardone v. United States, the court ruled that due to the 1934 law, evidence the FBI obtained by phone tapping was inadmissible in court. After the 1967 case Katz v. United States overturned the 1927 case that had allowed bugging, Congress passed the Omnibus Crime Control Act, allowing public authorities to tap telephones during investigations as long as they obtain a warrant beforehand.',
  meta: {'name': 'Federal_Bureau_of_Investigation'})

Indexing Pipeline

Create a pipeline to ingest your data into the InMemoryDocumentStore. Before indexing the documents, you will create embeddings for them with the sentence-transformers/all-MiniLM-L6-v2 model. For detailed instructions on building pipelines, refer to the Docs: Creating Pipelines.

document_store = InMemoryDocumentStore()

doc_writer = DocumentWriter(
    document_store=document_store, policy=DuplicatePolicy.SKIP
)
doc_embedder = SentenceTransformersDocumentEmbedder(
    model="sentence-transformers/all-MiniLM-L6-v2"
)

indexing_pipe = Pipeline()
indexing_pipe.add_component(instance=doc_embedder, name="doc_embedder")
indexing_pipe.add_component(instance=doc_writer, name="doc_writer")

indexing_pipe.connect("doc_embedder.documents", "doc_writer.documents")

Run the indexing_pipe with a subset of documents to speed up the process. This step takes around 2-3 minutes on CPU.

documents = random.sample(documents, 1000)
indexing_pipe.run({"doc_embedder": {"documents": documents}})
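
To confirm that the documents were written, you can check how many documents the store now holds:

print(document_store.count_documents())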

RAG Pipeline

Create a helper function to build a retrieval-augmented generation pipeline with embedding-based retrieval. This function takes document_store and top_k as parameters.

Learn the details of building a RAG pipeline in 📚 Tutorial: Creating Your First QA Pipeline with Retrieval-Augmentation.

def build_emb_rag_pipeline(document_store: InMemoryDocumentStore, top_k: int = 2) -> Pipeline:
    template = """
        You have to answer the following question based on the given context information only.

        Context:
        {% for document in documents %}
            {{ document.content }}
        {% endfor %}

        Question: {{question}}
        Answer:
        """

    pipeline = Pipeline()
    pipeline.add_component(
        "query_embedder",
        SentenceTransformersTextEmbedder(
            model="sentence-transformers/all-MiniLM-L6-v2",
            progress_bar=False,
        ),
    )
    pipeline.add_component(
        "retriever", InMemoryEmbeddingRetriever(document_store, top_k=top_k)
    )
    pipeline.add_component("prompt_builder", PromptBuilder(template=template))
    pipeline.add_component(
        "generator", OpenAIGenerator(model="gpt-3.5-turbo")
    )
    pipeline.add_component("answer_builder", AnswerBuilder())

    pipeline.connect("query_embedder", "retriever.query_embedding")
    pipeline.connect("retriever", "prompt_builder.documents")
    pipeline.connect("prompt_builder", "generator")
    pipeline.connect("generator.replies", "answer_builder.replies")
    pipeline.connect("generator.meta", "answer_builder.meta")
    pipeline.connect("retriever", "answer_builder.documents")

    return pipeline

Create your RAG pipeline with the document_store you initialized above and a top_k value of 2.

emb_rag_pipeline = build_emb_rag_pipeline(document_store, top_k=2)
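
Optionally, run a quick sanity check before evaluating the pipeline. This minimal sketch uses one of the example questions shown earlier and the component names defined in build_emb_rag_pipeline:

sample_question = "What is in front of the Notre Dame Main Building?"

result = emb_rag_pipeline.run(
    {
        "query_embedder": {"text": sample_question},
        "prompt_builder": {"question": sample_question},
        "answer_builder": {"query": sample_question},
    }
)
print(result["answer_builder"]["answers"][0].data)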

EvaluationHarness

You will evaluate your RAG pipeline using the EvaluationHarness. The EvaluationHarness executes a pipeline with a given set of inputs and evaluates its outputs with an evaluation pipeline using Haystack’s built-in Evaluators. This means you don’t need to create a separate evaluation pipeline.

The RAGEvaluationHarness class, derived from the base EvaluationHarness, simplifies the evaluation process specifically for RAG pipelines. It comes with a predefined set of evaluation metrics, detailed in the RAGEvaluationMetric enum, and basic RAG architecture presets, listed in the DefaultRAGArchitecture enum.
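
Since both are standard Python enums, you can quickly inspect the available presets and metrics by iterating over them:

# Optional: print the available default architectures and evaluation metrics.
for arch in DefaultRAGArchitecture:
    print(arch)
for metric in RAGEvaluationMetric:
    print(metric)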

Now, create a harness to evaluate the embedding-based RAG pipeline built above, using the DefaultRAGArchitecture.GENERATION_WITH_EMBEDDING_RETRIEVAL architecture. You will evaluate the pipeline with the DocumentMAPEvaluator, DocumentRecallEvaluator, and FaithfulnessEvaluator.

emb_eval_harness = RAGEvaluationHarness(emb_rag_pipeline,
                                        rag_components=DefaultRAGArchitecture.GENERATION_WITH_EMBEDDING_RETRIEVAL,
                                        metrics={
                                            RAGEvaluationMetric.DOCUMENT_MAP,
                                            RAGEvaluationMetric.DOCUMENT_RECALL_SINGLE_HIT,
                                            RAGEvaluationMetric.FAITHFULNESS
                                        })

Then, initialize the inputs to the EvaluationHarness. These inputs will be automatically passed to the RAG pipeline and to the evaluation pipeline that the harness uses internally.

input_questions = random.sample(questions, 10)

eval_questions = [q["question"] for q in input_questions]
ground_truth_answers = [q["answers"]["text"][0] for q in input_questions]
ground_truth_documents = [
        [
            doc
            for doc in document_store.storage.values()
            if doc.meta["name"] == q["document_name"]
        ]
        for q in input_questions
    ]

eval_harness_input = RAGEvaluationInput(
    queries=eval_questions,
    ground_truth_answers=ground_truth_answers,
    ground_truth_documents=ground_truth_documents,
    rag_pipeline_inputs={
        "prompt_builder": {"question": eval_questions},
        "answer_builder": {"query": eval_questions},
    },
)

Launch an evaluation run of the harness with the inputs above.

emb_eval_run = emb_eval_harness.run(inputs=eval_harness_input, run_name="emb_eval_run")

Alternative: EvaluationHarness for Keyword-based Retrieval

Instead of embedding-based retrieval, you can perform keyword-based retrieval in your RAG pipeline and evaluate it with EvaluationHarness:

# Helper function to create a keyword-based RAG pipeline.
def build_keyword_rag_pipeline(document_store: InMemoryDocumentStore, top_k: int = 2) -> Pipeline:
    template = """
        You have to answer the following question based on the given context information only.

        Context:
        {% for document in documents %}
            {{ document.content }}
        {% endfor %}

        Question: {{question}}
        Answer:
        """

    pipeline = Pipeline()
    pipeline.add_component(
        "retriever", InMemoryBM25Retriever(document_store, top_k=top_k)
    )
    pipeline.add_component("prompt_builder", PromptBuilder(template=template))
    pipeline.add_component(
        "generator", OpenAIGenerator(model="gpt-3.5-turbo")
    )
    pipeline.add_component("answer_builder", AnswerBuilder())

    pipeline.connect("retriever", "prompt_builder.documents")
    pipeline.connect("prompt_builder", "generator")
    pipeline.connect("generator.replies", "answer_builder.replies")
    pipeline.connect("generator.meta", "answer_builder.meta")
    pipeline.connect("retriever", "answer_builder.documents")

    return pipeline

# Build your new RAG pipeline
keyword_rag_pipeline = build_keyword_rag_pipeline(document_store, top_k=2)

# Create a new `RAGEvaluationHarness` with the new pipeline and `DefaultRAGArchitecture.GENERATION_WITH_KEYWORD_RETRIEVAL` architecture.
keyword_eval_harness = RAGEvaluationHarness(keyword_rag_pipeline,
                                            rag_components=DefaultRAGArchitecture.GENERATION_WITH_KEYWORD_RETRIEVAL,
                                            metrics={
                                                RAGEvaluationMetric.DOCUMENT_MAP,
                                                RAGEvaluationMetric.DOCUMENT_RECALL_SINGLE_HIT,
                                                RAGEvaluationMetric.FAITHFULNESS
                                            })

# Define another set of `RAGEvaluationInput` for the keyword-based pipeline.
keyword_eval_harness_input = RAGEvaluationInput(
    queries=eval_questions,
    ground_truth_answers=ground_truth_answers,
    ground_truth_documents=ground_truth_documents,
    rag_pipeline_inputs={
        "prompt_builder": {"question": eval_questions},
        "answer_builder": {"query": eval_questions},
    },
)

# Run EvaluationHarness with the new set of inputs
keyword_eval_run = keyword_eval_harness.run(inputs=keyword_eval_harness_input, run_name="keyword_eval_run")

Analyzing the Results

Now that the evaluation is complete, you can analyze the results:

print("Evaluation score report:")
emb_eval_run.results.score_report()

You can also display your evaluation results as a pandas dataframe to get a more detailed view:

print("Evaluation score dataframe:")
emb_eval_run.results.to_pandas()
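
Since to_pandas() returns a regular pandas DataFrame, you can post-process it like any other dataframe. For example, as a small sketch (the file name is arbitrary), you could persist the per-query scores for later inspection:

results_df = emb_eval_run.results.to_pandas()
results_df.to_csv("emb_eval_run_results.csv", index=False)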

Evaluating and Comparing Different Pipelines

To evaluate alternative approaches, you can initiate another evaluation run using the same inputs but with different overrides, leveraging RAGEvaluationOverrides.

Now, override the model used by the OpenAIGenerator in the RAG pipeline and execute the same EvaluationHarness instance:

overrides = RAGEvaluationOverrides(rag_pipeline={
    "generator": {"model": "gpt-4-turbo"},
})
emb_eval_run_gpt4 = emb_eval_harness.run(inputs=eval_harness_input, run_name="emb_eval_run_gpt4", overrides=overrides)

Compare the results of the two evaluation runs with comparative_individual_scores_report(). The results for the new run will appear under names prefixed with emb_eval_run_gpt4_.

print("Comparison of the two evaluation runs:")
emb_eval_run.results.comparative_individual_scores_report(emb_eval_run_gpt4.results)

BONUS: EvaluationHarness for Custom RAG Pipelines

In the code above, we’ve primarily focused on using the DefaultRAGArchitecture presets of the RAGEvaluationHarness class. They provide a straightforward way of getting started with the evaluation of simple RAG pipelines that use prototypical components. The RAGEvaluationHarness can also be used to evaluate arbitrarily complex RAG pipelines. This is done by providing the harness with some extra metadata about the pipeline to be evaluated.

To use an arbitrary pipeline with the harness, the latter requires information about the following components (cf. RAGExpectedComponent):

  • Query processor - Component that processes the input query.
    • Expects one input that contains the query string.
  • Document retriever - Component that retrieves documents based on the input query.
    • Expects one output that contains the retrieved documents.
  • Response generator - Component that generates responses based on the query and the retrieved documents.
    • Expects one output that contains the LLM’s response(s).

For each of the above, the user needs to provide the following metadata (cf. RAGExpectedComponentMetadata):

  • The name of the component as seen in the pipeline.
  • A mapping of the component’s expected inputs to their corresponding input names.
  • A mapping of the component’s expected outputs to their corresponding output names.

For example, let’s consider RAGExpectedComponent.QUERY_PROCESSOR: Assume we have a RAG pipeline with an OpenAITextEmbedder component called "txt_embedder". Since the harness is responsible for passing the pipeline’s input (the query) to the OpenAITextEmbedder, it needs to know the name of the component. Furthermore, it also needs to know the name of the OpenAITextEmbedder’s input through which the query should be supplied. The metadata for the above looks like this:

query_processor_metadata = RAGExpectedComponentMetadata(
    name="txt_embedder",
    input_mapping={
        "query": "text"
    }
)

Similarly, for RAGExpectedComponent.DOCUMENT_RETRIEVER: Assume the RAG pipeline has an InMemoryEmbeddingRetriever component named "mem_retriever" that is connected to "txt_embedder":

document_retriever_metadata = RAGExpectedComponentMetadata(
    name="mem_retriever",
    output_mapping={
        "retrieved_documents": "documents"
    }
)

Both "query" and "retrieved_documents" are “meta” identifiers used by the harness to specify expected inputs and outputs. They are specific to each RAGExpectedComponent enum variant and are documented in their docstrings.

# Create a harness to evaluate a custom RAG pipeline.
# Commented out because the `custom_rag_pipeline` is not defined in this notebook.

# custom_eval_harness = RAGEvaluationHarness(
#     rag_pipeline=custom_rag_pipeline,
#     rag_components={
#         RAGExpectedComponent.QUERY_PROCESSOR: RAGExpectedComponentMetadata(
#             "query_embedder", input_mapping={"query": "text"}
#         ),
#         RAGExpectedComponent.DOCUMENT_RETRIEVER: RAGExpectedComponentMetadata(
#             "retriever",
#             output_mapping={"retrieved_documents": "documents"},
#         ),
#         RAGExpectedComponent.RESPONSE_GENERATOR: RAGExpectedComponentMetadata(
#             "generator", output_mapping={"replies": "replies"}
#         ),
#     },
#     metrics={
#         RAGEvaluationMetric.DOCUMENT_MAP,
#         RAGEvaluationMetric.DOCUMENT_RECALL_SINGLE_HIT,
#         RAGEvaluationMetric.FAITHFULNESS
#     })

There is no strict requirement when it comes to which components can act as a query processor, a document retriever or a response generator. For instance, it’s perfectly fine if the query processor and the document retriever are the same component. In fact, this is the case when using a keyword-based retriever, which directly accepts the query (as opposed to having a query embedder in front of it), as the sketch below shows.
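
For illustration, and assuming the component names used in build_keyword_rag_pipeline above, such a pipeline could be described to the harness manually like this. This roughly mirrors what DefaultRAGArchitecture.GENERATION_WITH_KEYWORD_RETRIEVAL sets up for you:

# The BM25 retriever acts as both query processor and document retriever.
keyword_rag_components = {
    RAGExpectedComponent.QUERY_PROCESSOR: RAGExpectedComponentMetadata(
        "retriever", input_mapping={"query": "query"}
    ),
    RAGExpectedComponent.DOCUMENT_RETRIEVER: RAGExpectedComponentMetadata(
        "retriever",
        output_mapping={"retrieved_documents": "documents"},
    ),
    RAGExpectedComponent.RESPONSE_GENERATOR: RAGExpectedComponentMetadata(
        "generator", output_mapping={"replies": "replies"}
    ),
}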