Commit b207711

chore: refactored kinesis examples
1 parent b381b0c commit b207711

9 files changed: +128 additions, -103 deletions

docs/utilities/batch.md

Lines changed: 11 additions & 99 deletions
@@ -108,19 +108,19 @@ This helps preserve the ordering of messages in your queue.
 
 === "Recommended"
 
-    ```python hl_lines="3 9"
+    ```python hl_lines="5-6 11 27"
     --8<-- "examples/batch_processing/src/getting_started_sqs_fifo.py"
     ```
 
 === "As a context manager"
 
-    ```python hl_lines="2 6"
+    ```python hl_lines="4 8"
     --8<-- "examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py"
     ```
 
 === "As a decorator (legacy)"
 
-    ```python hl_lines="3 9"
+    ```python hl_lines="5-6 11 26"
     --8<-- "examples/batch_processing/src/getting_started_sqs_fifo_decorator.py"
    ```
 
@@ -137,122 +137,34 @@ Processing batches from Kinesis works in three stages:
 
 === "Recommended"
 
-    ```python hl_lines="2 7 12 18 28"
+    ```python hl_lines="2-9 12 18 27"
     --8<-- "examples/batch_processing/src/getting_started_kinesis.py"
     ```
 
 === "As a context manager"
 
-    ```python hl_lines="4-5 9 15 23-25 27"
-    import json
-
-    from aws_lambda_powertools import Logger, Tracer
-    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
-    from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
-    from aws_lambda_powertools.utilities.typing import LambdaContext
-
-
-    processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
-    tracer = Tracer()
-    logger = Logger()
-
-
-    @tracer.capture_method
-    def record_handler(record: KinesisStreamRecord):
-        logger.info(record.kinesis.data_as_text)
-        payload: dict = record.kinesis.data_as_json()
-        ...
-
-    @logger.inject_lambda_context
-    @tracer.capture_lambda_handler
-    def lambda_handler(event, context: LambdaContext):
-        batch = event["Records"]
-        with processor(records=batch, handler=record_handler):
-            processed_messages = processor.process() # kick off processing, return list[tuple]
-
-        return processor.response()
+    ```python hl_lines="3-5 8 14 23-25 28"
+    --8<-- "examples/batch_processing/src/getting_started_kinesis_context_manager.py"
     ```
 
 === "As a decorator (legacy)"
 
-    ```python hl_lines="2-3 7 20 22"
-    from aws_lambda_powertools import Logger, Tracer
-    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
-    from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
-    from aws_lambda_powertools.utilities.typing import LambdaContext
-
-
-    processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
-    tracer = Tracer()
-    logger = Logger()
-
-
-    @tracer.capture_method
-    def record_handler(record: KinesisStreamRecord):
-        logger.info(record.kinesis.data_as_text)
-        payload: dict = record.kinesis.data_as_json()
-        ...
-
-    @logger.inject_lambda_context
-    @tracer.capture_lambda_handler
-    @batch_processor(record_handler=record_handler, processor=processor)
-    def lambda_handler(event, context: LambdaContext):
-        return processor.response()
+    ```python hl_lines="2-9 12 18 26"
+    --8<-- "examples/batch_processing/src/getting_started_kinesis_decorator.py"
     ```
 
 === "Sample response"
 
     The second record failed to be processed, therefore the processor added its sequence number in the response.
 
-    ```python
-    {
-        'batchItemFailures': [
-            {
-                'itemIdentifier': '6006958808509702859251049540584488075644979031228738'
-            }
-        ]
-    }
+    ```json
+    --8<-- "examples/batch_processing/src/getting_started_kinesis_response.json"
     ```
 
 === "Sample event"
 
     ```json
-    {
-        "Records": [
-            {
-                "kinesis": {
-                    "kinesisSchemaVersion": "1.0",
-                    "partitionKey": "1",
-                    "sequenceNumber": "4107859083838847772757075850904226111829882106684065",
-                    "data": "eyJNZXNzYWdlIjogInN1Y2Nlc3MifQ==",
-                    "approximateArrivalTimestamp": 1545084650.987
-                },
-                "eventSource": "aws:kinesis",
-                "eventVersion": "1.0",
-                "eventID": "shardId-000000000006:4107859083838847772757075850904226111829882106684065",
-                "eventName": "aws:kinesis:record",
-                "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
-                "awsRegion": "us-east-2",
-                "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
-            },
-            {
-                "kinesis": {
-                    "kinesisSchemaVersion": "1.0",
-                    "partitionKey": "1",
-                    "sequenceNumber": "6006958808509702859251049540584488075644979031228738",
-                    "data": "c3VjY2Vzcw==",
-                    "approximateArrivalTimestamp": 1545084650.987
-                },
-                "eventSource": "aws:kinesis",
-                "eventVersion": "1.0",
-                "eventID": "shardId-000000000006:6006958808509702859251049540584488075644979031228738",
-                "eventName": "aws:kinesis:record",
-                "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
-                "awsRegion": "us-east-2",
-                "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
-            }
-        ]
-    }
+    --8<-- "examples/batch_processing/src/getting_started_kinesis_event.json"
     ```
 
 ### Processing messages from DynamoDB
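
The batch.md changes above collapse nearly a hundred inline lines into `--8<--` includes (the pymdownx snippets syntax these MkDocs pages use), so each tab now renders the actual example file at build time and the docs can no longer drift from the runnable code, while `hl_lines` still highlights the lines of interest in the rendered block. A hypothetical sanity check that every include in the page resolves to a real file in a repo checkout:

```python
import re
from pathlib import Path

# Hypothetical helper, run from the repo root: find every snippet include
# in batch.md and confirm the referenced example file actually exists.
docs = Path("docs/utilities/batch.md").read_text()
for snippet in re.findall(r'--8<-- "([^"]+)"', docs):
    assert Path(snippet).exists(), f"missing snippet source: {snippet}"
```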

examples/batch_processing/src/getting_started_kinesis.py

Lines changed: 0 additions & 1 deletion
@@ -19,7 +19,6 @@ def record_handler(record: KinesisStreamRecord):
     logger.info(record.kinesis.data_as_text)
     payload: dict = record.kinesis.data_as_json()
     logger.info(payload)
-    ...
 
 
 @logger.inject_lambda_context

examples/batch_processing/src/getting_started_kinesis_context_manager.py

Lines changed: 28 additions & 0 deletions

@@ -0,0 +1,28 @@
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
+from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
+    KinesisStreamRecord,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method
+def record_handler(record: KinesisStreamRecord):
+    logger.info(record.kinesis.data_as_text)
+    payload: dict = record.kinesis.data_as_json()
+    logger.info(payload)
+
+
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+def lambda_handler(event, context: LambdaContext):
+    batch = event["Records"]
+    with processor(records=batch, handler=record_handler):
+        processed_messages = processor.process()  # kick off processing, return list[tuple]
+        logger.info(f"Processed {len(processed_messages)} messages")
+
+    return processor.response()
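
Inside the `with` block, `processor.process()` returns one tuple per record; the inline comment hints at the shape, which in this utility carries a status string, the handler result (or exception detail), and the original record. A minimal sketch of inspecting them, assuming that `("success" | "fail", result, record)` layout:

```python
# Sketch only: iterate the (status, result, record) tuples returned above.
# "fail" entries are what processor.response() later reports as batchItemFailures.
for status, result, record in processed_messages:
    if status == "fail":
        logger.warning(f"Record failed during processing: {result}")
```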

examples/batch_processing/src/getting_started_kinesis_decorator.py

Lines changed: 28 additions & 0 deletions

@@ -0,0 +1,28 @@
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import (
+    BatchProcessor,
+    EventType,
+    batch_processor,
+)
+from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
+    KinesisStreamRecord,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method
+def record_handler(record: KinesisStreamRecord):
+    logger.info(record.kinesis.data_as_text)
+    payload: dict = record.kinesis.data_as_json()
+    logger.info(payload)
+
+
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+@batch_processor(record_handler=record_handler, processor=processor)
+def lambda_handler(event, context: LambdaContext):
+    return processor.response()

examples/batch_processing/src/getting_started_kinesis_event.json

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+{
+  "Records": [
+    {
+      "kinesis": {
+        "kinesisSchemaVersion": "1.0",
+        "partitionKey": "1",
+        "sequenceNumber": "4107859083838847772757075850904226111829882106684065",
+        "data": "eyJNZXNzYWdlIjogInN1Y2Nlc3MifQ==",
+        "approximateArrivalTimestamp": 1545084650.987
+      },
+      "eventSource": "aws:kinesis",
+      "eventVersion": "1.0",
+      "eventID": "shardId-000000000006:4107859083838847772757075850904226111829882106684065",
+      "eventName": "aws:kinesis:record",
+      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+      "awsRegion": "us-east-2",
+      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+    },
+    {
+      "kinesis": {
+        "kinesisSchemaVersion": "1.0",
+        "partitionKey": "1",
+        "sequenceNumber": "6006958808509702859251049540584488075644979031228738",
+        "data": "c3VjY2Vzcw==",
+        "approximateArrivalTimestamp": 1545084650.987
+      },
+      "eventSource": "aws:kinesis",
+      "eventVersion": "1.0",
+      "eventID": "shardId-000000000006:6006958808509702859251049540584488075644979031228738",
+      "eventName": "aws:kinesis:record",
+      "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+      "awsRegion": "us-east-2",
+      "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+    }
+  ]
+}

examples/batch_processing/src/getting_started_kinesis_response.json

Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
+{
+  "batchItemFailures": [
+    {
+      "itemIdentifier": "6006958808509702859251049540584488075644979031228738"
+    }
+  ]
+}
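
The sample event and sample response line up: the first record's `data` is base64 for a JSON object, while the second decodes to a bare string, so a handler that parses it with `data_as_json()` raises and the processor reports that record's sequence number back. A quick standalone check:

```python
import base64
import json

first = base64.b64decode("eyJNZXNzYWdlIjogInN1Y2Nlc3MifQ==").decode()
second = base64.b64decode("c3VjY2Vzcw==").decode()

print(first)        # {"Message": "success"} -> valid JSON, record succeeds
print(second)       # success -> not JSON
json.loads(second)  # raises json.JSONDecodeError, so the record is marked failed
```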

examples/batch_processing/src/getting_started_sqs_fifo.py

Lines changed: 6 additions & 1 deletion
@@ -1,3 +1,5 @@
+import json
+
 from aws_lambda_powertools import Logger, Tracer
 from aws_lambda_powertools.utilities.batch import (
     SqsFifoPartialProcessor,
@@ -13,7 +15,10 @@
 
 @tracer.capture_method
 def record_handler(record: SQSRecord):
-    ...
+    payload: str = record.body
+    if payload:
+        item: dict = json.loads(payload)
+        logger.info(item)
 
 
 @logger.inject_lambda_context
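
The `if payload:` guard in the new handler body (repeated in the two variants below) makes the parsing conditional: `json.loads` raises on an empty string, so an empty body is simply skipped rather than failing the record. For example:

```python
import json

json.loads("")  # raises json.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
```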

examples/batch_processing/src/getting_started_sqs_fifo_context_manager.py

Lines changed: 6 additions & 1 deletion
@@ -1,3 +1,5 @@
+import json
+
 from aws_lambda_powertools import Logger, Tracer
 from aws_lambda_powertools.utilities.batch import SqsFifoPartialProcessor
 from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
@@ -10,7 +12,10 @@
 
 @tracer.capture_method
 def record_handler(record: SQSRecord):
-    ...
+    payload: str = record.body
+    if payload:
+        item: dict = json.loads(payload)
+        logger.info(item)
 
 
 @logger.inject_lambda_context

examples/batch_processing/src/getting_started_sqs_fifo_decorator.py

Lines changed: 6 additions & 1 deletion
@@ -1,3 +1,5 @@
+import json
+
 from aws_lambda_powertools import Logger, Tracer
 from aws_lambda_powertools.utilities.batch import (
     SqsFifoPartialProcessor,
@@ -13,7 +15,10 @@
 
 @tracer.capture_method
 def record_handler(record: SQSRecord):
-    ...
+    payload: str = record.body
+    if payload:
+        item: dict = json.loads(payload)
+        logger.info(item)
 
 
 @logger.inject_lambda_context
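
All three FIFO variants rely on `SqsFifoPartialProcessor`, which, per the "preserve the ordering" note in the docs hunk above, stops at the first failure and reports every remaining record back as failed rather than processing it out of order. A sketch of the resulting response, with hypothetical message IDs:

```python
# Assumed behaviour per the ordering note above; IDs are illustrative only.
# If msg-2 of [msg-1, msg-2, msg-3] raises, the processor short-circuits:
response = {
    "batchItemFailures": [
        {"itemIdentifier": "msg-2"},  # the record whose handler raised
        {"itemIdentifier": "msg-3"},  # not attempted, returned to keep FIFO order
    ]
}
```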
