Skip to content

Commit 445f5c3

Browse files
Add support for tumbling windows events serialization (#342)
1 parent 06d1ebf commit 445f5c3

File tree

6 files changed

+179
-0
lines changed

6 files changed

+179
-0
lines changed

aws-lambda-java-serialization/src/main/java/com/amazonaws/services/lambda/runtime/serialization/events/LambdaEventSerializers.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
import com.amazonaws.services.lambda.runtime.serialization.events.mixins.CodeCommitEventMixin;
99
import com.amazonaws.services.lambda.runtime.serialization.events.mixins.ConnectEventMixin;
1010
import com.amazonaws.services.lambda.runtime.serialization.events.mixins.DynamodbEventMixin;
11+
import com.amazonaws.services.lambda.runtime.serialization.events.mixins.DynamodbTimeWindowEventMixin;
1112
import com.amazonaws.services.lambda.runtime.serialization.events.mixins.KinesisEventMixin;
13+
import com.amazonaws.services.lambda.runtime.serialization.events.mixins.KinesisTimeWindowEventMixin;
1214
import com.amazonaws.services.lambda.runtime.serialization.events.mixins.SNSEventMixin;
1315
import com.amazonaws.services.lambda.runtime.serialization.events.mixins.SQSEventMixin;
1416
import com.amazonaws.services.lambda.runtime.serialization.events.mixins.ScheduledEventMixin;
@@ -63,8 +65,10 @@ public class LambdaEventSerializers {
6365
"com.amazonaws.services.lambda.runtime.events.ConfigEvent",
6466
"com.amazonaws.services.lambda.runtime.events.ConnectEvent",
6567
"com.amazonaws.services.lambda.runtime.events.DynamodbEvent",
68+
"com.amazonaws.services.lambda.runtime.events.DynamodbTimeWindowEvent",
6669
"com.amazonaws.services.lambda.runtime.events.IoTButtonEvent",
6770
"com.amazonaws.services.lambda.runtime.events.KinesisEvent",
71+
"com.amazonaws.services.lambda.runtime.events.KinesisTimeWindowEvent",
6872
"com.amazonaws.services.lambda.runtime.events.KinesisFirehoseEvent",
6973
"com.amazonaws.services.lambda.runtime.events.LambdaDestinationEvent",
7074
"com.amazonaws.services.lambda.runtime.events.LexEvent",
@@ -128,10 +132,14 @@ public class LambdaEventSerializers {
128132
DynamodbEventMixin.AttributeValueMixin.class),
129133
new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.models.dynamodb.AttributeValue",
130134
DynamodbEventMixin.AttributeValueMixin.class),
135+
new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.DynamodbTimeWindowEvent",
136+
DynamodbTimeWindowEventMixin.class),
131137
new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.KinesisEvent",
132138
KinesisEventMixin.class),
133139
new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.KinesisEvent$Record",
134140
KinesisEventMixin.RecordMixin.class),
141+
new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.KinesisTimeWindowEvent",
142+
KinesisTimeWindowEventMixin.class),
135143
new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.ScheduledEvent",
136144
ScheduledEventMixin.class),
137145
new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.SecretsManagerRotationEvent",
@@ -172,6 +180,15 @@ public class LambdaEventSerializers {
172180
"com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord",
173181
"com.amazonaws.services.dynamodbv2.model.StreamRecord"),
174182
new NestedClass("com.amazonaws.services.lambda.runtime.events.DynamodbEvent$DynamodbStreamRecord"))),
183+
new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.DynamodbTimeWindowEvent",
184+
Arrays.asList(
185+
new AlternateNestedClass(
186+
"com.amazonaws.services.lambda.runtime.events.models.dynamodb.AttributeValue",
187+
"com.amazonaws.services.dynamodbv2.model.AttributeValue"),
188+
new AlternateNestedClass(
189+
"com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord",
190+
"com.amazonaws.services.dynamodbv2.model.StreamRecord"),
191+
new NestedClass("com.amazonaws.services.lambda.runtime.events.DynamodbEvent$DynamodbStreamRecord"))),
175192
new SimpleEntry<>("com.amazonaws.services.lambda.runtime.events.KinesisEvent",
176193
Arrays.asList(
177194
new NestedClass("com.amazonaws.services.lambda.runtime.events.KinesisEvent$Record"))),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*/
4+
5+
package com.amazonaws.services.lambda.runtime.serialization.events.mixins;
6+
7+
import com.fasterxml.jackson.annotation.JsonProperty;
8+
9+
public abstract class DynamodbTimeWindowEventMixin extends DynamodbEventMixin {
10+
11+
// needed because Jackson expects "eventSourceArn" instead of "eventSourceARN"
12+
@JsonProperty("eventSourceARN") abstract String getEventSourceArn();
13+
@JsonProperty("eventSourceARN") abstract void setEventSourceArn(String eventSourceArn);
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*/
4+
5+
package com.amazonaws.services.lambda.runtime.serialization.events.mixins;
6+
7+
import com.fasterxml.jackson.annotation.JsonProperty;
8+
9+
public abstract class KinesisTimeWindowEventMixin extends KinesisEventMixin {
10+
11+
// needed because Jackson expects "eventSourceArn" instead of "eventSourceARN"
12+
@JsonProperty("eventSourceARN") abstract String getEventSourceArn();
13+
@JsonProperty("eventSourceARN") abstract void setEventSourceArn(String eventSourceArn);
14+
}

aws-lambda-java-serialization/src/test/java/com/amazonaws/services/lambda/runtime/serialization/events/LambdaEventSerializersTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@ private static Stream<Arguments> serdeArguments() {
3636
Arguments.of("cognito_event.json", CognitoEvent.class),
3737
Arguments.of("config_event.json", ConfigEvent.class),
3838
Arguments.of("dynamodb_event.json", DynamodbEvent.class),
39+
Arguments.of("dynamodb_time_window_event.json", DynamodbTimeWindowEvent.class),
3940
Arguments.of("iot_button_event.json", IoTButtonEvent.class),
4041
Arguments.of("kinesis_analytics_firehose_input_preprocessing_event.json", KinesisAnalyticsFirehoseInputPreprocessingEvent.class),
4142
Arguments.of("kinesis_analytics_input_preprocessing_response_event.json", KinesisAnalyticsInputPreprocessingResponse.class),
4243
Arguments.of("kinesis_analytics_output_delivery_event.json", KinesisAnalyticsOutputDeliveryEvent.class),
4344
Arguments.of("kinesis_analytics_output_delivery_response_event.json", KinesisAnalyticsOutputDeliveryResponse.class),
4445
Arguments.of("kinesis_analytics_streams_input_preprocessing_event.json", KinesisAnalyticsStreamsInputPreprocessingEvent.class),
4546
Arguments.of("kinesis_event.json", KinesisEvent.class),
47+
Arguments.of("kinesis_time_window_event.json", KinesisTimeWindowEvent.class),
4648
Arguments.of("kinesis_firehose_event.json", KinesisFirehoseEvent.class),
4749
Arguments.of("lex_event.json", LexEvent.class),
4850
Arguments.of("s3_event.json", S3Event.class),
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
{
2+
"Records": [
3+
{
4+
"eventID": "1",
5+
"eventName": "INSERT",
6+
"eventVersion": "1.0",
7+
"eventSource": "aws:dynamodb",
8+
"awsRegion": "us-east-1",
9+
"dynamodb": {
10+
"Keys": {
11+
"Id": {
12+
"N": "101"
13+
}
14+
},
15+
"NewImage": {
16+
"Message": {
17+
"S": "New item!"
18+
},
19+
"Id": {
20+
"N": "101"
21+
}
22+
},
23+
"SequenceNumber": "111",
24+
"SizeBytes": 26,
25+
"StreamViewType": "NEW_AND_OLD_IMAGES"
26+
},
27+
"eventSourceARN": "stream-ARN"
28+
},
29+
{
30+
"eventID": "2",
31+
"eventName": "MODIFY",
32+
"eventVersion": "1.0",
33+
"eventSource": "aws:dynamodb",
34+
"awsRegion": "us-east-1",
35+
"dynamodb": {
36+
"Keys": {
37+
"Id": {
38+
"N": "101"
39+
}
40+
},
41+
"NewImage": {
42+
"Message": {
43+
"S": "This item has changed"
44+
},
45+
"Id": {
46+
"N": "101"
47+
}
48+
},
49+
"OldImage": {
50+
"Message": {
51+
"S": "New item!"
52+
},
53+
"Id": {
54+
"N": "101"
55+
}
56+
},
57+
"SequenceNumber": "222",
58+
"SizeBytes": 59,
59+
"StreamViewType": "NEW_AND_OLD_IMAGES"
60+
},
61+
"eventSourceARN": "stream-ARN"
62+
},
63+
{
64+
"eventID": "3",
65+
"eventName": "REMOVE",
66+
"eventVersion": "1.0",
67+
"eventSource": "aws:dynamodb",
68+
"awsRegion": "us-east-1",
69+
"dynamodb": {
70+
"Keys": {
71+
"Id": {
72+
"N": "101"
73+
}
74+
},
75+
"OldImage": {
76+
"Message": {
77+
"S": "This item has changed"
78+
},
79+
"Id": {
80+
"N": "101"
81+
}
82+
},
83+
"SequenceNumber": "333",
84+
"SizeBytes": 38,
85+
"StreamViewType": "NEW_AND_OLD_IMAGES"
86+
},
87+
"eventSourceARN": "stream-ARN"
88+
}
89+
],
90+
"window": {
91+
"start": "2021-10-26T17:00:00Z",
92+
"end": "2021-10-26T17:05:00Z"
93+
},
94+
"state": {
95+
"1": "state1"
96+
},
97+
"shardId": "shard123456789",
98+
"eventSourceARN": "stream-ARN",
99+
"isFinalInvokeForWindow": false,
100+
"isWindowTerminatedEarly": false
101+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"Records": [
3+
{
4+
"kinesis": {
5+
"kinesisSchemaVersion": "1.0",
6+
"partitionKey": "1",
7+
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
8+
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
9+
"approximateArrivalTimestamp": 1545084650.987
10+
},
11+
"eventSource": "aws:kinesis",
12+
"eventVersion": "1.0",
13+
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
14+
"eventName": "aws:kinesis:record",
15+
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
16+
"awsRegion": "us-east-2",
17+
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
18+
}
19+
],
20+
"window": {
21+
"start": "2021-10-26T17:00:00Z",
22+
"end": "2021-10-26T17:05:00Z"
23+
},
24+
"state": {
25+
"1": "state1"
26+
},
27+
"shardId": "shardId-000000000006",
28+
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
29+
"isFinalInvokeForWindow": false,
30+
"isWindowTerminatedEarly": false
31+
}

0 commit comments

Comments
 (0)