Scaling NLP indexing pipelines with KEDA and Haystack — Part 2: The Deployment
Enabling retrieval-augmented generative question answering with LLMs
May 1, 2023
In the first part of this article series, we discussed the power of retrieval-augmented generation. We also explored how to create a Python application that converts files into searchable documents with embeddings via Haystack pipelines. However, merely having a Python program that converts files into text snippets and embeddings on a single machine is not enough for a production-ready deployment.
In this part, we will explore how to deploy an indexing consumer to Kubernetes and how to autoscale it using KEDA. This allows us to efficiently add text and embeddings to a vector database that can power a retrieval-augmented LLM search engine.
We will use the architecture described in the first part, which involves queuing files to be indexed on AWS SQS and consuming them in parallel using Haystack pipelines. Let’s start deploying and scaling!
Deployment on Kubernetes and scaling with KEDA
In this section, we will learn how to set up KEDA on Kubernetes and configure autoscaling so that our consumers scale based on the number of files pending in the SQS queue. We will use the following tools:
- k3d for creating a local Kubernetes cluster to deploy our consumers
- KEDA for scaling the consumers after deployment
- LocalStack as a local AWS cloud stack for testing our application
To communicate with our local Kubernetes cluster, we will use kubectl.
⚠️ For certain types of indexing pipelines, GPUs are required. This is particularly true when embeddings are generated on the same machine (rather than through an external inference service) or when a model is used within the pipeline. In such cases, it is necessary to have GPU pods within the Kubernetes cluster to run the model.
Installation and setup
Before we can deploy and scale our application, we need to set up a local Kubernetes cluster and deploy a few supporting services.
Create a local Kubernetes cluster
To begin, create a new Kubernetes cluster named haystack-keda-cluster using k3d:
k3d cluster create haystack-keda-cluster
# check the status via:
kubectl cluster-info
Next, we will create a namespace called indexing that we will use to deploy our services:
kubectl create namespace indexing
Install services — KEDA and LocalStack
To set up LocalStack, add the Helm chart repository and install LocalStack in the indexing namespace:
helm repo add localstack https://localstack.github.io/helm-charts
helm install localstack localstack/localstack --namespace indexing
We will repeat the same steps with KEDA.
helm repo add kedacore https://kedacore.github.io/charts
helm install keda kedacore/keda --namespace indexing
We can validate the setup by running kubectl get pods -n indexing.
Create an SQS queue and an S3 bucket
Our indexing consumers will connect to queues on LocalStack and download files from S3. Therefore, we need to create the necessary resources before deploying our application.
To create a queue and a bucket, we will use the same shell script that we used in our development environment with Docker Compose. To run the script inside the LocalStack container, pipe it into the kubectl exec command:
cat ./scripts/sqs_bucket_bootstrap.sh | kubectl exec -i -n indexing deployment/localstack -- /bin/bash
If we fetch the logs via kubectl logs -f deployment/localstack -c localstack -n indexing, we should see that a queue and a bucket were created:
2023-04-22T15:19:34.166 INFO --- [ asgi_gw_1] localstack.request.aws : AWS sqs.CreateQueue => 200
2023-04-22T15:19:34.533 INFO --- [ asgi_gw_0] localstack.request.aws : AWS s3.CreateBucket => 200
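If you want to automate this check, for example in a CI job, a small helper can scan the LocalStack request log for successful CreateQueue and CreateBucket calls. This is a minimal sketch that assumes the log format shown above; the function name bootstrap_succeeded is hypothetical and not part of the project.

```python
import re

# Matches LocalStack request-log entries such as
# "localstack.request.aws : AWS sqs.CreateQueue => 200"
REQUEST_PATTERN = re.compile(r"AWS (?P<op>[\w.]+) => (?P<status>\d{3})")


def bootstrap_succeeded(log_text: str) -> bool:
    """Return True if the log shows a successful queue AND bucket creation."""
    successful_ops = {
        match.group("op")
        for match in REQUEST_PATTERN.finditer(log_text)
        if match.group("status") == "200"
    }
    # Both operations must appear with HTTP 200
    return {"sqs.CreateQueue", "s3.CreateBucket"} <= successful_ops
```

You could feed it the captured output of kubectl logs deployment/localstack -c localstack -n indexing; matching on the operation names rather than whole lines keeps it independent of timestamps and thread names.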
Deploying the indexing consumer
Now that LocalStack and KEDA are deployed to our Kubernetes cluster, we can deploy our indexing consumer. The consumers run as a Kubernetes Deployment defined in the following manifest:
# deployment-consumer.yaml
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/tree/main/kubernetes>
kind: Deployment
apiVersion: apps/v1
metadata:
  name: indexing-consumer
  labels:
    k8s-app: indexing-consumer
spec:
  # we want to start with 0 replicas and scale up on demand
  replicas: 0
  selector:
    matchLabels:
      k8s-app: indexing-consumer
  template:
    metadata:
      name: indexing-consumer
      labels:
        k8s-app: indexing-consumer
    spec:
      containers:
        - name: indexing-consumer
          command: ["python3", "consumer.py"]
          # public consumer image
          image: arzelaascoli/keda-haystack-consumer:latest
          env:
            # localstack configuration
            - name: AWS_ENDPOINT
              value: http://localstack:4566
            - name: AWS_REGION
              value: eu-central-1
            - name: AWS_ACCESS_KEY_ID
              value: test
            - name: AWS_SECRET_ACCESS_KEY
              value: test
          # Resource estimations
          # TODO: adjust these to our needs and the load we expect
          resources:
            requests:
              memory: 1000Mi
              cpu: 750m
            limits:
              memory: 2500Mi
              cpu: 2000m
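The resource requests in this manifest determine how many consumer pods the scheduler can place on a node, which in turn limits how far scaling out can help. The sketch below does that arithmetic under simplifying assumptions: the helper names parse_quantity and pods_per_node are hypothetical, only common quantity suffixes are handled, and the node figures you pass in should be its allocatable resources (real nodes also reserve capacity for system components and DaemonSets).

```python
from decimal import Decimal

# Common Kubernetes quantity suffixes: binary (Ki, Mi, Gi), decimal (k, M, G)
# and milli (m, e.g. "750m" CPU = 0.75 cores). Exponent notation is not handled.
_SUFFIXES = {
    "Ki": Decimal(2**10), "Mi": Decimal(2**20), "Gi": Decimal(2**30),
    "k": Decimal(10**3), "M": Decimal(10**6), "G": Decimal(10**9),
    "m": Decimal("0.001"),
}


def parse_quantity(value: str) -> Decimal:
    """Convert a quantity such as '750m' or '1000Mi' to a plain number."""
    # Check two-letter suffixes first so "Mi" is not mistaken for "M"
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if value.endswith(suffix):
            return Decimal(value[: -len(suffix)]) * _SUFFIXES[suffix]
    return Decimal(value)


def pods_per_node(cpu_request: str, memory_request: str,
                  node_cpu: str, node_memory: str) -> int:
    """How many pods with these requests fit into a node's allocatable resources."""
    by_cpu = parse_quantity(node_cpu) // parse_quantity(cpu_request)
    by_memory = parse_quantity(node_memory) // parse_quantity(memory_request)
    # The scarcer resource decides
    return int(min(by_cpu, by_memory))
```

With the requests above (750m CPU, 1000Mi memory) and a hypothetical node exposing 4 CPUs and 16Gi of allocatable memory, pods_per_node("750m", "1000Mi", "4", "16Gi") returns 5, so CPU is the limiting resource on such a node.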
We can apply this manifest to the indexing namespace with kubectl:
kubectl apply -f ./kubernetes/deployment-consumer.yaml --namespace indexing
To validate that consumers can successfully start and connect to the queue, we can scale up the replicas to 1, and check the running pods.
# scale deployment
kubectl scale deployment indexing-consumer --namespace=indexing --replicas=1
# get pods
kubectl get pods -n indexing
# check logs
kubectl logs -f deployment/indexing-consumer -c indexing-consumer -n indexing
The consumer will log that there are no files to process:
2023-04-23 15:43:14 [info ] No files to process
2023-04-23 15:43:19 [info ] No files to process
2023-04-23 15:43:24 [info ] No files to process
2023-04-23 15:43:29 [info ] No files to process
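The timestamps suggest the consumer polls the queue roughly every five seconds and simply idles when nothing is pending. A minimal sketch of such a loop follows. It is not the project's consumer.py: receive_batch and process are hypothetical callables standing in for the SQS and Haystack code from Part 1, and max_polls exists only so the loop can be exercised in a test.

```python
import time
from typing import Callable, List, Optional


def poll_queue(
    receive_batch: Callable[[], List[str]],
    process: Callable[[str], None],
    interval_seconds: float = 5.0,
    max_polls: Optional[int] = None,
) -> int:
    """Poll for files, process each one, and sleep whenever the queue is empty.

    Runs forever when max_polls is None; returns the number of processed files.
    """
    processed = 0
    polls = 0
    while max_polls is None or polls < max_polls:
        polls += 1
        batch = receive_batch()
        if not batch:
            print("No files to process")
            time.sleep(interval_seconds)
            continue
        for item in batch:
            process(item)
            processed += 1
    return processed
```

Because an idle consumer keeps polling instead of exiting, something outside the pod has to remove it when the queue is empty. That is the job KEDA takes over in the next step.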
Next, we will set up autoscaling based on the length of the SQS queue, so that consumers scale up when files are queued and scale down to zero when none are pending.
Configure autoscaling based on queue length
After successfully creating all the required services to index files, we can now configure KEDA to check the queue length and scale the indexing consumers accordingly.
To set up a KEDA trigger on SQS, we first need to configure authentication by creating a Kubernetes Secret:
# secrets-localstack.yaml
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/blob/main/kubernetes/keda/secrets-localstack.yaml>
apiVersion: v1
kind: Secret
metadata:
  name: aws-secrets
  namespace: indexing
data:
  AWS_ACCESS_KEY_ID: dGVzdA== # base64 encoded string for "test"
  AWS_SECRET_ACCESS_KEY: dGVzdA== # base64 encoded string for "test"
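Values under a Secret's data field must be base64-encoded. Note that this is an encoding, not encryption. If you replace the LocalStack dummy credentials with your own, a quick way to produce and check the values is Python's standard base64 module; the two helper names here are illustrative only.

```python
import base64


def to_secret_value(plain: str) -> str:
    """Base64-encode a string for the `data` field of a Kubernetes Secret."""
    return base64.b64encode(plain.encode("utf-8")).decode("ascii")


def from_secret_value(encoded: str) -> str:
    """Decode a value read back from a Secret's `data` field."""
    return base64.b64decode(encoded).decode("utf-8")


if __name__ == "__main__":
    print(to_secret_value("test"))  # dGVzdA==
```

Kubernetes also accepts plain-text values under a stringData field, which it encodes for you. Either way, real credentials should not be committed to the repository.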
This secret is then mapped to KEDA via a TriggerAuthentication object, which uses credential-based authentication:
# trigger-authentication.yaml
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/blob/main/kubernetes/keda/trigger-authentication.yaml>
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-aws-credentials
  namespace: indexing
spec:
  secretTargetRef:
    - parameter: awsAccessKeyID # Required.
      name: aws-secrets # Required.
      key: AWS_ACCESS_KEY_ID # Required.
    - parameter: awsSecretAccessKey # Required.
      name: aws-secrets # Required.
      key: AWS_SECRET_ACCESS_KEY # Required.
The KEDA operator is now allowed to access LocalStack's resources, and we can create a ScaledObject with an aws-sqs-queue trigger:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: indexing-consumer-scaled-object
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: indexing-consumer # Mandatory. Must be in the same namespace as the ScaledObject
  pollingInterval: 30
  cooldownPeriod: 300
  idleReplicaCount: 0
  minReplicaCount: 0
  maxReplicaCount: 2
  fallback: # defines a number of replicas to fallback to if a scaler is in an error state.
    failureThreshold: 3
    replicas: 4
  triggers:
    - type: aws-sqs-queue
      authenticationRef:
        name: keda-trigger-auth-aws-credentials
      metadata:
        # URL of the SQS queue on LocalStack that the consumers read from
        queueURL: http://localhost:4566/000000000000/test-queue
        queueLength: "10" # Should roughly equal the number of messages that can be processed in 1 minute
        awsRegion: "eu-central-1"
        awsEndpoint: "http://localstack:4566"
        scaleOnInFlight: "false" # Exclude in-flight messages from the queue length calculation
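To get a feel for what these numbers mean, here is a simplified model of how the queue length becomes a replica count. It assumes queueLength acts as a per-replica target: KEDA exposes the number of pending messages as a metric, the Horizontal Pod Autoscaler divides it by queueLength and rounds up, and the result is clamped to maxReplicaCount. KEDA itself handles scaling between zero and one replica. The function name and signature are hypothetical, and the model deliberately ignores HPA tolerance, stabilization windows, cooldownPeriod and the fallback section.

```python
import math


def desired_replicas(
    visible_messages: int,
    in_flight_messages: int,
    queue_length: int = 10,
    min_replicas: int = 0,
    max_replicas: int = 2,
    scale_on_in_flight: bool = False,
) -> int:
    """Approximate replica count for a queue-length based scaler (simplified)."""
    # With scaleOnInFlight "false", messages already received by a consumer
    # are not counted as pending work
    pending = visible_messages + (in_flight_messages if scale_on_in_flight else 0)
    if pending == 0:
        # No work: KEDA deactivates the target and scales to min (0 here),
        # in reality only after cooldownPeriod has passed
        return min_replicas
    # Each replica is expected to handle `queue_length` messages
    wanted = math.ceil(pending / queue_length)
    return min(max(wanted, min_replicas), max_replicas)
```

With the values above, a single queued file yields one consumer, eleven or more files hit the ceiling of two, and a file that a consumer is currently working on no longer counts. The 300-second cooldownPeriod therefore presumably also serves as a grace period for in-flight files before the deployment is scaled back to zero.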
After applying these three manifests via kubectl apply -f ./kubernetes/keda --namespace indexing, we can forward the LocalStack port so that we can upload files from our machine:
kubectl port-forward deployment/localstack 4566:4566 -n indexing
We can use an upload script to add the file test.txt:
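# upload.py
# link to file: <https://github.com/ArzelaAscoIi/haystack-keda-indexing/blob/main/upload.py>
from pathlib import Path

# AWSService and the SQS_QUEUE, S3_BUCKET and LOCAL_DOWNLOAD_DIR settings
# are defined in the project repository (see the linked file)
aws_service = AWSService(SQS_QUEUE, S3_BUCKET, LOCAL_DOWNLOAD_DIR)
aws_service.upload_file(Path("./data/test.txt"))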
Once the file is uploaded and queued, KEDA scales the deployment from 0 to 1 replica at its next polling interval, and an indexing-consumer pod appears in the pod list:
NAME                                               READY   STATUS
localstack-8fc647d9d-xkrsk                         1/1     Running
keda-operator-metrics-apiserver-7bcfdd7c9b-7pbkp   1/1     Running
keda-operator-6857fbc758-xtc44                     1/1     Running
keda-admission-webhooks-59978445df-q85jr           1/1     Running
indexing-consumer-656d98db6f-psz6q                 0/1     ContainerCreating
After startup, the consumer fetches the queued file, removes it from the queue, and indexes it.
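If you want a script to wait for the consumer instead of watching kubectl by hand, one option is to parse the default kubectl get pods table. The following is a minimal sketch that assumes the column order shown above (NAME, READY, STATUS, ...). The helper name pod_statuses is hypothetical, and for anything beyond a quick check, kubectl get pods -o json is a more robust input.

```python
from typing import Dict


def pod_statuses(kubectl_output: str, prefix: str) -> Dict[str, str]:
    """Map pod names starting with `prefix` to their STATUS column."""
    statuses: Dict[str, str] = {}
    lines = kubectl_output.strip().splitlines()
    for line in lines[1:]:  # skip the NAME READY STATUS header row
        columns = line.split()
        if len(columns) >= 3 and columns[0].startswith(prefix):
            statuses[columns[0]] = columns[2]
    return statuses
```

Called with the output above and the prefix "indexing-consumer", it returns a single entry whose status is ContainerCreating. Once that status changes to Running, the consumer starts working through the queue.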
Conclusion and next steps
This article explains how to create a scalable application to convert text and PDF files into documents containing text and embeddings. KEDA allows for the on-demand scaling of each application. With this simple architecture, we are able to horizontally scale the creation of embeddings.
How was KEDA useful? — KEDA enabled us to scale consumers based on queue length. An alternative solution involves using horizontal pod autoscaling based on CPU usage, which would be triggered once elements are fetched from the queue. However, this approach does not allow for scaling down to zero. Since these tasks require GPUs, one idle machine that is constantly running can be expensive.
How do I deploy this without k3d? — The steps apply to any Kubernetes cluster: skip the k3d cluster creation and point kubectl at your own cluster instead.
What resources do I need? — When running this in production, GPU nodes are necessary for the cluster, which may require additional configuration.
Is there a simple way to deploy multiple pipelines? — In an upcoming article, I will show how to use the Kubernetes Operator Framework (Kopf), written in Python, to create these resources dynamically.