Commit ef9bb52

feat(kafka): lazily deserialize key/value/headers (#4068)
1 parent 006f27b commit ef9bb52

9 files changed, +359 -127 lines changed

packages/kafka/src/consumer.ts

Lines changed: 91 additions & 49 deletions
@@ -2,14 +2,19 @@ import type { AsyncHandler } from '@aws-lambda-powertools/commons/types';
 import { isNull, isRecord } from '@aws-lambda-powertools/commons/typeutils';
 import type { StandardSchemaV1 } from '@standard-schema/spec';
 import type { Context, Handler } from 'aws-lambda';
+import { deserialize as deserializeJson } from './deserializer/json.js';
+import { deserialize as deserializePrimitive } from './deserializer/primitive.js';
 import {
   KafkaConsumerAvroMissingSchemaError,
+  KafkaConsumerDeserializationError,
+  KafkaConsumerError,
   KafkaConsumerParserError,
   KafkaConsumerProtobufMissingSchemaError,
 } from './errors.js';
 import type {
   ConsumerRecord,
   ConsumerRecords,
+  Deserializer,
   Record as KafkaRecord,
   MSKEvent,
   SchemaConfig,
@@ -27,7 +32,7 @@ const assertIsMSKEvent = (event: unknown): event is MSKEvent => {
     !isRecord(event.records) ||
     !Object.values(event.records).every((arr) => Array.isArray(arr))
   ) {
-    throw new Error(
+    throw new KafkaConsumerError(
       'Event is not a valid MSKEvent. Expected an object with a "records" property.'
     );
   }
@@ -69,69 +74,80 @@ const deserializeHeaders = (headers: Record<string, number[]>[] | null) => {
  * @param config - The schema configuration to use for deserialization. See {@link SchemaConfigValue | `SchemaConfigValue`}.
  * If not provided, the value is decoded as a UTF-8 string.
  */
-const deserialize = async (value: string, config?: SchemaConfigValue) => {
-  // no config -> default to base64 decoding
+const deserialize = (
+  value: string,
+  deserializer: Deserializer,
+  config?: SchemaConfigValue
+) => {
   if (config === undefined) {
-    return Buffer.from(value, 'base64').toString();
+    return deserializer(value);
   }
-
-  // if config is provided, we expect it to have a specific type
-  if (!['json', 'avro', 'protobuf'].includes(config.type)) {
-    throw new Error(
-      `Unsupported deserialization type: ${config.type}. Supported types are: json, avro, protobuf.`
-    );
-  }
-
   if (config.type === 'json') {
-    const deserializer = await import('./deserializer/json.js');
-    return deserializer.deserialize(value);
+    return deserializer(value);
   }
 
   if (config.type === 'avro') {
     if (!config.schema) {
       throw new KafkaConsumerAvroMissingSchemaError(
-        'Schema string is required for Avro deserialization'
+        'Schema string is required for avro deserialization'
       );
     }
-    const deserializer = await import('./deserializer/avro.js');
-    return deserializer.deserialize(value, config.schema);
+    return deserializer(value, config.schema);
   }
   if (config.type === 'protobuf') {
     if (!config.schema) {
       throw new KafkaConsumerProtobufMissingSchemaError(
-        'Schema string is required for Protobuf deserialization'
+        'Schema string is required for protobuf deserialization'
       );
     }
-    const deserializer = await import('./deserializer/protobuf.js');
-    return deserializer.deserialize(value, config.schema);
+    return deserializer(value, config.schema);
   }
 };
 
 /**
- * Deserialize the key of a Kafka record.
+ * Get the deserializer function based on the provided type.
  *
- * If the key is `undefined`, it returns `undefined`.
- *
- * @param key - The base64-encoded key to deserialize.
- * @param config - The schema configuration for deserializing the key. See {@link SchemaConfigValue | `SchemaConfigValue`}.
+ * @param type - The type of deserializer to use. Supported types are: `json`, `avro`, `protobuf`, or `undefined`.
+ * If `undefined`, it defaults to deserializing as a primitive string.
  */
-const deserializeKey = async (key?: string, config?: SchemaConfigValue) => {
-  if (key === undefined || key === '') {
-    return undefined;
+const getDeserializer = async (type?: string) => {
+  if (!type) {
+    return deserializePrimitive as Deserializer;
+  }
+  if (type === 'json') {
+    return deserializeJson as Deserializer;
+  }
+  if (type === 'protobuf') {
+    const deserializer = await import('./deserializer/protobuf.js');
+    return deserializer.deserialize as Deserializer;
+  }
+  if (type === 'avro') {
+    const deserializer = await import('./deserializer/avro.js');
+    return deserializer.deserialize as Deserializer;
   }
-  if (isNull(key)) return null;
-  return await deserialize(key, config);
+  throw new KafkaConsumerDeserializationError(
+    `Unsupported deserialization type: ${type}. Supported types are: json, avro, protobuf.`
+  );
 };
 
-const parseSchema = async (value: unknown, schema: StandardSchemaV1) => {
-  let result = schema['~standard'].validate(value);
+/**
+ * Parse a value against a provided schema using the `~standard` property for validation.
+ *
+ * @param value - The value to parse against the schema.
+ * @param schema - The schema to validate against, which should be a {@link StandardSchemaV1 | `Standard Schema V1`} object.
+ */
+const parseSchema = (value: unknown, schema: StandardSchemaV1) => {
+  const result = schema['~standard'].validate(value);
   /* v8 ignore start */
-  if (result instanceof Promise) result = await result;
-  /* v8 ignore stop */
-  if (result.issues) {
+  if (result instanceof Promise)
     throw new KafkaConsumerParserError(
-      `Schema validation failed ${result.issues}`
+      'Schema parsing supports only synchronous validation'
     );
+  /* v8 ignore stop */
+  if (result.issues) {
+    throw new KafkaConsumerParserError('Schema validation failed', {
+      cause: result.issues,
+    });
   }
   return result.value;
 };
@@ -142,24 +158,45 @@ const parseSchema = async (value: unknown, schema: StandardSchemaV1) => {
  * @param record - A single record from the MSK event.
  * @param config - The schema configuration for deserializing the record's key and value.
  */
-const deserializeRecord = async (record: KafkaRecord, config: SchemaConfig) => {
+const deserializeRecord = async (
+  record: KafkaRecord,
+  config?: SchemaConfig
+) => {
   const { key, value, headers, ...rest } = record;
-  const { key: keyConfig, value: valueConfig } = config;
+  const { key: keyConfig, value: valueConfig } = config || {};
 
-  const deserializedKey = await deserializeKey(key, keyConfig);
-  const deserializedValue = await deserialize(value, valueConfig);
+  const deserializerKey = await getDeserializer(keyConfig?.type);
+  const deserializerValue = await getDeserializer(valueConfig?.type);
 
   return {
     ...rest,
-    key: keyConfig?.parserSchema
-      ? await parseSchema(deserializedKey, keyConfig.parserSchema)
-      : deserializedKey,
-    value: valueConfig?.parserSchema
-      ? await parseSchema(deserializedValue, valueConfig.parserSchema)
-      : deserializedValue,
+    get key() {
+      if (key === undefined || key === '') {
+        return undefined;
+      }
+      if (isNull(key)) return null;
+      const deserializedKey = deserialize(key, deserializerKey, keyConfig);
+
+      return keyConfig?.parserSchema
+        ? parseSchema(deserializedKey, keyConfig.parserSchema)
+        : deserializedKey;
+    },
     originalKey: key,
+    get value() {
+      const deserializedValue = deserialize(
+        value,
+        deserializerValue,
+        valueConfig
+      );
+
+      return valueConfig?.parserSchema
+        ? parseSchema(deserializedValue, valueConfig.parserSchema)
+        : deserializedValue;
+    },
     originalValue: value,
-    headers: deserializeHeaders(headers),
+    get headers() {
+      return deserializeHeaders(headers);
+    },
     originalHeaders: headers,
   };
 };
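
The getters above are the core of this change: instead of eagerly decoding every record, `deserializeRecord` now returns an object whose `key`, `value`, and `headers` properties only run their deserializers when read. A minimal sketch of the pattern, with simplified shapes that are not the package's actual types:

// Sketch only: lazy properties via getters (hypothetical helper, simplified types).
type RawRecord = { value: string };

const makeLazyRecord = (raw: RawRecord, decode: (v: string) => unknown) => ({
  originalValue: raw.value,
  // Runs only when `.value` is read, so records that are never inspected skip decoding entirely.
  get value() {
    return decode(raw.value);
  },
});

const record = makeLazyRecord({ value: 'eyJpZCI6MX0=' }, (v) =>
  JSON.parse(Buffer.from(v, 'base64').toString('utf-8'))
);
// No decoding has happened yet; it happens here, on access:
console.log(record.value); // { id: 1 }
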
@@ -202,15 +239,20 @@ const deserializeRecord = async (record: KafkaRecord, config: SchemaConfig) => {
  */
 const kafkaConsumer = <K, V>(
   handler: AsyncHandler<Handler<ConsumerRecords<K, V>>>,
-  config: SchemaConfig
+  config?: SchemaConfig
 ): ((event: MSKEvent, context: Context) => Promise<unknown>) => {
   return async (event: MSKEvent, context: Context): Promise<unknown> => {
     assertIsMSKEvent(event);
 
     const consumerRecords: ConsumerRecord<K, V>[] = [];
     for (const recordsArray of Object.values(event.records)) {
       for (const record of recordsArray) {
-        consumerRecords.push(await deserializeRecord(record, config));
+        consumerRecords.push(
+          (await deserializeRecord(
+            record,
+            config
+          )) as unknown as ConsumerRecord<K, V>
+        );
       }
     }
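
Because the schema config is now optional, the wrapper can be used without any configuration at all. A hypothetical usage sketch - the import path and the shape of `event.records` are assumptions based on this package's layout; the config shape comes from this diff:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; // assumed public entry point

export const handler = kafkaConsumer(
  async (event) => {
    for (const record of event.records) {
      // key/value/headers are deserialized lazily, only when accessed here
      console.log(record.value);
    }
  },
  // Optional since this change; omit it to get plain UTF-8 strings back.
  { value: { type: 'json' } }
);
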

packages/kafka/src/deserializer/avro.ts

Lines changed: 3 additions & 1 deletion
@@ -7,7 +7,7 @@ import { KafkaConsumerDeserializationError } from '../errors.js';
  * @param data - The base64-encoded string representing the Avro binary data.
  * @param schema - The Avro schema as a JSON string.
  */
-export const deserialize = async (data: string, schema: string) => {
+const deserialize = (data: string, schema: string) => {
   try {
     const type = avro.parse(schema);
     const buffer = Buffer.from(data, 'base64');
@@ -18,3 +18,5 @@ export const deserialize = async (data: string, schema: string) => {
     );
   }
 };
+
+export { deserialize };
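
For reference, a rough standalone equivalent of what this Avro path does with a record value. The actual dependency behind the `avro` import is not shown in this hunk; the sketch assumes an avsc-style API (`parse` returning a type with `fromBuffer`), and the helper name is made up:

const decodeAvro = (base64Value: string, schemaJson: string) => {
  const type = avro.parse(schemaJson); // build an Avro type from the JSON schema string
  const buffer = Buffer.from(base64Value, 'base64'); // Kafka delivers the payload base64-encoded
  return type.fromBuffer(buffer); // decoded record object
};
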
packages/kafka/src/deserializer/json.ts

Lines changed: 10 additions & 6 deletions
@@ -1,19 +1,23 @@
+import { deserialize as deserializePrimitive } from './primitive.js';
+
 /**
- * Deserializes a base64 encoded string into either a JSON object or plain string
+ * Deserialize a base64 encoded string into either a JSON object or plain string
+ *
  * @param data - The base64 encoded string to deserialize
  * @returns The deserialized data as either a JSON object or string
  */
-export const deserialize = async (data: string) => {
-  // Decode the base64 string to a buffer
-  const decoded = Buffer.from(data, 'base64');
+const deserialize = (data: string) => {
+  const plainText = deserializePrimitive(data);
   try {
     // Attempt to parse the decoded data as JSON
     // we assume it's a JSON but it can also be a string, we don't know
-    return JSON.parse(decoded.toString());
+    return JSON.parse(plainText);
   } catch (error) {
     // If JSON parsing fails, log the error and return the decoded string
     // in case we could not parse it we return the base64 decoded value
     console.error(`Failed to parse JSON from base64 value: ${data}`, error);
-    return decoded.toString();
+    return plainText;
  }
 };
+
+export { deserialize };
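
The resulting behavior, illustrated with made-up inputs: valid JSON comes back parsed, anything else falls back to the plain decoded string.

import { deserialize } from './json.js'; // relative path as used inside this package

deserialize('eyJpZCI6MX0='); // base64 of '{"id":1}' -> { id: 1 }
deserialize('aGVsbG8=');     // base64 of 'hello'    -> 'hello' (after logging the parse error)
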
packages/kafka/src/deserializer/primitive.ts

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
+import { fromBase64 } from '@aws-lambda-powertools/commons/utils/base64';
+
+const decoder = new TextDecoder('utf-8');
+
+/**
+ * Deserialize a base64-encoded primitive value (string).
+ *
+ * When customers don't provide a schema configuration, we assume the value is a base64-encoded string.
+ *
+ * @param data - The base64-encoded string to deserialize.
+ */
+const deserialize = (data: string) => {
+  return decoder.decode(fromBase64(data, 'base64'));
+};
+
+export { deserialize };
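
Without the Powertools helper, the same decoding can be sketched as below; this is illustrative only, the module above is what the consumer actually uses.

// Equivalent base64 -> UTF-8 decoding on a Node.js runtime (illustrative).
const decodePrimitive = (data: string) =>
  new TextDecoder('utf-8').decode(Buffer.from(data, 'base64'));

decodePrimitive('aGVsbG8='); // -> 'hello'
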
packages/kafka/src/deserializer/protobuf.ts

Lines changed: 8 additions & 10 deletions
@@ -1,20 +1,16 @@
+import type { Message } from 'protobufjs';
 import { KafkaConsumerDeserializationError } from '../errors.js';
 import type { ProtobufMessage } from '../types/types.js';
 
 /**
- * Deserialises a Protobuf message from a base64-encoded string.
+ * Deserialize a Protobuf message from a base64-encoded string.
+ *
+ * @template T - The type of the deserialized message object.
  *
- * @template T - The type of the deserialised message object.
- * @param MessageClass - The Protobuf message type definition.
- * See {@link MessageType} from '@protobuf-ts/runtime'.
  * @param data - The base64-encoded string representing the Protobuf binary data.
- * @returns The deserialised message object of type T.
- * @throws {KafkaConsumerDeserializationError} If deserialization fails.
+ * @param messageType - The Protobuf message type definition - see {@link Message | `Message`} from {@link https://www.npmjs.com/package/protobufjs | `protobufjs`}.
  */
-export const deserialize = <T>(
-  data: string,
-  messageType: ProtobufMessage<T>
-): T => {
+const deserialize = <T>(data: string, messageType: ProtobufMessage<T>): T => {
   try {
     const buffer = Buffer.from(data, 'base64');
     return messageType.decode(buffer, buffer.length);
@@ -24,3 +20,5 @@ export const deserialize = <T>(
     );
   }
 };
+
+export { deserialize };
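
A caller supplies a message type exposing a protobufjs-style static `decode`. A hypothetical sketch of how such a type can be obtained at runtime and what the deserializer then does with it (the `Greeting` message and payload are made up for illustration):

import protobuf from 'protobufjs';

// Build a message type from inline .proto source (a generated message class works the same way).
const { root } = protobuf.parse('syntax = "proto3"; message Greeting { string message = 1; }');
const Greeting = root.lookupType('Greeting');

// What the deserializer above effectively runs for each record value:
const buffer = Buffer.from('CgVoZWxsbw==', 'base64'); // Greeting { message: "hello" }, base64-encoded
const decoded = Greeting.decode(buffer); // -> { message: 'hello' }
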

packages/kafka/src/errors.ts

Lines changed: 13 additions & 10 deletions
@@ -3,8 +3,8 @@
  * All Kafka consumer errors should extend this class.
  */
 class KafkaConsumerError extends Error {
-  constructor(message: string) {
-    super(message);
+  constructor(message: string, options?: ErrorOptions) {
+    super(message, options);
     this.name = 'KafkaConsumerError';
   }
 }
@@ -13,8 +13,8 @@ class KafkaConsumerError extends Error {
  * Error thrown when a required Protobuf schema is missing during Kafka message consumption.
  */
 class KafkaConsumerProtobufMissingSchemaError extends KafkaConsumerError {
-  constructor(message: string) {
-    super(message);
+  constructor(message: string, options?: ErrorOptions) {
+    super(message, options);
     this.name = 'KafkaConsumerProtobufMissingSchemaError';
   }
 }
@@ -23,8 +23,8 @@ class KafkaConsumerProtobufMissingSchemaError extends KafkaConsumerError {
  * Error thrown when deserialization of a Kafka message fails.
  */
 class KafkaConsumerDeserializationError extends KafkaConsumerError {
-  constructor(message: string) {
-    super(message);
+  constructor(message: string, options?: ErrorOptions) {
+    super(message, options);
     this.name = 'KafkaConsumerDeserializationError';
   }
 }
@@ -33,15 +33,18 @@ class KafkaConsumerDeserializationError extends KafkaConsumerError {
  * Error thrown when a required Avro schema is missing during Kafka message consumption.
  */
 class KafkaConsumerAvroMissingSchemaError extends KafkaConsumerError {
-  constructor(message: string) {
-    super(message);
+  constructor(message: string, options?: ErrorOptions) {
+    super(message, options);
     this.name = 'KafkaConsumerAvroMissingSchemaError';
  }
 }
 
+/**
+ * Error thrown when parsing a Kafka message fails.
+ */
 class KafkaConsumerParserError extends KafkaConsumerError {
-  constructor(message: string) {
-    super(message);
+  constructor(message: string, options?: ErrorOptions) {
+    super(message, options);
     this.name = 'KafkaConsumerParserError';
   }
 }
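
Because the constructors now forward standard ES2022 `ErrorOptions`, callers can attach context such as parser issues and read it back from `error.cause`. An illustrative sketch (the issues array below is made up):

import { KafkaConsumerParserError } from './errors.js'; // relative path inside this package

try {
  throw new KafkaConsumerParserError('Schema validation failed', {
    cause: [{ message: 'expected number, received string', path: ['id'] }],
  });
} catch (error) {
  if (error instanceof KafkaConsumerParserError) {
    console.error(error.name, error.cause); // the original issues survive on `cause`
  }
}
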
