diff --git a/docs/utilities/parser.md b/docs/utilities/parser.md index d1c534f9fe..8b6e750cb2 100644 --- a/docs/utilities/parser.md +++ b/docs/utilities/parser.md @@ -151,6 +151,21 @@ If you want to extend a schema and transform a JSON stringified payload to an ob --8<-- "examples/snippets/parser/samples/exampleSqsPayload.json" ``` +### DynamoDB Stream event parsing + +If you want to parse a DynamoDB stream event with unmarshalling, you can use the helper function `DynamoDBMarshalled`: + +=== "DynamoDBStreamSchema with DynamoDBMarshalled" + ```typescript hl_lines="17" + --8<-- "examples/snippets/parser/extendDynamoDBStreamSchema.ts" + ``` + +=== "DynamoDBStream event payload" + + ```json hl_lines="13-20 49-56" + --8<-- "examples/snippets/parser/samples/exampleDynamoDBStreamPayload.json" + ``` + ## Envelopes When trying to parse your payload you might encounter the following situations: diff --git a/examples/snippets/parser/extendDynamoDBStreamSchema.ts b/examples/snippets/parser/extendDynamoDBStreamSchema.ts new file mode 100644 index 0000000000..86639f5cfb --- /dev/null +++ b/examples/snippets/parser/extendDynamoDBStreamSchema.ts @@ -0,0 +1,23 @@ +import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb'; +import { + DynamoDBStreamRecord, + DynamoDBStreamSchema, +} from '@aws-lambda-powertools/parser/schemas/dynamodb'; +import { z } from 'zod'; + +const customSchema = z.object({ + id: z.string(), + message: z.string(), +}); + +const extendedSchema = DynamoDBStreamSchema.extend({ + Records: z.array( + DynamoDBStreamRecord.extend({ + dynamodb: z.object({ + NewImage: DynamoDBMarshalled(customSchema).optional(), + }), + }) + ), +}); + +type ExtendedDynamoDBStreamEvent = z.infer; diff --git a/examples/snippets/parser/samples/exampleDynamoDBStreamPayload.json b/examples/snippets/parser/samples/exampleDynamoDBStreamPayload.json new file mode 100644 index 0000000000..621c5dc47f --- /dev/null +++ b/examples/snippets/parser/samples/exampleDynamoDBStreamPayload.json @@ -0,0 +1,66 @@ +{ + "Records": [ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "ApproximateCreationDateTime": 1693997155.0, + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "111", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "eventsource_arn", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "2", + "eventVersion": "1.0", + "dynamodb": { + "OldImage": { + "Message": { + "S": "New item!" + }, + "Id": { + "N": "101" + } + }, + "SequenceNumber": "222", + "Keys": { + "Id": { + "N": "101" + } + }, + "SizeBytes": 59, + "NewImage": { + "Message": { + "S": "This item has changed" + }, + "Id": { + "N": "101" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES" + }, + "awsRegion": "us-west-2", + "eventName": "MODIFY", + "eventSourceARN": "source_arn", + "eventSource": "aws:dynamodb" + } + ] + } + \ No newline at end of file diff --git a/packages/parser/package.json b/packages/parser/package.json index 46f7e3071f..81e940968a 100644 --- a/packages/parser/package.json +++ b/packages/parser/package.json @@ -180,6 +180,10 @@ "require": "./lib/cjs/helpers.js", "import": "./lib/esm/helpers.js" }, + "./helpers/dynamodb": { + "require": "./lib/cjs/helpers/dynamodb.js", + "import": "./lib/esm/helpers/dynamodb.js" + }, "./types": { "require": "./lib/cjs/types/index.js", "import": "./lib/esm/types/index.js" @@ -363,7 +367,8 @@ ], "peerDependencies": { "@middy/core": "4.x || 5.x || 6.x", - "zod": ">=3.x" + "zod": ">=3.x", + "@aws-sdk/util-dynamodb": ">=3.x" }, "peerDependenciesMeta": { "zod": { diff --git a/packages/parser/src/helpers/dynamodb.ts b/packages/parser/src/helpers/dynamodb.ts new file mode 100644 index 0000000000..7e1755ca2a --- /dev/null +++ b/packages/parser/src/helpers/dynamodb.ts @@ -0,0 +1,85 @@ +import type { AttributeValue } from '@aws-sdk/client-dynamodb'; +import { unmarshall } from '@aws-sdk/util-dynamodb'; +import { type ZodTypeAny, z } from 'zod'; + +/** + * A helper function to unmarshall DynamoDB stream events and validate them against a schema. + * + * @example + * ```typescript + * const mySchema = z.object({ + * id: z.string(), + * name: z.string(), + * }); + * const eventSchema = DynamoDBStreamSchema.extend({ + * Records: z.array( + * DynamoDBStreamRecord.extend({ + * dynamodb: z.object({ + * NewImage: DynamoDBMarshalled(mySchema).optional(), + * }), + * }) + * ), + * }); + * type eventSchema = z.infer; + * ``` + * For example, if you have a DynamoDB stream event like the following: + * + * ```json + * { + * "Records": [ + * { + * "dynamodb": { + * "NewImage": { + * "id": { + * "S": "12345" + * }, + * "name": { + * "S": "John Doe" + * } + * } + * } + * } + * ] + * } + * ``` + * Resulting in: + * + * ```json + * { + * "Records": [ + * { + * "dynamodb": { + * "NewImage": { + * "id": "12345", + * "name": "John Doe" + * } + * } + * } + * ] + * } + * ``` + * + * @param schema - The schema to validate the JSON string against + */ +const DynamoDBMarshalled = (schema: T) => + z + .union([ + z.custom(), + z.record(z.string(), z.custom()), + ]) + .transform((str, ctx) => { + try { + return unmarshall(str); + } catch (err) { + ctx.addIssue({ + code: 'custom', + message: 'Could not unmarshall DynamoDB stream record', + fatal: true, + }); + + return z.NEVER; + } + }) + .pipe(schema); + +export { DynamoDBMarshalled }; diff --git a/packages/parser/tests/unit/helpers.test.ts b/packages/parser/tests/unit/helpers.test.ts index d52d0f44df..15f03b136c 100644 --- a/packages/parser/tests/unit/helpers.test.ts +++ b/packages/parser/tests/unit/helpers.test.ts @@ -1,13 +1,22 @@ import { describe, expect, it } from 'vitest'; import { z } from 'zod'; import { JSONStringified } from '../../src/helpers.js'; +import { DynamoDBMarshalled } from '../../src/helpers/dynamodb.js'; import { AlbSchema } from '../../src/schemas/alb.js'; +import { + DynamoDBStreamRecord, + DynamoDBStreamSchema, +} from '../../src/schemas/dynamodb'; import { SnsNotificationSchema, SnsRecordSchema, } from '../../src/schemas/sns.js'; import { SqsRecordSchema, SqsSchema } from '../../src/schemas/sqs.js'; -import type { SnsEvent, SqsEvent } from '../../src/types/schema.js'; +import type { + DynamoDBStreamEvent, + SnsEvent, + SqsEvent, +} from '../../src/types/schema.js'; import { getTestEvent } from './schema/utils.js'; const bodySchema = z.object({ @@ -152,3 +161,145 @@ describe('JSONStringified', () => { }); }); }); + +describe('DynamoDBMarshalled', () => { + // Prepare + const schema = z.object({ + Message: z.string(), + Id: z.number(), + }); + + const extendedSchema = DynamoDBStreamSchema.extend({ + Records: z.array( + DynamoDBStreamRecord.extend({ + dynamodb: z.object({ + NewImage: DynamoDBMarshalled(schema).optional(), + }), + }) + ), + }); + + it('should correctly unmarshall and validate a valid DynamoDB stream record', () => { + // Prepare + const testInput = [ + { + Message: { + S: 'New item!', + }, + Id: { + N: '101', + }, + }, + { + Message: { + S: 'This item has changed', + }, + Id: { + N: '101', + }, + }, + ]; + const expectedOutput = [ + { + Id: 101, + Message: 'New item!', + }, + { + Id: 101, + Message: 'This item has changed', + }, + ]; + + const testEvent = getTestEvent({ + eventsPath: '.', + filename: 'dynamoStreamEvent', + }); + + testEvent.Records[0].dynamodb.NewImage = testInput[0]; + testEvent.Records[1].dynamodb.NewImage = testInput[1]; + + // Act & Assess + expect(extendedSchema.parse(testEvent)).toStrictEqual({ + Records: [ + { + ...testEvent.Records[0], + dynamodb: { + NewImage: expectedOutput[0], + }, + }, + { + ...testEvent.Records[1], + dynamodb: { + NewImage: expectedOutput[1], + }, + }, + ], + }); + }); + + it('should throw an error if the DynamoDB stream record cannot be unmarshalled', () => { + // Prepare + const testInput = [ + { + Message: { + S: 'New item!', + }, + Id: { + NNN: '101', //unknown type + }, + }, + { + Message: { + S: 'This item has changed', + }, + Id: { + N: '101', + }, + }, + ]; + + const testEvent = getTestEvent({ + eventsPath: '.', + filename: 'dynamoStreamEvent', + }); + + testEvent.Records[0].dynamodb.NewImage = testInput[0]; + testEvent.Records[1].dynamodb.NewImage = testInput[1]; + + // Act & Assess + expect(() => extendedSchema.parse(testEvent)).toThrow( + 'Could not unmarshall DynamoDB stream record' + ); + }); + + it('should throw a validation error if the unmarshalled record does not match the schema', () => { + // Prepare + const testInput = [ + { + Message: { + S: 'New item!', + }, + Id: { + N: '101', + }, + }, + { + Message: { + S: 'This item has changed', + }, + // Id is missing + }, + ]; + + const testEvent = getTestEvent({ + eventsPath: '.', + filename: 'dynamoStreamEvent', + }); + + testEvent.Records[0].dynamodb.NewImage = testInput[0]; + testEvent.Records[1].dynamodb.NewImage = testInput[1]; + + // Act & Assess + expect(() => extendedSchema.parse(testEvent)).toThrow(); + }); +}); diff --git a/packages/parser/tests/unit/schema/utils.ts b/packages/parser/tests/unit/schema/utils.ts index aa5c9ab531..261b24f23d 100644 --- a/packages/parser/tests/unit/schema/utils.ts +++ b/packages/parser/tests/unit/schema/utils.ts @@ -121,6 +121,14 @@ const createTestEvents = (fileList: readonly string[]): TestEvents => { export const TestEvents = createTestEvents(filenames); +/** + * Reads and parses a JSON file from the specified events path and filename, returning the parsed object. + * + * @template T - The expected type of the parsed JSON object. + * @param {Object} params - The parameters for the function. + * @param {string} params.eventsPath - The relative path to the directory containing the event files. + * @param {string} params.filename - The name of the JSON file (without extension) to be read and parsed. + */ export const getTestEvent = >({ eventsPath, filename,