Skip to content

Commit 9951d39

Browse files
committed
chore: pydantic refactor
1 parent bc60371 commit 9951d39

File tree

5 files changed

+173
-155
lines changed

5 files changed

+173
-155
lines changed

docs/utilities/batch.md

Lines changed: 7 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -250,130 +250,20 @@ Inheritance is important because we need to access message IDs and sequence num
250250

251251
=== "SQS"
252252

253-
```python hl_lines="5 14 23 29"
254-
import json
255-
256-
from aws_lambda_powertools import Logger, Tracer
257-
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
258-
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
259-
from aws_lambda_powertools.utilities.typing import LambdaContext
260-
from aws_lambda_powertools.utilities.parser import BaseModel
261-
from aws_lambda_powertools.utilities.parser.types import Json
262-
263-
264-
class Order(BaseModel):
265-
item: dict
266-
267-
class OrderSqsRecord(SqsRecordModel):
268-
body: Json[Order] # deserialize order data from JSON string
269-
270-
processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord)
271-
tracer = Tracer()
272-
logger = Logger()
273-
274-
275-
@tracer.capture_method
276-
def record_handler(record: OrderSqsRecord):
277-
return record.body.item
278-
279-
@logger.inject_lambda_context
280-
@tracer.capture_lambda_handler
281-
def lambda_handler(event, context: LambdaContext):
282-
return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
253+
```python hl_lines="8 17 27 34"
254+
--8<-- "examples/batch_processing/src/pydantic_sqs.py"
283255
```
284256

285257
=== "Kinesis Data Streams"
286258

287-
```python hl_lines="5 15 19 23 29 36"
288-
import json
289-
290-
from aws_lambda_powertools import Logger, Tracer
291-
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
292-
from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecordPayload, KinesisDataStreamRecord
293-
from aws_lambda_powertools.utilities.parser import BaseModel, validator
294-
from aws_lambda_powertools.utilities.parser.types import Json
295-
from aws_lambda_powertools.utilities.typing import LambdaContext
296-
297-
298-
class Order(BaseModel):
299-
item: dict
300-
301-
302-
class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload):
303-
data: Json[Order]
304-
305-
306-
class OrderKinesisRecord(KinesisDataStreamRecord):
307-
kinesis: OrderKinesisPayloadRecord
308-
309-
310-
processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord)
311-
tracer = Tracer()
312-
logger = Logger()
313-
314-
315-
@tracer.capture_method
316-
def record_handler(record: OrderKinesisRecord):
317-
return record.kinesis.data.item
318-
319-
320-
@logger.inject_lambda_context
321-
@tracer.capture_lambda_handler
322-
def lambda_handler(event, context: LambdaContext):
323-
return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
259+
```python hl_lines="9 10 20 28 34 41"
260+
--8<-- "examples/batch_processing/src/pydantic_kinesis.py"
324261
```
325262

326263
=== "DynamoDB Streams"
327264

328-
```python hl_lines="7 16 26 31 35 41"
329-
import json
330-
331-
from typing import Dict, Literal, Optional
332-
333-
from aws_lambda_powertools import Logger, Tracer
334-
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
335-
from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamChangedRecordModel, DynamoDBStreamRecordModel
336-
from aws_lambda_powertools.utilities.typing import LambdaContext
337-
from aws_lambda_powertools.utilities.parser import BaseModel, validator
338-
339-
340-
class Order(BaseModel):
341-
item: dict
342-
343-
344-
class OrderDynamoDB(BaseModel):
345-
Message: Order
346-
347-
# auto transform json string
348-
# so Pydantic can auto-initialize nested Order model
349-
@validator("Message", pre=True)
350-
def transform_message_to_dict(cls, value: Dict[Literal["S"], str]):
351-
return json.loads(value["S"])
352-
353-
354-
class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel):
355-
NewImage: Optional[OrderDynamoDB]
356-
OldImage: Optional[OrderDynamoDB]
357-
358-
359-
class OrderDynamoDBRecord(DynamoDBStreamRecordModel):
360-
dynamodb: OrderDynamoDBChangeRecord
361-
362-
363-
processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord)
364-
tracer = Tracer()
365-
logger = Logger()
366-
367-
368-
@tracer.capture_method
369-
def record_handler(record: OrderDynamoDBRecord):
370-
return record.dynamodb.NewImage.Message.item
371-
372-
373-
@logger.inject_lambda_context
374-
@tracer.capture_lambda_handler
375-
def lambda_handler(event, context: LambdaContext):
376-
return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
265+
```python hl_lines="12 13 22 32 37 41 47 55"
266+
--8<-- "examples/batch_processing/src/pydantic_dynamodb.py"
377267
```
378268

379269
### Accessing processed messages
@@ -384,45 +274,7 @@ Use the context manager to access a list of all returned values from your `recor
384274
* **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record
385275

