Introduction
Managing vector databases efficiently is crucial for applications that rely on similarity search, AI-powered recommendations, and large-scale data retrieval. As data sources grow, keeping vector databases updated with fresh embeddings becomes a challenge. Manually updating embeddings for new documents is inefficient and error-prone, making automation essential.
In this guide, you will explore how to build a fully automated pipeline for processing and updating a vector database using AWS Lambda and CircleCI. The solution involves extracting text from PDFs, generating embeddings with OpenAI, and storing them in Zilliz Cloud, a managed vector database. You will also set up AWS infrastructure (S3, ECR, and Lambda) and implement a CI/CD pipeline with CircleCI to automate deployment and updates.
What You Will Learn:
How to manage vector databases and automate embeddings creation
Building an AWS Lambda function to process and update embeddings
Using Docker to containerize the AWS Lambda function for efficient execution
Setting up CircleCI to automate testing and deployment
Implementing best practices for AWS IAM roles and security
By the end of this tutorial, you will have a fully automated workflow to process and update vector embeddings seamlessly.
This tutorial assumes some familiarity with Python, AWS, and Docker. You can check out the complete source code on GitHub, but this guide will walk you through the process step by step.
Prerequisites
Before you begin, ensure that you have the following requirements in place:
AWS Account: Sign up for an AWS account if you do not already have one. You will use AWS Lambda and Elastic Container Registry (ECR) for deployment.
AWS CLI Installed and Configured: Install the AWS Command Line Interface (CLI) and configure it with your AWS credentials. You can follow the AWS CLI setup guide.
Basic Knowledge of LangChain or Vector Databases: Understanding the fundamentals of LangChain and vector databases will help you design the architecture of the pipeline.
Familiarity with AWS Lambda and Docker: You should know the basics of AWS Lambda and Docker, as you will use them to package and deploy the application.
GitHub and CircleCI Accounts: Create accounts on GitHub and CircleCI to manage the version control and automate the CI/CD pipeline.
OpenAI API Key: To generate embeddings with OpenAI's embedding models, you will need an API key. You can sign up for an API key on the OpenAI website.
Zilliz Cloud Account: Sign up for a Zilliz Cloud account to host your vector database and get a free cluster that provides the URI endpoint and Token to interact with it.
Once you have these prerequisites in place, you will be ready to set up the automated pipeline.
Setting Up the Project Structure
Before diving into implementation, you need to structure your project efficiently. A well-organized project makes development, testing, and deployment smoother, especially when dealing with cloud services and CI/CD automation.
Project Organization and Key Components
Your project will include the following key components:
├── .circleci/
│ └── config.yml
├── data/
│ └── 1706.03762v7.pdf
├── src/
│ ├── create_collection.py
│ ├── drop_collection.py
│ ├── insert_documents.py
│ └── __init__.py
├── aws_lambda/
│ ├── __init__.py
│ └── lambda_function.py
├── scripts/
│ ├── build_deploy.sh
│ ├── create_roles.sh
│ ├── create_image.sh
│ └── create_lambda.sh
├── tests/
│ ├── test_collection_exists.py
│ ├── test_lambda_function.py
│ └── test_collection_mock.py
├── Dockerfile
└── pyproject.toml
Installing Dependencies with UV Package Manager
First, clone the repository containing the project code:
git clone https://github.com/benitomartin/embeddings-aws-circleci
cd embeddings-aws-circleci
Note: The repository you just cloned already contains all the necessary code snippets referenced throughout this tutorial. There's no need to recreate files from scratch. Simply verify that the contents match as you follow along. Feel free to adapt the structure and implementation to suit your own project requirements.
Next, install the dependencies using the UV Package Manager. If you do not have it installed, you can follow the installation guide.
uv sync --all-extras
source .venv/bin/activate
These commands install all the project dependencies listed in the pyproject.toml file and activate the virtual environment.
Environment Configuration
Create a .env file in the root directory of your project and add the following environment variables:
ZILLIZ_CLOUD_URI=your-zilliz-uri
ZILLIZ_TOKEN=your-zilliz-token
COLLECTION_NAME=your-collection-name
PDF_BUCKET_NAME=your-bucket-name
OPENAI_API_KEY=your-openai-key
AWS_REGION=your-aws-region
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key
AWS_ACCOUNT_ID=your-account-id
LAMBDA_ECR_REPOSITORY_NAME=your-ecr-repo-name
LAMBDA_IMAGE_NAME=your-image-name
LAMBDA_FUNCTION_NAME=your-lambda-name
ROLE_NAME=your-role-name
ROLE_POLICY_NAME=your-policy-name
Replace the placeholders with your actual values.
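Because every later script reads these values from the environment, a quick sanity check can catch a missing entry early. The sketch below is not part of the repository: the helper name `missing_env_vars` and the choice of which variables count as required are assumptions made for illustration.

```python
import os
from typing import Iterable, Mapping, Optional

# Assumption: the variables that the Python code in this guide reads at runtime.
REQUIRED_VARS = (
    "ZILLIZ_CLOUD_URI",
    "ZILLIZ_TOKEN",
    "COLLECTION_NAME",
    "PDF_BUCKET_NAME",
    "OPENAI_API_KEY",
)


def missing_env_vars(
    required: Iterable[str] = REQUIRED_VARS,
    env: Optional[Mapping[str, str]] = None,
) -> list[str]:
    """Return the names in `required` that are unset or blank in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name, "").strip()]
```

Calling `missing_env_vars()` with no arguments inspects the current process environment, so it only reflects the .env file once those values have been exported into your shell.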
Creating the Vector Database Infrastructure
To efficiently store and retrieve embeddings, you need to set up a vector database. This section will guide you through configuring Zilliz Cloud (Milvus), defining a schema, and optimizing the database for fast vector searches.
Setting Up Zilliz Cloud Collection
Zilliz Cloud is a managed version of Milvus, a high-performance vector database. You will create a collection to store extracted text and corresponding vector embeddings.
In order to create a collection, you need to follow these steps:
Sign up and create a free Cluster in Zilliz Cloud.
Get the connection details:
URI: Found in the cluster settings (public endpoint).
Token: Required for authentication.
Set environment variables in your .env file and provide a collection name:
ZILLIZ_CLOUD_URI=your-zilliz-uri
ZILLIZ_TOKEN=your-zilliz-token
COLLECTION_NAME=your-collection-name
Creating the Collection
Once you have the connection details, you can create a collection in Zilliz Cloud. The collection will store the extracted text and corresponding vector embeddings.
In the src folder, you can create a create_collection.py script with two functions that define the schema and create the collection:
- create_schema: Defines the schema, which includes:
  - id: Auto-generated primary key (INT64).
  - pdf_text: Extracted text stored as a VARCHAR.
  - my_vector: Vector embeddings stored as a FLOAT_VECTOR (default dimension: 1536).
- create_collection: Creates the collection in Zilliz Cloud with the defined schema. It optimizes vector search by setting up an AUTOINDEX with COSINE similarity, ensuring efficient retrieval.
import os
from typing import Optional

from pymilvus import CollectionSchema, DataType, MilvusClient


def create_schema(dimension: int = 1536) -> CollectionSchema:
    """Define the schema for the Milvus collection."""
    schema = MilvusClient.create_schema(
        auto_id=True,
        enable_dynamic_field=True,
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="pdf_text", datatype=DataType.VARCHAR, max_length=65535)
    schema.add_field(field_name="my_vector", datatype=DataType.FLOAT_VECTOR, dim=dimension)
    return schema


def create_collection(
    collection_name: Optional[str] = None,
    uri: Optional[str] = None,
    token: Optional[str] = None,
    dimension: int = 1536,
) -> None:
    """Create a new Milvus collection with the specified parameters.

    Args:
        collection_name (str, optional): Name of the collection. Defaults to env var COLLECTION_NAME.
        uri (str, optional): Zilliz Cloud URI. Defaults to env var ZILLIZ_CLOUD_URI.
        token (str, optional): Zilliz token. Defaults to env var ZILLIZ_TOKEN.
        dimension (int, optional): Vector dimension. Defaults to 1536.
    """
    # Use environment variables as fallback
    collection_name = collection_name or os.getenv("COLLECTION_NAME")
    uri = uri or os.getenv("ZILLIZ_CLOUD_URI")
    token = token or os.getenv("ZILLIZ_TOKEN")
    if not all([collection_name, uri, token]):
        raise ValueError("Missing required parameters: collection_name, uri, or token")

    # Connect to Zilliz Cloud (Milvus)
    client = MilvusClient(uri=uri, token=token)

    # Create schema
    schema = create_schema(dimension)

    # Prepare index parameters
    index_params = client.prepare_index_params()
    index_params.add_index(field_name="my_vector", index_type="AUTOINDEX", metric_type="COSINE")

    # Create collection
    client.create_collection(collection_name=collection_name, schema=schema, index_params=index_params)


if __name__ == "__main__":
    # Create collection
    print("Creating collection...")
    create_collection()
    print("Collection created successfully.")
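To make the COSINE metric concrete, here is a minimal pure-Python sketch of the similarity score used to rank vectors. It is for illustration only: Zilliz Cloud computes this internally, and the helper name `cosine_similarity` is hypothetical.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors of equal length."""
    if len(a) != len(b):
        raise ValueError("Vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("Cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)
```

Scores range from -1 to 1, and a higher score means the two vectors point in a more similar direction regardless of their length.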
Once your Zilliz Cloud cluster is ready and .env is configured, run:
uv run src/create_collection.py
This will create a collection in your Zilliz Cloud cluster. If you need to delete the collection, you can create a drop_collection.py script in the src folder that drops it, and then recreate it with the previous script.
import os
from typing import Optional

from pymilvus import MilvusClient


def drop_collection(
    collection_name: Optional[str] = None,
    uri: Optional[str] = None,
    token: Optional[str] = None,
) -> None:
    """Drop a Milvus collection.

    Args:
        collection_name (str, optional): Name of the collection. Defaults to env var COLLECTION_NAME.
        uri (str, optional): Zilliz Cloud URI. Defaults to env var ZILLIZ_CLOUD_URI.
        token (str, optional): Zilliz token. Defaults to env var ZILLIZ_TOKEN.
    """
    # Use environment variables as fallback
    collection_name = collection_name or os.getenv("COLLECTION_NAME")
    uri = uri or os.getenv("ZILLIZ_CLOUD_URI")
    token = token or os.getenv("ZILLIZ_TOKEN")
    if not all([collection_name, uri, token]):
        raise ValueError("Missing required parameters: collection_name, uri, or token")

    # Connect to Zilliz Cloud (Milvus)
    client = MilvusClient(uri=uri, token=token)

    # Drop the collection
    client.drop_collection(collection_name=collection_name)


if __name__ == "__main__":
    # Drop collection
    print("Dropping collection...")
    drop_collection()
    print("Collection dropped successfully.")
To drop the collection, run:
uv run src/drop_collection.py
Implementing the PDF Processing Pipeline
To store and search text efficiently, you need to process PDFs, extract the text, convert it into embeddings, and store them in Zilliz Cloud for fast retrieval.
Make sure to set the OPENAI_API_KEY environment variable in your .env file.
Then, create an insert_documents.py script in the src folder. This script will:
Load the text from PDFs using PyPDFLoader from LangChain to get a list of Document objects
Split the text into manageable chunks to ensure accurate embeddings using CharacterTextSplitter
Generate vector embeddings using OpenAI
Store the text and embeddings in Zilliz Cloud using MilvusClient for efficient similarity searches.
import os
from typing import Optional

from langchain_community.document_loaders import PyPDFLoader
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import CharacterTextSplitter
from pymilvus import MilvusClient


def process_pdf(pdf_path: str, chunk_size: int = 512, chunk_overlap: int = 100) -> list[dict]:
    """Process a PDF file and generate embeddings for its content.

    Args:
        pdf_path (str): Path to the PDF file.
        chunk_size (int, optional): Size of text chunks. Defaults to 512.
        chunk_overlap (int, optional): Overlap between chunks. Defaults to 100.

    Returns:
        List[dict]: List of dictionaries containing text and embeddings.
    """
    if not os.path.exists(pdf_path):
        raise FileNotFoundError(f"PDF file not found at {pdf_path}")

    # Load and process PDF
    loader = PyPDFLoader(pdf_path)
    documents = loader.load()

    # Split text
    text_splitter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    chunks = text_splitter.split_documents(documents)

    # Generate embeddings
    openai_embeddings = OpenAIEmbeddings()

    # Prepare data for insertion
    data = []
    for chunk in chunks:
        text = chunk.page_content
        embedding = openai_embeddings.embed_documents([text])[0]
        data.append({"pdf_text": text, "my_vector": embedding})
    return data


def insert_documents(
    pdf_path: str,
    collection_name: Optional[str] = None,
    uri: Optional[str] = None,
    token: Optional[str] = None,
    chunk_size: int = 512,
    chunk_overlap: int = 100,
) -> None:
    """Insert documents from a PDF file into a Milvus collection.

    Args:
        pdf_path (str): Path to the PDF file.
        collection_name (str, optional): Name of the collection. Defaults to env var COLLECTION_NAME.
        uri (str, optional): Zilliz Cloud URI. Defaults to env var ZILLIZ_CLOUD_URI.
        token (str, optional): Zilliz token. Defaults to env var ZILLIZ_TOKEN.
        chunk_size (int, optional): Size of text chunks. Defaults to 512.
        chunk_overlap (int, optional): Overlap between chunks. Defaults to 100.
    """
    # Use environment variables as fallback
    collection_name = collection_name or os.getenv("COLLECTION_NAME")
    uri = uri or os.getenv("ZILLIZ_CLOUD_URI")
    token = token or os.getenv("ZILLIZ_TOKEN")
    if not all([collection_name, uri, token]):
        raise ValueError("Missing required parameters: collection_name, uri, or token")

    # Connect to Zilliz Cloud (Milvus)
    client = MilvusClient(uri=uri, token=token)

    # Process PDF and get data
    data = process_pdf(pdf_path, chunk_size, chunk_overlap)

    # Insert data
    client.insert(collection_name, data)

    # Verify collection load state
    load_state = client.get_load_state(collection_name=collection_name)
    print(f"Collection load state: {load_state}")


if __name__ == "__main__":
    # Insert documents
    print("Inserting documents...")
    insert_documents("data/1706.03762v7.pdf")
    print("Documents inserted successfully.")
To run the script, use the following command. You can find a sample PDF file in the data folder, but feel free to use your own.
uv run src/insert_documents.py
This script will process the PDF, generate embeddings, and store them in your Zilliz Cloud cluster collection.
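The interaction between chunk_size and chunk_overlap is easier to see on a toy example. The sketch below is a simplified fixed-width sliding window, not LangChain's CharacterTextSplitter (which splits on a separator and then merges the pieces). The function name `sliding_chunks` is hypothetical.

```python
def sliding_chunks(text: str, chunk_size: int = 512, chunk_overlap: int = 100) -> list[str]:
    """Split text into fixed-width windows that share `chunk_overlap` characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start : start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks
```

With a chunk size of 4 and an overlap of 2, the string "abcdefghij" becomes "abcd", "cdef", "efgh", "ghij": each chunk repeats the last two characters of the previous one, which helps keep context that would otherwise be cut at a chunk boundary.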
Creating IAM Roles and Policies
Now that you have a working pipeline, you need to set up AWS Lambda to trigger the pipeline when a new PDF is uploaded to an S3 bucket.
To deploy AWS Lambda functions, you first need to create specific IAM roles and permissions. You can create the following create_roles.sh script under the scripts folder. This script automates the creation of an IAM role with the AWS managed policy AWSLambdaExecute attached, which AWS Lambda needs to execute the function and access S3.
Before running the script, make sure to set the ROLE_NAME and AWS_REGION environment variables in your .env file.
AWS Lambda will assume this role when executing the function, which allows it to get and put objects in S3, as defined in the AWSLambdaExecute policy. It will also have access to CloudWatch Logs for logging purposes, which will help you monitor and debug the function.
#!/bin/bash
# Exit immediately if a command exits with a non-zero status
set -e
# Load environment variables from .env file
set -o allexport
source .env
set +o allexport
echo "Environment variables loaded."
# Create a new IAM role for Lambda with S3 Get/Put and CloudWatch Logs access
echo "Checking IAM role..."
# Check if the role exists
if ! aws iam get-role --role-name ${ROLE_NAME} --region ${AWS_REGION} 2>/dev/null; then
echo "Creating new IAM role for Lambda with S3 access..."
# Trust policy that allows the Lambda service to assume this role
ASSUME_ROLE_POLICY='{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}]
}'
# Create the IAM role
aws iam create-role \
--role-name ${ROLE_NAME} \
--assume-role-policy-document "${ASSUME_ROLE_POLICY}" \
--region ${AWS_REGION}
# Add Lambda execution policy. Provides Put, Get access to S3 and full access to CloudWatch Logs.
aws iam attach-role-policy \
--role-name ${ROLE_NAME} \
--policy-arn arn:aws:iam::aws:policy/AWSLambdaExecute \
--region ${AWS_REGION}
echo "IAM role created and policy attached."
# Wait for role to propagate
echo "Waiting for role to propagate..."
sleep 20
else
echo "IAM role ${ROLE_NAME} already exists. Skipping role creation."
fi
To execute the script, use the following command:
uv run scripts/create_roles.sh
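If you prefer to generate the trust policy programmatically rather than embedding JSON in a shell string, the document can be built and serialized in Python. This is a sketch that mirrors the policy in the script above. The function name `lambda_trust_policy` is hypothetical.

```python
import json


def lambda_trust_policy() -> str:
    """Return the trust policy JSON that lets the Lambda service assume a role."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }
    return json.dumps(policy)
```

The returned string can be passed to `aws iam create-role --assume-role-policy-document`, or as the `AssumeRolePolicyDocument` argument of boto3's `create_role`.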
Building the AWS Lambda Function
The AWS Lambda function is the core component that automates the entire process of handling PDF uploads, generating embeddings, and storing them in Zilliz Cloud. The function is triggered by an S3 event, processes the uploaded PDF, and stores the resulting data in your Milvus collection.
Lambda Handler Implementation
Now you can create the lambda_function.py file below and save it in the aws_lambda folder. This file contains the implementation of the AWS Lambda function. The function is triggered by an S3 event whenever a new PDF file is uploaded to the S3 bucket. It processes the event, downloads the file, generates embeddings, and inserts the data into the Zilliz Cloud collection.
import json
import os
import traceback

import boto3
from langchain_community.document_loaders import PyPDFLoader
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import CharacterTextSplitter
from pymilvus import MilvusClient

# Global variables for reuse across invocations
client = None
openai_embeddings = None
text_splitter = None


def init_clients():
    """Initialize global clients if not already initialized"""
    global client, openai_embeddings, text_splitter
    if client is None:
        print("Initializing Milvus client...")
        client = MilvusClient(uri=os.getenv("ZILLIZ_CLOUD_URI"), token=os.getenv("ZILLIZ_TOKEN"))
    if openai_embeddings is None:
        print("Initializing OpenAI embeddings...")
        openai_embeddings = OpenAIEmbeddings(openai_api_key=os.getenv("OPENAI_API_KEY"))
    if text_splitter is None:
        print("Initializing text splitter...")
        text_splitter = CharacterTextSplitter(chunk_size=512, chunk_overlap=100)


def lambda_handler(event, context):
    try:
        print(f"Received event: {json.dumps(event)}")

        # Initialize clients
        init_clients()

        # Validate event structure
        if "Records" not in event or not event["Records"]:
            print("No records found in event")
            return {"statusCode": 400, "body": json.dumps("No records found in event")}

        # Get bucket and file info from S3 event
        record = event["Records"][0]
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        print(f"Processing file {key} from bucket {bucket}")

        # Verify bucket
        expected_bucket = os.getenv("PDF_BUCKET_NAME")
        if bucket != expected_bucket:
            print(f"Invalid bucket. Expected {expected_bucket}, got {bucket}")
            return {
                "statusCode": 400,
                "body": json.dumps(f"Invalid bucket. Expected {expected_bucket}, got {bucket}"),
            }

        # Download PDF
        local_path = f"/tmp/{os.path.basename(key)}"
        print(f"Downloading file to {local_path}")
        s3 = boto3.client("s3")
        s3.download_file(bucket, key, local_path)

        # Process PDF
        print("Loading and splitting PDF...")
        documents = PyPDFLoader(local_path).load()
        chunks = text_splitter.split_documents(documents)
        print(f"Split PDF into {len(chunks)} chunks")

        # Prepare and insert data
        print("Generating embeddings and preparing data...")
        data = [
            {
                "pdf_text": chunk.page_content,
                "my_vector": openai_embeddings.embed_documents([chunk.page_content])[0],
            }
            for chunk in chunks
        ]
        print(f"Inserting {len(data)} records into collection {os.getenv('COLLECTION_NAME')}")
        client.insert(os.getenv("COLLECTION_NAME"), data)

        # Cleanup
        os.remove(local_path)
        print("Processing completed successfully")
        return {"statusCode": 200, "body": json.dumps(f"Successfully processed {key}")}
    except Exception as e:
        print(f"Error processing document: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        return {"statusCode": 500, "body": json.dumps(str(e))}
Main Features of the Lambda Function:
S3 Event Processing: The AWS Lambda function is triggered by an S3 event when a new PDF is uploaded to the designated bucket.
Client Initialization: The function initializes the Milvus client for storing embeddings, the OpenAI embeddings client, and the text splitter for chunking the PDF text.
Text Processing: The PDF text is extracted using PyPDFLoader, then split into smaller chunks to ensure proper embedding generation.
Generating and Storing Embeddings: The OpenAI embeddings are generated for each chunk of text, and the resulting data is inserted into the specified Milvus collection in Zilliz Cloud.
Error Handling: The function includes error handling to catch and log any exceptions that occur during the processing of the PDF.
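One detail worth knowing when you adapt the handler: S3 delivers object keys URL-encoded in event notifications, so a file named `my report.pdf` arrives as `my+report.pdf`. The handler above reads the key directly from the event, which works for simple names like the sample PDF. The sketch below shows one way to decode the key before downloading. The helper name `extract_s3_object` is hypothetical and not part of the repository.

```python
from urllib.parse import unquote_plus


def extract_s3_object(event: dict) -> tuple[str, str]:
    """Return (bucket, decoded key) for the first record of an S3 event."""
    records = event.get("Records") or []
    if not records:
        raise ValueError("No records found in event")
    s3_info = records[0]["s3"]
    bucket = s3_info["bucket"]["name"]
    # Keys are URL-encoded in the notification: "+" is a space, "%28" is "(".
    key = unquote_plus(s3_info["object"]["key"])
    return bucket, key
```

Like the handler, this sketch only looks at the first record. A variant that loops over all records would be a natural extension if your bucket batches notifications.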
AWS Lambda Containerization with Docker
Once the AWS Lambda function is ready, it needs to be containerized using Docker. Because the Dockerfile below installs dependencies with pip from a requirements.txt file rather than from pyproject.toml, you need to create a requirements.txt file in the root directory of your project with the following dependencies:
langchain-community
langchain_milvus
boto3
langchain-openai
pypdf
The Dockerfile below sets up the environment for the AWS Lambda function, including the necessary dependencies and the function code. You can save this file in the root directory of your project.
FROM public.ecr.aws/lambda/python:3.12.2025.04.01.18
# Set the working directory to /var/task
WORKDIR ${LAMBDA_TASK_ROOT}
# Copy requirements first to leverage Docker cache
COPY requirements.txt ./
# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy source code
COPY aws_lambda/lambda_function.py ./lambda_function.py
# Command to run the Lambda handler function
CMD [ "lambda_function.lambda_handler" ]
Similarly to the creation of the IAM role, the creation of the ECR repository and the Docker image can be automated using a shell script. Make sure the corresponding environment variables are set in the .env file. Save the script below as create_image.sh in the scripts folder.
#!/bin/bash
# Exit immediately if a command exits with a non-zero status
set -e
# Load environment variables from .env file
set -o allexport
source .env
set +o allexport
echo "Environment variables loaded."
# Check if the ECR repository exists, create it if it does not
if ! aws ecr describe-repositories --repository-names ${LAMBDA_ECR_REPOSITORY_NAME} --region ${AWS_REGION} 2>/dev/null; then
echo "Repository ${LAMBDA_ECR_REPOSITORY_NAME} does not exist. Creating..."
aws ecr create-repository --repository-name ${LAMBDA_ECR_REPOSITORY_NAME} --region ${AWS_REGION}
echo "Repository ${LAMBDA_ECR_REPOSITORY_NAME} created."
else
echo "Repository ${LAMBDA_ECR_REPOSITORY_NAME} already exists."
fi
# Build Docker image
# To make your image compatible with Lambda, you must use the --provenance=false option.
echo "Building Docker image ${LAMBDA_IMAGE_NAME}..."
docker buildx build --platform linux/amd64 --provenance=false -t ${LAMBDA_IMAGE_NAME}:latest .
# Authenticate Docker to your Amazon ECR registry
echo "Authenticating Docker to ECR..."
aws ecr get-login-password --region ${AWS_REGION} | docker login --username AWS --password-stdin ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com
# Tag the Docker image
echo "Tagging Docker image..."
docker tag ${LAMBDA_IMAGE_NAME}:latest ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${LAMBDA_ECR_REPOSITORY_NAME}:latest
# Push the Docker image to Amazon ECR
echo "Pushing Docker image to ECR..."
docker push ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${LAMBDA_ECR_REPOSITORY_NAME}:latest
echo "Docker image pushed to ECR."
echo "Image created successfully."
You can run the script with the following command:
uv run scripts/create_image.sh
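The image URI that the script tags and pushes follows a fixed pattern built from the account ID, region, and repository name. As a small illustration, the hypothetical helper below assembles the same string in Python. It assumes the standard `amazonaws.com` domain used in the script.

```python
def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Build the fully qualified ECR image URI used for tagging and pushing."""
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"
```

The same URI is what the Lambda function later references as its container image, so the repository name and tag must match in both places.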
Pushing the AWS Lambda Function
Once the Docker image is built and pushed to ECR, you can create the AWS Lambda function. As this function is triggered by an S3 event, you need to create an S3 bucket first to store your PDFs. This can be done through the AWS Management Console or the AWS CLI with the following command:
aws s3api create-bucket \
--bucket embeddings-$(uuidgen | tr -d - | tr '[:upper:]' '[:lower:]' ) \
--region eu-central-1 \
--create-bucket-configuration LocationConstraint=eu-central-1
This will create a new S3 bucket with a unique name as required by AWS. Make sure to update the PDF_BUCKET_NAME environment variable in the .env file with the name of the bucket you just created.
If your default region is us-east-1, do not include the --create-bucket-configuration flag. Instead, run:
aws s3api create-bucket \
--bucket embeddings-$(uuidgen | tr -d - | tr '[:upper:]' '[:lower:]') \
--region us-east-1
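The same naming scheme can be reproduced in Python, for example when the bucket is created from a script instead of the shell. This is a minimal sketch with hypothetical helper names. The validity check is deliberately simplified and does not cover every S3 naming rule.

```python
import re
import uuid

# Simplified pattern: 3-63 chars, lowercase letters, digits, dots, and hyphens,
# starting and ending with a letter or digit.
_BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")


def unique_bucket_name(prefix: str = "embeddings") -> str:
    """Return `<prefix>-<32 lowercase hex chars>`, mirroring the uuidgen pipeline."""
    return f"{prefix}-{uuid.uuid4().hex}"


def looks_like_bucket_name(name: str) -> bool:
    """Return True if `name` passes the simplified naming check above."""
    return _BUCKET_RE.fullmatch(name) is not None
```

`uuid.uuid4().hex` already yields lowercase hexadecimal without dashes, so no extra transformation is needed.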
Now that the S3 bucket is created, you can create the AWS Lambda function using the following create_lambda.sh script, saved in the scripts folder:
#!/bin/bash
# Exit immediately if a command exits with a non-zero status
set -e
# Load environment variables from .env file
set -o allexport
source .env
set +o allexport
echo "Environment variables loaded."
# Check if the Lambda function exists
if ! aws lambda get-function --function-name ${LAMBDA_FUNCTION_NAME} --region ${AWS_REGION} 2>/dev/null; then
echo "Lambda function ${LAMBDA_FUNCTION_NAME} does not exist. Creating..."
aws lambda create-function \
--function-name ${LAMBDA_FUNCTION_NAME} \
--package-type Image \
--code ImageUri=${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${LAMBDA_ECR_REPOSITORY_NAME}:latest \
--role arn:aws:iam::${AWS_ACCOUNT_ID}:role/${ROLE_NAME} \
--region ${AWS_REGION} \
--timeout 900 \
--memory-size 3072 \
--environment "Variables={
PDF_BUCKET_NAME=${PDF_BUCKET_NAME},
OPENAI_API_KEY=${OPENAI_API_KEY},
ZILLIZ_CLOUD_URI=${ZILLIZ_CLOUD_URI},
ZILLIZ_TOKEN=${ZILLIZ_TOKEN},
COLLECTION_NAME=${COLLECTION_NAME}
}"
else
echo "Lambda function ${LAMBDA_FUNCTION_NAME} already exists. Updating..."
aws lambda update-function-code \
--function-name ${LAMBDA_FUNCTION_NAME} \
--image-uri ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${LAMBDA_ECR_REPOSITORY_NAME}:latest
# Wait for the code update to finish before updating the configuration
echo "Waiting for the Lambda function code update to finish..."
sleep 20
aws lambda update-function-configuration \
--function-name ${LAMBDA_FUNCTION_NAME} \
--timeout 900 \
--memory-size 3072 \
--environment "Variables={
PDF_BUCKET_NAME=${PDF_BUCKET_NAME},
OPENAI_API_KEY=${OPENAI_API_KEY},
ZILLIZ_CLOUD_URI=${ZILLIZ_CLOUD_URI},
ZILLIZ_TOKEN=${ZILLIZ_TOKEN},
COLLECTION_NAME=${COLLECTION_NAME}
}"
fi
# Check and add S3 trigger to Lambda if it doesn't exist
if ! aws lambda get-policy --function-name ${LAMBDA_FUNCTION_NAME} 2>/dev/null | grep -q "S3InvokeFunction"; then
echo "Adding S3 trigger permission to Lambda..."
aws lambda add-permission \
--function-name ${LAMBDA_FUNCTION_NAME} \
--statement-id S3InvokeFunction \
--action lambda:InvokeFunction \
--principal s3.amazonaws.com \
--source-arn arn:aws:s3:::${PDF_BUCKET_NAME} \
--region ${AWS_REGION}
echo "Waiting for permission to propagate..."
sleep 20
else
echo "S3 trigger permission already exists for Lambda. Skipping..."
fi
# Check and configure S3 bucket notification if it doesn't exist
CURRENT_NOTIFICATIONS=$(aws s3api get-bucket-notification-configuration --bucket ${PDF_BUCKET_NAME} 2>/dev/null)
if ! echo "${CURRENT_NOTIFICATIONS}" | grep -q "${LAMBDA_FUNCTION_NAME}"; then
echo "Configuring S3 bucket notification..."
aws s3api put-bucket-notification-configuration \
--bucket ${PDF_BUCKET_NAME} \
--notification-configuration '{
"LambdaFunctionConfigurations": [{
"LambdaFunctionArn": "arn:aws:lambda:'${AWS_REGION}':'${AWS_ACCOUNT_ID}':function:'${LAMBDA_FUNCTION_NAME}'",
"Events": ["s3:ObjectCreated:*"]
}]
}'
else
echo "S3 bucket notification already configured. Skipping..."
fi
The script first checks whether the AWS Lambda function already exists.
If it does not exist, the script creates it with the necessary configuration, including the previously created IAM role, the environment variables, and the Docker image.
If it does exist, the script updates the function code and configuration.
In both cases, the script then grants the S3 bucket permission to invoke the AWS Lambda function and configures the bucket to trigger the function whenever a new object is created, such as when you upload a new PDF.
The environment variables are loaded from the .env file and stored in the AWS Lambda function's environment variables. This allows the AWS Lambda function to access the necessary resources and configurations.
You can run the script with the following command:
uv run scripts/create_lambda.sh
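The notification configuration in the script fires for every object created in the bucket. If you only want PDFs to trigger the function, S3 supports key filter rules. The sketch below builds such a configuration as a Python dictionary. The suffix filter is an optional addition that is not in the script above, and the function name `s3_notification_config` is hypothetical.

```python
import json


def s3_notification_config(lambda_arn: str, suffix: str = ".pdf") -> dict:
    """Build an S3 notification configuration that invokes a Lambda function."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": lambda_arn,
                "Events": ["s3:ObjectCreated:*"],
                # Assumption: only objects whose key ends in `suffix` should trigger the function.
                "Filter": {"Key": {"FilterRules": [{"Name": "suffix", "Value": suffix}]}},
            }
        ]
    }


def s3_notification_json(lambda_arn: str, suffix: str = ".pdf") -> str:
    """Serialize the configuration for use on the command line."""
    return json.dumps(s3_notification_config(lambda_arn, suffix))
```

The JSON string can be passed to `aws s3api put-bucket-notification-configuration --notification-configuration`. Keep in mind that this call replaces the bucket's entire existing notification configuration.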
Testing and Quality Assurance
Testing and ensuring good code quality are essential steps in any software development pipeline, particularly when deploying to cloud services such as AWS Lambda. It is important to ensure that your code works as expected and is clean, efficient, and type-safe.
Unit Testing with Pytest
Unit tests ensure that each part of the code behaves as expected. Under the tests directory, you can create the following test files:
- test_collection_exists.py: Verifies that the collection exists in Zilliz Cloud before attempting to insert embeddings.
import os

import pytest
from pymilvus import MilvusClient


@pytest.fixture
def milvus_client():
    # Initialize Milvus client with environment variables for URI and token
    client = MilvusClient(uri=os.getenv("ZILLIZ_CLOUD_URI"), token=os.getenv("ZILLIZ_TOKEN"))
    yield client
    client.close()  # Close the connection after the test


def test_check_collection_existence(milvus_client):
    collection_name = os.getenv("COLLECTION_NAME")
    # Step 1: Get list of all collections in the Milvus instance
    collections = milvus_client.list_collections()
    # Step 2: Assert that the collection name exists in the list of collections
    assert collection_name in collections, f"Collection '{collection_name}' does not exist in Milvus."
- test_collection_mock.py: Uses mocks to test the collection existence and dropping functionality.
import os
from unittest.mock import MagicMock, patch

import pytest


@pytest.fixture
def mock_milvus_client():
    with patch("pymilvus.MilvusClient") as mock_client:
        client_instance = MagicMock()
        mock_client.return_value = client_instance
        yield client_instance


@pytest.fixture
def mock_env_vars():
    env_vars = {
        "ZILLIZ_CLOUD_URI": "fake-uri",
        "COLLECTION_NAME": "test_collection",
        "ZILLIZ_TOKEN": "fake-token",
    }
    with patch.dict(os.environ, env_vars):
        yield env_vars


def test_drop_collection(mock_milvus_client, mock_env_vars):
    from src.drop_collection import drop_collection

    # Call drop collection
    drop_collection()
    # Verify the drop_collection method was called with correct parameters
    mock_milvus_client.drop_collection.assert_called_once_with(
        collection_name=mock_env_vars["COLLECTION_NAME"]
    )


@pytest.mark.parametrize("collection_exists", [True, False])
def test_collection_existence(mock_milvus_client, mock_env_vars, collection_exists):
    mock_milvus_client.list_collections.return_value = (
        [mock_env_vars["COLLECTION_NAME"]] if collection_exists else []
    )
    # Check if collection exists
    result = mock_milvus_client.list_collections()
    print(f" result: {result}")
    if collection_exists:
        assert mock_env_vars["COLLECTION_NAME"] in result
    else:
        assert mock_env_vars["COLLECTION_NAME"] not in result
- test_lambda_function.py: Tests the AWS Lambda function locally with an actual PDF file stored in the S3 bucket.
You can upload a PDF file to your S3 bucket with the following command:
aws s3 cp your-file.pdf s3://your-bucket-name/
import os

from aws_lambda.lambda_function import lambda_handler

# Set up test event
TEST_BUCKET = os.getenv("PDF_BUCKET_NAME")
TEST_FILE = "1706.03762v7.pdf"
test_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": TEST_BUCKET},
                "object": {"key": TEST_FILE},
            }
        }
    ]
}


def test_lambda_handler():
    """Test the lambda_handler function with an actual S3 file."""
    response = lambda_handler(test_event, None)
    assert response["statusCode"] == 200, f"Unexpected response: {response}"
    assert "Successfully processed" in response["body"]
To run the tests, you can use the following command:
uv run pytest
If everything is set up correctly, all tests should pass, with output similar to the following:
================================================================ test session starts
tests/test_collection_exists.py::test_check_collection_existence PASSED
tests/test_collection_mock.py::test_drop_collection PASSED
tests/test_collection_mock.py::test_collection_existence[True] result: ['test_collection']
PASSED
tests/test_collection_mock.py::test_collection_existence[False] result: []
PASSED
tests/test_lambda_function.py::test_lambda_handler Received event: {"Records": [{"s3": {"bucket": {"name": "embeddings-8213c13740654398b076090eac96473e"}, "object": {"key": "1706.03762v7.pdf"}}}]}
Initializing Milvus client...
Initializing OpenAI embeddings...
Initializing text splitter...
Processing file 1706.03762v7.pdf from bucket embeddings-8213c13740654398b076090eac96473e
Downloading file to /tmp/1706.03762v7.pdf
Loading and splitting PDF...
Split PDF into 15 chunks
Generating embeddings and preparing data...
Inserting 15 records into collection pdf_embeddings
Processing completed successfully
PASSED
================================================================ 5 passed in 16.73s =================================================================
Quality Assurance with Ruff and MyPy
Ruff and MyPy are static analysis tools: they inspect your code without running it.
Ruff is a linter that checks for code style and syntax errors. MyPy is a static type checker that verifies values are used consistently with their type annotations.
With the following commands, you can run Ruff and MyPy to check your code:
uv run ruff check . --fix --exit-non-zero-on-fix
uv run mypy
If everything is set up correctly, you should see no errors or warnings.
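To illustrate the kind of issue MyPy looks for, here is a small sketch. The helper chunk_ids is hypothetical and not part of the project; it only serves to show how a type annotation lets the checker catch a bad call before the code runs.

```python
def chunk_ids(file_name: str, n_chunks: int) -> list[str]:
    """Return one identifier per chunk, such as 'paper.pdf-0'."""
    return [f"{file_name}-{index}" for index in range(n_chunks)]


ids = chunk_ids("paper.pdf", 3)

# MyPy would reject the call below without running it, because the second
# argument is a str where the signature declares an int:
# chunk_ids("paper.pdf", "3")
```

Without the type check, the commented-out call would only fail at runtime, when range() receives a string.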
Implementing CI/CD with CircleCI
Continuous Integration and Continuous Deployment (CI/CD) are essential practices for automating the testing, building, and deployment of your applications. CircleCI provides a platform to automate your development workflows, including code testing, Docker image building, and deployment to AWS Lambda.
To configure your pipeline, you’ll need a config.yml file in a .circleci directory at the root of your project. This configuration file defines your jobs, workflows, and execution steps for building, testing, and deploying your Lambda function.
version: 2.1
orbs:
  aws-cli: circleci/[email protected]
  docker: circleci/[email protected]
jobs:
  build-deploy:
    docker:
      - image: cimg/python:3.12
    steps:
      - checkout
      - run:
          name: Install UV
          command: |
            curl -LsSf https://astral.sh/uv/install.sh | sh
      - run:
          name: Create venv and install dependencies
          command: |
            uv sync --all-extras
      - run:
          name: Run ruff
          command: |
            uv run ruff check . --fix --exit-non-zero-on-fix
      - run:
          name: Run MyPy
          command: |
            uv run mypy
      - run:
          name: Run tests
          command: |
            uv run pytest
      - run:
          name: Create .env file
          command: |
            echo "ZILLIZ_CLOUD_URI=${ZILLIZ_CLOUD_URI}" > .env
            echo "ZILLIZ_TOKEN=${ZILLIZ_TOKEN}" >> .env
            echo "COLLECTION_NAME=${COLLECTION_NAME}" >> .env
            echo "PDF_BUCKET_NAME=${PDF_BUCKET_NAME}" >> .env
            echo "OPENAI_API_KEY=${OPENAI_API_KEY}" >> .env
            echo "AWS_REGION=${AWS_REGION}" >> .env
            echo "AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}" >> .env
            echo "AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}" >> .env
            echo "AWS_ACCOUNT_ID=${AWS_ACCOUNT_ID}" >> .env
            echo "REPOSITORY_NAME=${REPOSITORY_NAME}" >> .env
            echo "IMAGE_NAME=${IMAGE_NAME}" >> .env
            echo "LAMBDA_FUNCTION_NAME=${LAMBDA_FUNCTION_NAME}" >> .env
            echo "ROLE_NAME=${ROLE_NAME}" >> .env
            echo "ROLE_POLICY_NAME=${ROLE_POLICY_NAME}" >> .env
      - aws-cli/setup:
          profile_name: default
      - setup_remote_docker
      - run:
          name: Deploy to AWS
          command: |
            chmod +x scripts/build_deploy.sh
            ./scripts/build_deploy.sh
workflows:
  version: 2
  deploy:
    jobs:
      - build-deploy
The file can be broken down into the following components:
- Orbs:
  - aws-cli: The AWS CLI orb simplifies the setup of AWS CLI to interact with AWS services.
  - docker: The CircleCI Docker orb handles setting up the Docker environment.
- Jobs:
  - build-deploy: This job is responsible for building and deploying the AWS Lambda function. It includes steps for checking out the code, installing dependencies, running tests, and deploying the function to AWS Lambda. As you need to execute multiple scripts in sequence, you can use a single bash script, build_deploy.sh, to do so and save it in the scripts directory.
#!/bin/bash
# Exit immediately if a command fails
set -e
# Define script paths
SCRIPT1="scripts/create_roles.sh"
SCRIPT2="scripts/create_image.sh"
SCRIPT3="scripts/create_lambda.sh"
# Ensure scripts are executable
chmod +x "$SCRIPT1" "$SCRIPT2" "$SCRIPT3"
# Run the scripts sequentially
echo "Running Script 1..."
"$SCRIPT1"
echo "Running Script 2..."
"$SCRIPT2"
echo "Running Script 3..."
"$SCRIPT3"
echo "All scripts executed successfully!"
- Workflows:
  - The deploy workflow runs the build-deploy job whenever a commit is pushed. The configuration as written sets no branch filter, so a push to any branch triggers it, including the main branch.
Once you have committed the configuration file, push it to your GitHub repository, and visit the CircleCI dashboard to set up your project.
Select your repository and click Set Up Project.
Next, choose the branch that will trigger the first pipeline. In this case, choose the main branch and click Set Up Project.
If this is your first time triggering a build on CircleCI for this project, note that the initial pipeline will fail.
This is expected behavior as environment variables are required for the pipeline to run successfully. CircleCI does not allow you to configure them until the project has been initialized by that first triggered build.
After the initial failure, open the Project Settings, go to the Environment Variables section, and add all the required environment variables.
Once the variables are saved, re-run the pipeline. It should now complete successfully and deploy your AWS Lambda function. From this point onward, CircleCI will retain your environment variables, and you won't need to configure them again unless you introduce new ones.
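Because a missing variable only surfaces once the pipeline runs, a quick check can save a failed build. The sketch below assumes the variable names written by the Create .env file step of the configuration; missing_env_vars is a hypothetical helper and not part of the project.

```python
import os
from collections.abc import Mapping

# Names taken from the "Create .env file" step of the CircleCI configuration.
REQUIRED_VARS = (
    "ZILLIZ_CLOUD_URI",
    "ZILLIZ_TOKEN",
    "COLLECTION_NAME",
    "PDF_BUCKET_NAME",
    "OPENAI_API_KEY",
    "AWS_REGION",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_ACCOUNT_ID",
    "REPOSITORY_NAME",
    "IMAGE_NAME",
    "LAMBDA_FUNCTION_NAME",
    "ROLE_NAME",
    "ROLE_POLICY_NAME",
)


def missing_env_vars(environ: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required names that are unset or empty, in declaration order."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]
```

Calling missing_env_vars() locally, or in an early pipeline step, returns an empty list when every variable is set.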
To confirm that the deployment works as expected, upload a PDF to the configured S3 bucket. The Lambda function should be automatically triggered by the S3 event.
To upload a PDF to the S3 bucket, you can use the following command:
aws s3 cp your-file.pdf s3://your-bucket-name/
To monitor the logs of the AWS Lambda function, you can use the following command:
aws logs tail /aws/lambda/your-lambda-function --follow
If everything is set up correctly, you should see the logs of the AWS Lambda function, and you can check if the PDF was processed correctly by inspecting your Zilliz Cloud collection.
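One way to inspect the collection from Python is to ask for its row count before and after the upload. This is a minimal sketch, assuming the pymilvus MilvusClient and the ZILLIZ_CLOUD_URI, ZILLIZ_TOKEN, and COLLECTION_NAME variables used elsewhere in this guide; count_records and connect_from_env are hypothetical helper names.

```python
import os
from typing import Any


def count_records(client: Any, collection_name: str) -> int:
    """Return the row count the client reports for the given collection."""
    stats = client.get_collection_stats(collection_name=collection_name)
    return int(stats["row_count"])


def connect_from_env() -> Any:
    """Create a MilvusClient from the environment (assumed variable names)."""
    # Imported lazily so count_records can be used and tested without pymilvus.
    from pymilvus import MilvusClient

    return MilvusClient(
        uri=os.environ["ZILLIZ_CLOUD_URI"],
        token=os.environ["ZILLIZ_TOKEN"],
    )
```

For example, count_records(connect_from_env(), os.environ["COLLECTION_NAME"]) should grow after a successful run. In the local test output shown earlier, the sample PDF was split into 15 chunks and inserted as 15 records. The reported count may lag slightly behind very recent inserts.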
Cleaning up
If you no longer need the resources created in this tutorial, such as the Lambda function, the ECR repository, and the S3 bucket, make sure you delete them to avoid unnecessary charges.
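The deletion can be scripted. The following is a rough sketch, not the project's own cleanup script: delete_pipeline_resources is a hypothetical helper that expects boto3 clients for Lambda, ECR, and S3, and it assumes an unversioned bucket. It permanently deletes data, so double-check the names you pass in. The IAM role and its policy are not covered here, because how to remove them depends on how the role was created.

```python
from typing import Any


def delete_pipeline_resources(
    lambda_client: Any,
    ecr_client: Any,
    s3_client: Any,
    *,
    function_name: str,
    repository_name: str,
    bucket_name: str,
) -> None:
    """Delete the Lambda function, the ECR repository, and the S3 bucket."""
    lambda_client.delete_function(FunctionName=function_name)

    # force=True also removes the images still stored in the repository.
    ecr_client.delete_repository(repositoryName=repository_name, force=True)

    # A bucket must be empty before it can be deleted, so remove objects page by page.
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name):
        objects = [{"Key": item["Key"]} for item in page.get("Contents", [])]
        if objects:
            s3_client.delete_objects(Bucket=bucket_name, Delete={"Objects": objects})
    s3_client.delete_bucket(Bucket=bucket_name)
```

With boto3 installed and credentials configured, the three clients can be created with boto3.client("lambda"), boto3.client("ecr"), and boto3.client("s3").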
Conclusion
In this blog, you have walked through building and automating a serverless PDF processing pipeline using AWS Lambda, Docker, and CircleCI. The automated process involves triggering AWS Lambda functions via S3 events, generating embeddings using OpenAI, and storing them in Milvus on Zilliz Cloud. The CI/CD pipeline powered by CircleCI ensures that the code is automatically tested, built into a Docker image, and deployed to AWS Lambda, streamlining the development and deployment process.
Using Docker with AWS Lambda provides a consistent environment for your AWS Lambda function, ensuring that dependencies and configurations are maintained across different stages. The CircleCI pipeline automates testing, building, and deployment, reducing manual intervention and enabling fast and reliable updates to the AWS Lambda function. These tools work together to ensure efficiency, scalability, and security.
Looking ahead, potential improvements could include enhancing the error handling and logging in the AWS Lambda function, adding more comprehensive testing coverage, saving the environment variables in AWS Secrets Manager, and introducing monitoring and alerting to track the performance of the AWS Lambda function. As the pipeline evolves, it can scale to handle more complex workflows and integrations, ensuring continued success and reliability in production.