Skip to content

Commit 139f52b

Browse files
committed
feat(batch): add DynamoDB Streams support
1 parent 4cfcd34 commit 139f52b

File tree

2 files changed

+93
-3
lines changed

2 files changed

+93
-3
lines changed

aws_lambda_powertools/utilities/batch/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
class EventType(Enum):
1818
SQS = "SQS"
1919
KinesisDataStreams = "KinesisDataStreams"
20-
DynamoDB = "DynamoDB"
20+
DynamoDBStreams = "DynamoDBStreams"
2121

2222

2323
class BasePartialProcessor(ABC):
@@ -172,7 +172,7 @@ def __init__(self, event_type: EventType):
172172
self._COLLECTOR_MAPPING = {
173173
EventType.SQS: self._collect_sqs_failures,
174174
EventType.KinesisDataStreams: self._collect_kinesis_failures,
175-
EventType.DynamoDB: self._collect_dynamodb_failures,
175+
EventType.DynamoDBStreams: self._collect_dynamodb_failures,
176176
}
177177
super().__init__()
178178

@@ -235,4 +235,4 @@ def _collect_kinesis_failures(self):
235235
return {"itemIdentifier": msg["kinesis"]["sequenceNumber"] for msg in self.fail_messages}
236236

237237
def _collect_dynamodb_failures(self):
238-
...
238+
return {"itemIdentifier": msg["dynamodb"]["SequenceNumber"] for msg in self.fail_messages}

tests/functional/test_utilities_batch.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,29 @@ def factory(body: str):
5454
return factory
5555

5656

57+
@pytest.fixture(scope="module")
58+
def dynamodb_event_factory() -> Callable:
59+
def factory(body: str):
60+
seq = "".join(str(randint(0, 9)) for _ in range(10))
61+
return {
62+
"eventID": "1",
63+
"eventVersion": "1.0",
64+
"dynamodb": {
65+
"Keys": {"Id": {"N": "101"}},
66+
"NewImage": {"message": {"S": body}},
67+
"StreamViewType": "NEW_AND_OLD_IMAGES",
68+
"SequenceNumber": seq,
69+
"SizeBytes": 26,
70+
},
71+
"awsRegion": "us-west-2",
72+
"eventName": "INSERT",
73+
"eventSourceARN": "eventsource_arn",
74+
"eventSource": "aws:dynamodb",
75+
}
76+
77+
return factory
78+
79+
5780
@pytest.fixture(scope="module")
5881
def record_handler() -> Callable:
5982
def handler(record):
@@ -76,6 +99,17 @@ def handler(record):
7699
return handler
77100

78101

102+
@pytest.fixture(scope="module")
103+
def dynamodb_record_handler() -> Callable:
104+
def handler(record):
105+
body = record["dynamodb"]["NewImage"]["message"]["S"]
106+
if "fail" in body:
107+
raise Exception("Failed to process record.")
108+
return body
109+
110+
return handler
111+
112+
79113
@pytest.fixture(scope="module")
80114
def config() -> Config:
81115
return Config(region_name="us-east-1")
@@ -459,3 +493,59 @@ def lambda_handler(event, context):
459493

460494
# THEN
461495
assert len(result["batchItemFailures"]) == 1
496+
497+
498+
def test_batch_processor_dynamodb_context_success_only(dynamodb_event_factory, dynamodb_record_handler):
499+
# GIVEN
500+
first_record = dynamodb_event_factory("success")
501+
second_record = dynamodb_event_factory("success")
502+
records = [first_record, second_record]
503+
processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
504+
505+
# WHEN
506+
with processor(records, dynamodb_record_handler) as batch:
507+
processed_messages = batch.process()
508+
509+
# THEN
510+
assert processed_messages == [
511+
("success", first_record["dynamodb"]["NewImage"]["message"]["S"], first_record),
512+
("success", second_record["dynamodb"]["NewImage"]["message"]["S"], second_record),
513+
]
514+
515+
assert batch.response() == {"batchItemFailures": []}
516+
517+
518+
def test_batch_processor_dynamodb_context_with_failure(dynamodb_event_factory, dynamodb_record_handler):
519+
# GIVEN
520+
first_record = dynamodb_event_factory("failure")
521+
second_record = dynamodb_event_factory("success")
522+
records = [first_record, second_record]
523+
processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
524+
525+
# WHEN
526+
with processor(records, dynamodb_record_handler) as batch:
527+
processed_messages = batch.process()
528+
529+
# THEN
530+
assert processed_messages[1] == ("success", second_record["dynamodb"]["NewImage"]["message"]["S"], second_record)
531+
assert len(batch.fail_messages) == 1
532+
assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["dynamodb"]["SequenceNumber"]}]}
533+
534+
535+
def test_batch_processor_dynamodb_middleware_with_failure(dynamodb_event_factory, dynamodb_record_handler):
536+
# GIVEN
537+
first_record = dynamodb_event_factory("failure")
538+
second_record = dynamodb_event_factory("success")
539+
event = {"Records": [first_record, second_record]}
540+
541+
processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
542+
543+
@batch_processor(record_handler=dynamodb_record_handler, processor=processor)
544+
def lambda_handler(event, context):
545+
return processor.response()
546+
547+
# WHEN
548+
result = lambda_handler(event, {})
549+
550+
# THEN
551+
assert len(result["batchItemFailures"]) == 1

0 commit comments

Comments
 (0)