386276
```python hl_lines="30-36" title="Accessing processed messages via context manager"
387-
import json
388-
389-
from typing import Any, List, Literal, Union
390-
391-
from aws_lambda_powertools import Logger, Tracer
392-
from aws_lambda_powertools.utilities.batch import (BatchProcessor,
393-
EventType,
394-
FailureResponse,
395-
SuccessResponse)
396-
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
397-
from aws_lambda_powertools.utilities.typing import LambdaContext
398-
399-
400-
processor = BatchProcessor(event_type=EventType.SQS)
401-
tracer = Tracer()
402-
logger = Logger()
403-
404-
405-
@tracer.capture_method
406-
def record_handler(record: SQSRecord):
407-
payload: str = record.body
408-
if payload:
409-
item: dict = json.loads(payload)
410-
...
411-
412-
@logger.inject_lambda_context
413-
@tracer.capture_lambda_handler
414-
def lambda_handler(event, context: LambdaContext):
415-
batch = event["Records"]
416-
with processor(records=batch, handler=record_handler):
417-
processed_messages: List[Union[SuccessResponse, FailureResponse]] = processor.process()
418-
419-
for message in processed_messages:
420-
status: Union[Literal["success"], Literal["fail"]] = message[0]
421-
result: Any = message[1]
422-
record: SQSRecord = message[2]
423-
424-
425-
return processor.response()
277+
--8<-- "examples/batch_processing/src/context_manager_access.py"
426278
```
427279

428280
### Accessing Lambda Context
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import json
2+
from typing import Any, List, Literal, Tuple, Union
3+
4+
from aws_lambda_powertools import Logger, Tracer
5+
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
6+
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
7+
from aws_lambda_powertools.utilities.typing import LambdaContext
8+
9+
# Module-level singletons shared by record_handler and lambda_handler below.
processor = BatchProcessor(event_type=EventType.SQS)  # SQS batch with partial-failure support
tracer = Tracer()
logger = Logger()
12+
13+
14+
@tracer.capture_method
def record_handler(record: SQSRecord):
    """Handle one SQS record: parse its JSON body (when present) and log it."""
    body: str = record.body
    if not body:
        return
    parsed: dict = json.loads(body)
    logger.info(parsed)
20+
21+
22+
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    """Process an SQS batch via the context manager and inspect each outcome.

    Each entry in ``processed_messages`` is a tuple of
    (status, handler result, original batch record).
    Returns the partial-failure response built by the processor.
    """
    batch = event["Records"]
    with processor(records=batch, handler=record_handler):
        # process() runs record_handler over every record and collects outcomes
        processed_messages: List[Tuple] = processor.process()

    # NOTE(review): iterating after the `with` block exits — the results list
    # remains valid; confirm against the docs if in-block access is preferred.
    for message in processed_messages:
        status: Union[Literal["success"], Literal["fail"]] = message[0]
        result: Any = message[1]
        record: SQSRecord = message[2]

        # Fix: the original `logger.info(status, result, record)` passed extra
        # positional args with no %-placeholders in the message, which stdlib
        # logging cannot format. Use an explicit lazy %-style message instead.
        logger.info("status=%s result=%s record=%s", status, result, record)

    return processor.response()
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import json
2+
from typing import Dict, Literal, Optional
3+
4+
from aws_lambda_powertools import Logger, Tracer
5+
from aws_lambda_powertools.utilities.batch import (
6+
BatchProcessor,
7+
EventType,
8+
process_partial_response,
9+
)
10+
from aws_lambda_powertools.utilities.parser import BaseModel, validator
11+
from aws_lambda_powertools.utilities.parser.models import (
12+
DynamoDBStreamChangedRecordModel,
13+
DynamoDBStreamRecordModel,
14+
)
15+
from aws_lambda_powertools.utilities.typing import LambdaContext
16+
17+
18+
class Order(BaseModel):
    """Domain model for a single order; kept loose as a dict for the example."""

    item: dict
