The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.
```mermaid
stateDiagram-v2
    direction LR
    BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
    LambdaInit: Lambda invocation
    BatchProcessor: Batch Processor
    RecordHandler: Record Handler function
    YourLogic: Your logic to process each batch item
    LambdaResponse: Lambda response
    BatchSource --> LambdaInit
    LambdaInit --> BatchProcessor
    BatchProcessor --> RecordHandler
    state BatchProcessor {
        [*] --> RecordHandler: Your function
        RecordHandler --> YourLogic
    }
    RecordHandler --> BatchProcessor: Collect results
    BatchProcessor --> LambdaResponse: Report items that failed processing
```
When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.
If your function fails to process any message from the batch, the entire batch returns to your queue or stream. The same batch is then retried until one of the following conditions happens first: a) your Lambda function returns a successful response, b) the records reach the maximum retry attempts, or c) the records expire.
```mermaid
journey
  section Conditions
    Successful response: 5: Success
    Maximum retries: 3: Failure
    Records expired: 1: Failure
```
This behavior changes when you enable the ReportBatchItemFailures feature in your Lambda function event source configuration:
SQS queues. Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
Kinesis data streams and DynamoDB streams. A single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as the checkpoint.
Warning: This utility lowers the chance of processing records more than once; it does not guarantee it.
We recommend implementing processing logic in an idempotent manner whenever possible.
You can find more details on how Lambda works with either SQS, Kinesis, or DynamoDB in the AWS Documentation.
For this feature to work, you need to configure your Lambda function event source to use ReportBatchItemFailures, so that the response from the Batch Processing utility can inform the service which records failed to be processed.
Use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned.
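For reference, the sketch below shows roughly what the partial-failure response looks like; processPartialResponse assembles it for you, and the identifier value is only a placeholder. It uses the SQSBatchResponse type from the aws-lambda type definitions.

```typescript
import type { SQSBatchResponse } from 'aws-lambda';

// Sketch of the partial-failure response reported back to the Lambda service.
// For SQS the identifier is the messageId; for Kinesis Data Streams and
// DynamoDB Streams it is the record's sequence number.
const exampleResponse: SQSBatchResponse = {
  batchItemFailures: [
    { itemIdentifier: 'placeholder-message-id-or-sequence-number' },
  ],
};
```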
The remaining sections of the documentation will rely on these samples. For completeness, they demonstrate the IAM permissions and a Dead Letter Queue (DLQ) where batch records will be sent after 2 retries.
You do not need any additional IAM permissions to use this utility, except for what each event source requires.
```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: nodejs22.x
    Tracing: Active
    Environment:
      Variables:
        LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: index.handler
      CodeUri: hello_world
      Policies:
        # Lambda Destinations require additional permissions
        # to send failure records to DLQ from Kinesis/DynamoDB
        - Version: '2012-10-17'
          Statement:
            Effect: 'Allow'
            Action:
              - sqs:GetQueueAttributes
              - sqs:GetQueueUrl
              - sqs:SendMessage
            Resource: !GetAtt SampleDLQ.Arn
      Events:
        KinesisStream:
          Type: Kinesis
          Properties:
            Stream: !GetAtt SampleStream.Arn
            BatchSize: 100
            StartingPosition: LATEST
            MaximumRetryAttempts: 2
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt SampleDLQ.Arn
            FunctionResponseTypes:
              - ReportBatchItemFailures

  SampleDLQ:
    Type: AWS::SQS::Queue

  SampleStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
      StreamEncryption:
        EncryptionType: KMS
        KeyId: alias/aws/kinesis
```
```yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: partial batch response sample

Globals:
  Function:
    Timeout: 5
    MemorySize: 256
    Runtime: nodejs22.x
    Tracing: Active
    Environment:
      Variables:
        POWERTOOLS_LOG_LEVEL: INFO
        POWERTOOLS_SERVICE_NAME: hello

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: index.handler
      CodeUri: hello_world
      Policies:
        # Lambda Destinations require additional permissions
        # to send failure records from Kinesis/DynamoDB
        - Version: '2012-10-17'
          Statement:
            Effect: 'Allow'
            Action:
              - sqs:GetQueueAttributes
              - sqs:GetQueueUrl
              - sqs:SendMessage
            Resource: !GetAtt SampleDLQ.Arn
      Events:
        DynamoDBStream:
          Type: DynamoDB
          Properties:
            Stream: !GetAtt SampleTable.StreamArn
            StartingPosition: LATEST
            MaximumRetryAttempts: 2
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt SampleDLQ.Arn
            FunctionResponseTypes:
              - ReportBatchItemFailures

  SampleDLQ:
    Type: AWS::SQS::Queue

  SampleTable:
    Type: AWS::DynamoDB::Table
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
      SSESpecification:
        SSEEnabled: true
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
```
When using SQS FIFO queues, a batch may include messages from different group IDs.
By default, we will stop processing at the first failure and mark unprocessed messages as failed to preserve ordering. However, this behavior may not be optimal for customers who wish to proceed with processing messages from a different group ID.
Enable the skipGroupOnError option for seamless processing of messages from various group IDs. This setup ensures that messages from a failed group ID are sent back to SQS, enabling uninterrupted processing of messages from the subsequent group ID.
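As a minimal sketch of what this looks like, assuming the SqsFifoPartialProcessor covered later on this page and that skipGroupOnError is accepted as an option by processPartialResponseSync:

```typescript
import {
  SqsFifoPartialProcessor,
  processPartialResponseSync,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new SqsFifoPartialProcessor();

const recordHandler = (record: SQSRecord): void => {
  // Process the record; throwing here marks it as failed
  if (!record.body) {
    throw new Error('Payload is empty');
  }
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponseSync(event, recordHandler, processor, {
    context,
    // Continue with records from other message group IDs when one group fails,
    // instead of short-circuiting the rest of the batch
    skipGroupOnError: true,
  });
```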
By default, we catch any exception raised by your record handler function. This allows us to (1) continue processing the batch, (2) collect each batch item that failed processing, and (3) return the appropriate response correctly without failing your Lambda function execution.
```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const logger = new Logger();

class InvalidPayload extends Error {
  public constructor(message: string) {
    super(message);
    this.name = 'InvalidPayload';
  }
}

const recordHandler = async (record: SQSRecord): Promise<void> => {
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  } else {
    throw new InvalidPayload('Payload does not contain minimum required fields'); // (1)!
  }
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    // (2)!
    context,
  });
```
All records in the batch will be passed to this handler for processing, even if exceptions are thrown. Here's the behaviour after completing the batch:
All records successfully processed. We will return an empty list of item failures {'batchItemFailures': []}
Partial success with some exceptions. We will return a list of all item IDs/sequence numbers that failed processing
All records failed to be processed. We will throw a FullBatchFailureError with a list of all the errors thrown while processing, unless throwOnFullBatchFailure is disabled.
The following sequence diagrams explain how each Batch processor behaves under different scenarios.
Within your recordHandler function, you might need access to the Lambda context to determine how much time you have left before your function times out.
We can automatically inject the Lambda context into your recordHandler as an optional second argument if you register it when using the BatchProcessor class or the processPartialResponse function (the same applies to their synchronous counterparts).
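A minimal sketch of that registration follows; the timeout check inside the handler is only an illustration of what you could do with the injected context:

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { Context, SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (
  record: SQSRecord,
  lambdaContext?: Context
): Promise<void> => {
  // The Lambda context is injected as the optional second argument
  // because we pass `context` in the options below.
  if (lambdaContext && lambdaContext.getRemainingTimeInMillis() < 1_000) {
    throw new Error(`Not enough time left to process record ${record.messageId}`);
  }
  // ... your processing logic
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context, // registering the context enables the injection above
  });
```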
By default, the BatchProcessor will throw a FullBatchFailureError if all records in the batch fail to process; we do this to reflect the failure in your operational metrics.
When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the Lambda service will scale down the concurrency of your function, potentially impacting performance.
For these scenarios, you can set the throwOnFullBatchFailure option to false when calling processPartialResponse.
```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (_record: SQSRecord): Promise<void> => {
  // Process the record
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
    throwOnFullBatchFailure: false,
  });
```
By default, the BatchProcessor processes records in parallel using Promise.all(). However, if you need to preserve the order of records, you can set the processInParallel option to false to process records sequentially.
If the processInParallel option is not provided, the BatchProcessor will process records in parallel.
When processing records from SQS FIFO queues, we recommend using the SqsFifoPartialProcessor class, which guarantees ordering of records and implements a short-circuit mechanism to skip processing records from a different message group ID.
Sequential async processing
```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);

const recordHandler = async (_record: SQSRecord): Promise<void> => {
  // Process the record
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
    processInParallel: false,
  });
```
You can create your own partial batch processor from scratch by inheriting the BasePartialProcessor class, and implementing the prepare(), clean(), processRecord() and processRecordSync() abstract methods.
```mermaid
classDiagram
    direction LR
    class BasePartialProcessor {
        <<interface>>
        +prepare()
        +clean()
        +processRecord(record: BaseRecord)
        +processRecordSync(record: BaseRecord)
    }
    class YourCustomProcessor {
        +prepare()
        +clean()
        +processRecord(record: BaseRecord)
        +processRecordSync(record: BaseRecord)
    }
    BasePartialProcessor <|-- YourCustomProcessor : extends
```
Visual representation of bringing your own processor
prepare() – called once as part of the processor initialization
clean() – teardown logic called once after processRecord completes
processRecord() – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic
processRecordSync() – handles all processing logic for each individual message of a batch, including calling the recordHandler (this.handler)
You can then instantiate this processor and pass it to processPartialResponse to process the records in your Lambda handler function.
```typescript
import { randomInt } from 'node:crypto';
import {
  BasePartialBatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type {
  BaseRecord,
  FailureResponse,
  SuccessResponse,
} from '@aws-lambda-powertools/batch/types';
import {
  BatchWriteItemCommand,
  DynamoDBClient,
} from '@aws-sdk/client-dynamodb';
import { marshall } from '@aws-sdk/util-dynamodb';
import type { SQSHandler } from 'aws-lambda';

const tableName = process.env.TABLE_NAME || 'table-not-found';

class MyPartialProcessor extends BasePartialBatchProcessor {
  #tableName: string;
  #client?: DynamoDBClient;

  public constructor(tableName: string) {
    super(EventType.SQS);
    this.#tableName = tableName;
  }

  /**
   * It's called once, **after** processing the batch.
   *
   * Here we are writing all the processed messages to DynamoDB.
   */
  public clean(): void {
    // biome-ignore lint/style/noNonNullAssertion: We know that the client is defined because clean() is called after prepare()
    this.#client!.send(
      new BatchWriteItemCommand({
        RequestItems: {
          [this.#tableName]: this.successMessages.map((message) => ({
            PutRequest: {
              Item: marshall(message),
            },
          })),
        },
      })
    );
  }

  /**
   * It's called once, **before** processing the batch.
   *
   * It initializes a new client and cleans up any existing data.
   */
  public prepare(): void {
    this.#client = new DynamoDBClient({});
    this.successMessages = [];
  }

  public async processRecord(
    _record: BaseRecord
  ): Promise<SuccessResponse | FailureResponse> {
    throw new Error('Not implemented');
  }

  /**
   * It handles how your record is processed.
   *
   * Here we are keeping the status of each run, `this.handler` is
   * the function that is passed when calling `processor.register()`.
   */
  public processRecordSync(
    record: BaseRecord
  ): SuccessResponse | FailureResponse {
    try {
      const result = this.handler(record);
      return this.successHandler(record, result);
    } catch (error) {
      return this.failureHandler(record, error as Error);
    }
  }
}

const processor = new MyPartialProcessor(tableName);

const recordHandler = (): number => {
  return Math.floor(randomInt(1, 10));
};

export const handler: SQSHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  });
```
You can use Tracer to create subsegments for each batch record processed. To do so, you can open a new subsegment for each record, and close it when you're done processing it. When adding annotations and metadata to the subsegment, you can do so directly without calling tracer.setSegment(subsegment). This allows you to work with the subsegment directly and avoids having to either pass the parent subsegment around or restore it at the end of the record processing.
```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Tracer } from '@aws-lambda-powertools/tracer';
import { captureLambdaHandler } from '@aws-lambda-powertools/tracer/middleware';
import middy from '@middy/core';
import type { SQSEvent, SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS);
const tracer = new Tracer({ serviceName: 'serverlessAirline' });

const recordHandler = async (record: SQSRecord): Promise<void> => {
  const subsegment = tracer.getSegment()?.addNewSubsegment('### recordHandler'); // (1)!
  subsegment?.addAnnotation('messageId', record.messageId); // (2)!

  const payload = record.body;
  if (payload) {
    try {
      const item = JSON.parse(payload);
      // do something with the item
      subsegment?.addMetadata('item', item);
    } catch (error) {
      subsegment?.addError(error);
      throw error;
    }
  }

  subsegment?.close(); // (3)!
};

export const handler: SQSHandler = middy(async (event: SQSEvent, context) =>
  processPartialResponse(event, recordHandler, processor, {
    context,
  })
).use(captureLambdaHandler(tracer));
```
Retrieve the current segment, then create a subsegment for the record being processed
You can add annotations and metadata to the subsegment directly without calling tracer.setSegment(subsegment)
Close the subsegment when you're done processing the record
As there are no external calls, you can unit test your code with BatchProcessor quite easily.
Example:
Given an SQS batch where the first batch record succeeds and the second fails processing, we should have a single item reported in the function response.
```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSHandler, SQSRecord } from 'aws-lambda';

const processor = new BatchProcessor(EventType.SQS); // (1)!
const logger = new Logger();

// biome-ignore format: we need the comment in the next line to stay there to annotate the code snippet in the docs
const recordHandler = async (record: SQSRecord): Promise<void> => { // (2)!
  const payload = record.body;
  if (payload) {
    const item = JSON.parse(payload);
    logger.info('Processed item', { item });
  }
};

export const handler: SQSHandler = async (event, context) =>
  // biome-ignore format: we need the comment in the next line to stay there to annotate the code snippet in the docs
  processPartialResponse(event, recordHandler, processor, { // (3)!
    context,
  });

export { processor };
```
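A sketch of such a test using the Node.js built-in test runner; the './index.js' import path and the record factory are hypothetical and only populate the fields this handler actually reads:

```typescript
import assert from 'node:assert/strict';
import { describe, it } from 'node:test';
import type { Context, SQSEvent, SQSRecord } from 'aws-lambda';
import { handler } from './index.js';

// Hypothetical factory: only `body` and `messageId` matter for this handler,
// the remaining SQSRecord fields are intentionally left out via the cast.
const sqsRecordFactory = (body: string, messageId: string): SQSRecord =>
  ({
    messageId,
    body,
    eventSource: 'aws:sqs',
  }) as SQSRecord;

describe('Batch processing', () => {
  it('reports only the failed record', async () => {
    // The first record parses fine, the second one throws inside JSON.parse
    const successRecord = sqsRecordFactory(JSON.stringify({ id: 1 }), '1');
    const failureRecord = sqsRecordFactory('not-valid-json', '2');
    const event: SQSEvent = { Records: [successRecord, failureRecord] };

    const response = await handler(event, {} as Context, () => {});

    assert.deepEqual(response, {
      batchItemFailures: [{ itemIdentifier: failureRecord.messageId }],
    });
  });
});
```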