
Commit edf208c

docs(kafka): finalize Kafka docs (#4076)
1 parent db9ec0c commit edf208c

15 files changed: +675 -89 lines changed

docs/features/kafka.md

Lines changed: 119 additions & 46 deletions
@@ -4,9 +4,6 @@ description: Utility
status: new
---

-???+ info "Work in progress"
-    This documentation page is a work in progress for an upcoming feature in Powertools for AWS Lambda. If you're seeing this page, it means the release process is underway, but the feature is not yet available on npm. Please check back soon for the final version.
-
The Kafka Consumer utility transparently handles message deserialization, provides an intuitive developer experience, and integrates seamlessly with the rest of the Powertools for AWS Lambda ecosystem.

```mermaid
@@ -72,15 +69,15 @@ Depending on the schema types you want to use, install the library and the corre

Additionally, if you want to use output parsing with [Standard Schema](https://github.com/standard-schema/standard-schema), you can install [any of the supported libraries](https://standardschema.dev/#what-schema-libraries-implement-the-spec), for example: Zod, Valibot, or ArkType.

-### Required resources
+<!-- ### Required resources

To use the Kafka consumer utility, you need an AWS Lambda function configured with a Kafka event source. This can be Amazon MSK, MSK Serverless, or a self-hosted Kafka cluster.

=== "gettingStartedWithMsk.yaml"

    ```yaml
    --8<-- "examples/snippets/kafka/templates/gettingStartedWithMsk.yaml"
-    ```
+    ``` -->

### Using ESM with Schema Registry

@@ -97,19 +94,19 @@ The Kafka consumer utility transforms raw Kafka events into an intuitive format

=== "Avro Messages"

-    ```typescript
+    ```typescript hl_lines="2-3 8-13 15 19"
    --8<-- "examples/snippets/kafka/gettingStartedAvro.ts"
    ```

=== "Protocol Buffers"

-    ```typescript
+    ```typescript hl_lines="1-2 8-13 15 19"
    --8<-- "examples/snippets/kafka/gettingStartedProtobuf.ts"
    ```

=== "JSON Messages"

-    ```typescript
+    ```typescript hl_lines="1-2 7-11 13 17"
    --8<-- "examples/snippets/kafka/gettingStartedJson.ts"
    ```

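The three referenced snippets are not included in this view. For orientation only, a JSON handler along the lines of `gettingStartedJson.ts` might look roughly like the sketch below, based on the API used in the snippet files added later in this commit; `SchemaType.JSON` is assumed by analogy with the `SchemaType.AVRO` usage shown there.

```typescript
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

// JSON values only need the format declared; no schema definition is required
const schemaConfig = {
  value: {
    type: SchemaType.JSON, // assumed enum member, mirroring SchemaType.AVRO
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer(async (event, _context) => {
  for (const record of event.records) {
    // record.value has already been base64-decoded and parsed as JSON
    logger.info('Processing record', { value: record.value });
  }
}, schemaConfig);
```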
@@ -119,7 +116,7 @@ The `kafkaConsumer` function can deserialize both keys and values independently

=== "index.ts"

-    ```typescript
+    ```typescript hl_lines="9 13 22 25-26"
    --8<-- "examples/snippets/kafka/gettingStartedKeyValue.ts:func"
    ```

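The key/value snippet is only referenced above. A minimal sketch of configuring key and value independently, assuming `SchemaConfig` accepts a `key` entry symmetric to `value`; the `order.avsc` file, field names, and formats are illustrative.

```typescript
import { readFileSync } from 'node:fs';
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const schemaConfig = {
  // Key and value are configured, and deserialized, independently
  key: {
    type: SchemaType.JSON, // e.g. a small JSON identifier object
  },
  value: {
    type: SchemaType.AVRO,
    schema: readFileSync(new URL('./order.avsc', import.meta.url), 'utf8'),
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer(async (event, _context) => {
  for (const record of event.records) {
    const { key, value } = record; // each deserialized with its own configuration
    logger.info('Processing record', { key, value });
  }
}, schemaConfig);
```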
@@ -197,72 +194,142 @@ Each Kafka record contains important metadata that you can access alongside the

=== "Working with Record Metadata"

-    ```typescript
+    ```typescript hl_lines="10"
    --8<-- "examples/snippets/kafka/advancedWorkingWithRecordMetadata.ts"
    ```

For debugging purposes, you can also access the original key, value, and headers in their base64-encoded form; these are available in the `originalValue`, `originalKey`, and `originalHeaders` properties of the `record`.

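To make this concrete, here is a small sketch of reading record coordinates and the base64-encoded originals inside a handler; the property names come from the table below, while the logging itself is illustrative.

```typescript
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

export const handler = kafkaConsumer(async (event, _context) => {
  for (const record of event.records) {
    const { topic, partition, offset, headers } = record;
    logger.info('Record coordinates', { topic, partition, offset, headers });

    // Base64-encoded originals are handy when the deserialized value looks wrong
    logger.debug('Raw value (base64)', { originalValue: record.originalValue });
  }
});
```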
#### Available metadata properties

-| Property | Description | Example Use Case |
-|-------------------|-----------------------------------------------------|---------------------------------------------|
-| `topic` | Topic name the record was published to | Routing logic in multi-topic consumers |
-| `partition` | Kafka partition number | Tracking message distribution |
-| `offset` | Position in the partition | De-duplication, exactly-once processing |
-| `timestamp` | Unix timestamp when record was created | Event timing analysis |
-| `timestamp_type` | Timestamp type (`CREATE_TIME` or `LOG_APPEND_TIME`) | Data lineage verification |
-| `headers` | Key-value pairs attached to the message | Cross-cutting concerns like correlation IDs |
-| `key` | Deserialized message key | Customer ID or entity identifier |
-| `value` | Deserialized message content | The actual business data |
-| `originalValue` | Base64-encoded original message value | Debugging or custom deserialization |
-| `originalKey` | Base64-encoded original message key | Debugging or custom deserialization |
-| `originalHeaders` | Base64-encoded original message headers | Debugging or custom deserialization |
-
-### Custom output serializers
+| Property | Description | Example Use Case |
+|-----------------------|------------------------------------------------------------------|---------------------------------------------------------------------|
+| `topic` | Topic name the record was published to | Routing logic in multi-topic consumers |
+| `partition` | Kafka partition number | Tracking message distribution |
+| `offset` | Position in the partition | De-duplication, exactly-once processing |
+| `timestamp` | Unix timestamp when record was created | Event timing analysis |
+| `timestamp_type` | Timestamp type (`CREATE_TIME` or `LOG_APPEND_TIME`) | Data lineage verification |
+| `headers` | Key-value pairs attached to the message | Cross-cutting concerns like correlation IDs |
+| `key` | Deserialized message key | Customer ID or entity identifier |
+| `value` | Deserialized message content | The actual business data |
+| `originalValue` | Base64-encoded original message value | Debugging or custom deserialization |
+| `originalKey` | Base64-encoded original message key | Debugging or custom deserialization |
+| `originalHeaders` | Base64-encoded original message headers | Debugging or custom deserialization |
+| `valueSchemaMetadata` | Metadata about the value schema, such as `schemaId` and `dataFormat` | Used by `kafkaConsumer` to process Protobuf; data format validation |
+| `keySchemaMetadata` | Metadata about the key schema, such as `schemaId` and `dataFormat` | Used by `kafkaConsumer` to process Protobuf; data format validation |
+
+### Additional Parsing

You can parse deserialized data using your preferred parsing library. This can help you integrate Kafka data with your domain schemas and application architecture, providing type hints, runtime parsing and validation, and advanced data transformations.

=== "Zod"

-    ```typescript
+    ```typescript hl_lines="25 29"
    --8<-- "examples/snippets/kafka/advancedWorkingWithZod.ts"
    ```

=== "Valibot"

-    ```typescript
+    ```typescript hl_lines="28 32"
    --8<-- "examples/snippets/kafka/advancedWorkingWithValibot.ts"
    ```

=== "ArkType"

-    ```typescript
+    ```typescript hl_lines="25 29"
    --8<-- "examples/snippets/kafka/advancedWorkingWithArkType.ts"
    ```

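The parsing snippets above are referenced but not shown in this view. One way to apply the idea is to validate `record.value` with a Zod schema inside the handler, as sketched below; the `Order` schema and `processOrder` helper are illustrative, and the referenced snippets may instead wire the schema into the utility's configuration.

```typescript
declare function processOrder(order: { orderId: string; amount: number }): Promise<void>;

import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { z } from 'zod';

const Order = z.object({
  orderId: z.string(),
  amount: z.number().positive(),
});

export const handler = kafkaConsumer(async (event, _context) => {
  for (const record of event.records) {
    // record.value is already deserialized; safeParse adds runtime validation
    // and a typed result on top of it
    const parsed = Order.safeParse(record.value);
    if (!parsed.success) {
      // decide what to do with invalid payloads: skip, log, or dead-letter them
      continue;
    }
    await processOrder(parsed.data);
  }
});
```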
-#### Exception types
+### Error handling
+
+Handle errors gracefully when processing Kafka messages to ensure your application maintains resilience and provides clear diagnostic information. The Kafka consumer utility provides specific exception types to help you identify and handle deserialization issues effectively.
+
+!!! tip
+    Fields like `value`, `key`, and `headers` are decoded lazily, meaning they are only deserialized when accessed. This allows you to handle deserialization errors at the point of access rather than when the record is first processed.

-| Exception | Description | Common Causes |
-|-----------|-------------|---------------|
-| `KafkaConsumerDeserializationError` | Raised when message deserialization fails | Corrupted message data, schema mismatch, or wrong schema type configuration |
-| `KafkaConsumerAvroSchemaParserError` | Raised when parsing Avro schema definition fails | Syntax errors in schema JSON, invalid field types, or malformed schema |
-| `KafkaConsumerMissingSchemaError` | Raised when a required schema is not provided | Missing schema for AVRO or PROTOBUF formats (required parameter) |
-| `KafkaConsumerOutputSerializerError` | Raised when output serializer fails | Error in custom serializer function, incompatible data, or validation failures in Pydantic models |
+=== "Basic Error Handling"
+
+    ```typescript hl_lines="29 36 45"
+    --8<-- "examples/snippets/kafka/advancedBasicErrorHandling.ts:3"
+    ```
+
+    1. If you want to handle deserialization and parsing errors, you should destructure or access the `value`, `key`, or `headers` properties of the record within the `for...of` loop.
+
+=== "Parser Error Handling"
+
+    ```typescript hl_lines="41 44"
+    --8<-- "examples/snippets/kafka/advancedParserErrorHandling.ts:3"
+    ```
+
+    1. The `cause` property of the error is populated with the original Standard Schema parsing error, allowing you to access detailed information about the parsing failure.
+
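As a rough sketch of inspecting the `cause` mentioned in the annotation above: it assumes `KafkaConsumerOutputSerializerError` is exported from the same `@aws-lambda-powertools/kafka/errors` module as the deserialization error used elsewhere in this commit, and that a schema configuration with an attached Standard Schema parser is in place (omitted here).

```typescript
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
// Assumption: exported alongside KafkaConsumerDeserializationError
import { KafkaConsumerOutputSerializerError } from '@aws-lambda-powertools/kafka/errors';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

// A schema configuration with a Standard Schema parser is assumed and omitted here
export const handler = kafkaConsumer(async (event, _context) => {
  for (const record of event.records) {
    try {
      const { value } = record; // lazy deserialization and parsing happen on access
      logger.info('Processed record', { value });
    } catch (error) {
      if (error instanceof KafkaConsumerOutputSerializerError) {
        // `cause` carries the original Standard Schema parsing error
        logger.error('Additional parsing failed', { cause: error.cause });
        continue;
      }
      throw error;
    }
  }
});
```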
+#### Error types
+
+| Exception | Description | Common Causes |
+|--------------------------------------|-----------------------------------------------|-----------------------------------------------------------------------------|
+| `KafkaConsumerError` | Base class for all Kafka consumer errors | General unhandled errors |
+| `KafkaConsumerDeserializationError` | Thrown when message deserialization fails | Corrupted message data, schema mismatch, or wrong schema type configuration |
+| `KafkaConsumerMissingSchemaError` | Thrown when a required schema is not provided | Missing schema for AVRO or PROTOBUF formats (required parameter) |
+| `KafkaConsumerOutputSerializerError` | Thrown when additional schema parsing fails | Parsing failures in Standard Schema models |

### Integrating with Idempotency

-When processing Kafka messages in Lambda, failed batches can result in message reprocessing. The idempotency utility prevents duplicate processing by tracking which messages have already been handled, ensuring each message is processed exactly once.
+When processing Kafka messages in Lambda, failed batches can result in message reprocessing. The [Idempotency utility](./idempotency.md) prevents duplicate processing by tracking which messages have already been handled, ensuring each message is processed exactly once.

The Idempotency utility automatically stores the result of each successful operation, returning the cached result if the same message is processed again, which prevents potentially harmful duplicate operations like double-charging customers or double-counting metrics.

+!!! tip
+    By using the Kafka record's unique coordinates (topic, partition, offset) as the idempotency key, you ensure that even if a batch fails and Lambda retries the messages, each message will be processed exactly once.
+
=== "Idempotent Kafka Processing"

-    ```typescript
+    ```typescript hl_lines="44 51"
    --8<-- "examples/snippets/kafka/advancedWorkingWithIdempotency.ts"
    ```

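The idempotency snippet is referenced but not shown. A rough sketch of the pattern, using the Idempotency utility's `makeIdempotent` wrapper with a DynamoDB persistence layer and the record coordinates as the key; the table name, payload shape, and business logic are illustrative.

```typescript
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { IdempotencyConfig, makeIdempotent } from '@aws-lambda-powertools/idempotency';
import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb';

const persistenceStore = new DynamoDBPersistenceLayer({
  tableName: 'IdempotencyTable', // illustrative table name
});

// Derive the idempotency key from the record coordinates (topic, partition, offset)
const processMessage = makeIdempotent(
  async (payload: { topic: string; partition: number; offset: number; value: unknown }) => {
    // ...business logic, e.g. charge the customer exactly once...
    return { processed: true };
  },
  {
    persistenceStore,
    config: new IdempotencyConfig({
      eventKeyJmesPath: '[topic, partition, offset]',
    }),
  }
);

export const handler = kafkaConsumer(async (event, _context) => {
  for (const record of event.records) {
    const { topic, partition, offset, value } = record;
    await processMessage({ topic, partition, offset, value });
  }
});
```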
-TIP: By using the Kafka record's unique coordinates (topic, partition, offset) as the idempotency key, you ensure that even if a batch fails and Lambda retries the messages, each message will be processed exactly once.
+### Best practices
+
+#### Handling large messages
+
+When processing large Kafka messages in Lambda, be mindful of memory limitations. Although the Kafka consumer utility optimizes memory usage, large deserialized messages can still exhaust the function resources.
+
+=== "Handling Large Messages"
+
+    ```typescript hl_lines="18-20"
+    --8<-- "examples/snippets/kafka/advancedHandlingLargeMessages.ts:6"
+    ```
+
+For large messages, consider these proven approaches:
+
+* **Store the data:** use Amazon S3 and include only the S3 reference in your Kafka message
+* **Split large payloads:** use multiple smaller messages with sequence identifiers
+* **Increase memory:** increase your Lambda function's memory allocation, which also increases CPU capacity
+
+#### Batch size configuration
+
+The number of Kafka records processed per Lambda invocation is controlled by your Event Source Mapping configuration. Properly sized batches optimize cost and performance.
+
+=== "Batch size configuration"
+
+    ```yaml hl_lines="16"
+    --8<-- "examples/snippets/kafka/templates/advancedBatchSizeConfiguration.yaml"
+    ```
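The referenced template is SAM YAML and not shown here. As a TypeScript-based alternative, a sketch using the CDK `ManagedKafkaEventSource` construct; the construct and property names are assumed from `aws-cdk-lib/aws-lambda-event-sources`, and the ARN and values are illustrative of the sizing guidance in the list below.

```typescript
import { Duration } from 'aws-cdk-lib';
import { StartingPosition, type IFunction } from 'aws-cdk-lib/aws-lambda';
import { ManagedKafkaEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';

// Attach an MSK event source with an explicit batch size and batching window
declare const fn: IFunction;

fn.addEventSource(
  new ManagedKafkaEventSource({
    clusterArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/demo/...', // illustrative ARN
    topic: 'orders',
    startingPosition: StartingPosition.LATEST,
    batchSize: 100, // records per invocation
    maxBatchingWindow: Duration.seconds(5),
  })
);
```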
+
+Different workloads benefit from different batch configurations:
+
+* **High-volume, simple processing:** Use larger batches (100-500 records) with a short timeout
+* **Complex processing with database operations:** Use smaller batches (10-50 records)
+* **Mixed message sizes:** Set an appropriate batching window (1-5 seconds) to handle variability
+
+#### Cross-language compatibility
+
+When using binary serialization formats across multiple programming languages, ensure consistent schema handling to prevent deserialization failures.
+
+Common cross-language challenges to address:
+
+* **Field naming conventions:** camelCase in Java vs snake_case in Python (see the sketch below)
+* **Date/time representation:** differences in how dates and timestamps are encoded across languages
+* **Numeric precision handling:** especially decimals, doubles, and floats
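For the field-naming mismatch above, one option is to normalize at the consumer boundary. A sketch using Zod, with illustrative field names for a payload produced by a Python service:

```typescript
import { z } from 'zod';

// Payload produced by a Python service uses snake_case field names
export const PythonOrderSchema = z
  .object({
    order_id: z.string(),
    created_at: z.string().datetime(), // ISO-8601 string rather than a language-specific date type
    total_amount: z.number(),
  })
  .transform((order) => ({
    // Normalize to the camelCase shape used on the TypeScript side
    orderId: order.order_id,
    createdAt: new Date(order.created_at),
    totalAmount: order.total_amount,
  }));

// Usage inside a handler: PythonOrderSchema.parse(record.value)
```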

### Troubleshooting

@@ -274,14 +341,13 @@ First, check that your schema definition exactly matches the message format. Eve

For binary messages that fail to deserialize, examine the raw encoded data:

-```python
-# DO NOT include this code in production handlers
-# For troubleshooting purposes only
+```javascript
+// DO NOT include this code in production handlers
+// For troubleshooting purposes only
-import base64

-raw_bytes = base64.b64decode(record.raw_value)
-print(f"Message size: {len(raw_bytes)} bytes")
-print(f"First 50 bytes (hex): {raw_bytes[:50].hex()}")
+const rawBytes = Buffer.from(record.originalValue, 'base64');
+console.log(`Message size: ${rawBytes.length} bytes`);
+console.log(`First 50 bytes (hex): ${rawBytes.slice(0, 50).toString('hex')}`);
```

#### Schema compatibility issues
@@ -311,7 +378,7 @@ For timeout issues:
* Implement chunked or asynchronous processing patterns for time-consuming operations
* Monitor and optimize database operations, external API calls, or other I/O operations in your handler

-???+ tip "Monitoring memory usage"
+!!! tip "Monitoring memory usage"
    Use CloudWatch metrics to track your function's memory utilization. If it consistently exceeds 80% of allocated memory, consider increasing the memory allocation or optimizing your code.

## Kafka consumer workflow
@@ -342,4 +409,10 @@ For timeout issues:

## Testing your code

-TBD
+Testing Kafka consumer code requires simulating Lambda events with Kafka messages. You can create simple test cases using local JSON files without needing a live Kafka cluster. Below is an example of how to simulate a JSON message.
+
+=== "Testing your code"
+
+    ```typescript
+    --8<-- "examples/snippets/kafka/advancedTestingYourCode.ts"
+    ```
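The referenced test snippet is not part of this view. A rough sketch of the approach using `node:test` and a hand-built event that follows the Amazon MSK event shape; the handler import path, topic, and payload are illustrative.

```typescript
import { describe, it } from 'node:test';
import assert from 'node:assert/strict';
import { handler } from './index.js'; // illustrative path to the kafkaConsumer handler

const buildEvent = (payload: Record<string, unknown>) => ({
  eventSource: 'aws:kafka',
  eventSourceArn: 'arn:aws:kafka:us-east-1:123456789012:cluster/demo/...', // illustrative
  bootstrapServers: 'b-1.demo.kafka.us-east-1.amazonaws.com:9092',
  records: {
    'orders-0': [
      {
        topic: 'orders',
        partition: 0,
        offset: 15,
        timestamp: Date.now(),
        timestampType: 'CREATE_TIME',
        headers: [],
        key: Buffer.from('order-1').toString('base64'),
        value: Buffer.from(JSON.stringify(payload)).toString('base64'),
      },
    ],
  },
});

describe('kafka handler', () => {
  it('processes a JSON message', async () => {
    const event = buildEvent({ orderId: 'order-1', amount: 25 });
    // Casts keep the sketch independent of the utility's exact event/context types
    await assert.doesNotReject(handler(event as never, {} as never));
  });
});
```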
examples/snippets/kafka/advancedBasicErrorHandling.ts

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
declare function processRecord(record: unknown): Promise<void>;

import { readFileSync } from 'node:fs';
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { KafkaConsumerDeserializationError } from '@aws-lambda-powertools/kafka/errors';
import type {
  ConsumerRecord,
  SchemaConfig,
} from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const schemaConfig = {
  value: {
    type: SchemaType.AVRO,
    schema: readFileSync(new URL('./user.avsc', import.meta.url), 'utf8'),
  },
} satisfies SchemaConfig;

export const handler = kafkaConsumer(async (event, _context) => {
  const results: {
    successful: number;
    failed: Array<ConsumerRecord<unknown, unknown>>;
  } = {
    successful: 0,
    failed: [],
  };
  for (const record of event.records) {
    try {
      const { value, partition, offset, topic } = record; // (1)!
      logger.setCorrelationId(`${topic}-${partition}-${offset}`);

      await processRecord(value);

      results.successful += 1;
    } catch (error) {
      if (error instanceof KafkaConsumerDeserializationError) {
        results.failed.push(record);
        logger.error('Error deserializing message', { error });
      } else {
        logger.error('Error processing message', { error });
      }
    }

    if (results.failed.length > 0) {
      // Handle failed records, e.g., send to a dead-letter queue
    }

    logger.info('Successfully processed records', {
      successful: results.successful,
    });
  }
}, schemaConfig);
examples/snippets/kafka/advancedHandlingLargeMessages.ts

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
declare function processRecordFromS3({
  key,
  bucket,
}: { key: string; bucket: string }): Promise<void>;

import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { Logger } from '@aws-lambda-powertools/logger';
import { object, safeParse, string } from 'valibot';

const logger = new Logger({ serviceName: 'kafka-consumer' });

const LargeMessage = object({
  key: string(),
  bucket: string(),
});

export const handler = kafkaConsumer(async (event, _context) => {
  for (const record of event.records) {
    const { topic, value, originalValue } = record;
    const valueSize = Buffer.byteLength(originalValue, 'utf8');
    const parsedValue = safeParse(LargeMessage, value);
    if (
      topic === 'product-catalog' &&
      valueSize > 3_000_000 &&
      parsedValue.success
    ) {
      logger.info('Large message detected, processing from S3', {
        size: valueSize,
      });

      const { key, bucket } = parsedValue.output;
      await processRecordFromS3({ key, bucket });

      logger.info('Processed large message from S3', {
        key,
        bucket,
      });
    }

    // regular processing of the record
  }
});