20+
21+
22+
class OrderDynamoDB(BaseModel):
    """Envelope for the order stored in a DynamoDB stream image.

    The stream image wraps the value in a type descriptor, e.g.
    ``{"S": "<json string>"}``; the validator unwraps and parses it so
    Pydantic can auto-initialize the nested Order model.
    """

    Message: Order

    # auto transform json string
    # so Pydantic can auto-initialize nested Order model
    @validator("Message", pre=True)
    def transform_message_to_dict(cls, value: Dict[Literal["S"], str]):
        # Fix: pydantic validators are class methods — pydantic v1 rejects a
        # first parameter named `self`, so it must be `cls`.
        return json.loads(value["S"])
30+
31+
32+
class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel):
    """Stream change payload with old/new images parsed into OrderDynamoDB."""

    # Either image may be absent — presumably depending on the stream view
    # type and event name; TODO confirm against the stream configuration.
    NewImage: Optional[OrderDynamoDB]
    OldImage: Optional[OrderDynamoDB]
35+
36+
37+
class OrderDynamoDBRecord(DynamoDBStreamRecordModel):
    """Full DynamoDB stream record with a typed `dynamodb` change payload."""

    dynamodb: OrderDynamoDBChangeRecord
39+
40+
41+
# Module-level singletons shared by record_handler and lambda_handler below.
processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord)  # parse records with the model above
tracer = Tracer()
logger = Logger()
44+
45+
46+
@tracer.capture_method
def record_handler(record: OrderDynamoDBRecord):
    """Return the order item from the record's new image, when present."""
    new_image = record.dynamodb.NewImage
    if new_image and new_image.Message:
        return new_image.Message.item
    return None
50+
51+
52+
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    """Entry point: delegate the batch to the processor with partial-failure support."""
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from aws_lambda_powertools import Logger, Tracer
2+
from aws_lambda_powertools.utilities.batch import (
3+
BatchProcessor,
4+
EventType,
5+
process_partial_response,
6+
)
7+
from aws_lambda_powertools.utilities.parser import BaseModel
8+
from aws_lambda_powertools.utilities.parser.models import (
9+
KinesisDataStreamRecord,
10+
KinesisDataStreamRecordPayload,
11+
)
12+
from aws_lambda_powertools.utilities.parser.types import Json
13+
from aws_lambda_powertools.utilities.typing import LambdaContext
14+
15+
16+
class Order(BaseModel):
    """Domain model for a single order; kept loose as a dict for the example."""

    item: dict
18+
19+
20+
class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload):
    """Kinesis payload whose `data` JSON string is parsed into an Order."""

    # Json[...] deserializes the JSON string into the nested Order model
    data: Json[Order]
22+
23+
24+
class OrderKinesisRecord(KinesisDataStreamRecord):
    """Full Kinesis stream record with a typed `kinesis` payload."""

    kinesis: OrderKinesisPayloadRecord
26+
27+
28+
# Module-level singletons shared by record_handler and lambda_handler below.
processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord)  # parse records with the model above
tracer = Tracer()
logger = Logger()
31+
32+
33+
@tracer.capture_method
def record_handler(record: OrderKinesisRecord):
    """Return the order item carried in the Kinesis record payload."""
    payload = record.kinesis.data
    return payload.item
36+
37+
38+
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    """Entry point: delegate the batch to the processor with partial-failure support."""
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from aws_lambda_powertools import Logger, Tracer
2+
from aws_lambda_powertools.utilities.batch import (
3+
BatchProcessor,
4+
EventType,
5+
process_partial_response,
6+
)
7+
from aws_lambda_powertools.utilities.parser import BaseModel
8+
from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
9+
from aws_lambda_powertools.utilities.parser.types import Json
10+
from aws_lambda_powertools.utilities.typing import LambdaContext
11+
12+
13+
class Order(BaseModel):
    """Domain model for a single order; kept loose as a dict for the example."""

    item: dict
15+
16+
17+
class OrderSqsRecord(SqsRecordModel):
    """SQS record whose JSON body is parsed into an Order."""

    body: Json[Order]  # deserialize order data from JSON string
19+
20+
21+
# Module-level singletons shared by record_handler and lambda_handler below.
processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord)  # parse records with the model above
tracer = Tracer()
logger = Logger()
24+
25+
26+
@tracer.capture_method
def record_handler(record: OrderSqsRecord):
    """Return the parsed order item from the SQS message body."""
    order = record.body
    return order.item
29+
30+
31+
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    """Entry point: delegate the batch to the processor with partial-failure support."""
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )

0 commit comments

Comments
 (0)