|
1 | 1 | from aws_lambda_powertools.utilities.parser import event_parser
|
2 |
| -from aws_lambda_powertools.utilities.parser.models import ( |
3 |
| - SqsS3EventNotificationModel, |
4 |
| - S3Model, |
5 |
| -) |
| 2 | +from aws_lambda_powertools.utilities.parser.models import SqsS3EventNotificationModel |
6 | 3 | from aws_lambda_powertools.utilities.typing import LambdaContext
|
7 |
| -from tests.functional.parser.schemas import MyAdvancedSqsBusiness |
8 | 4 | from tests.functional.utils import json_serialize, load_event
|
9 | 5 |
|
| 6 | + |
10 | 7 | @event_parser(model=SqsS3EventNotificationModel)
|
11 | 8 | def handle_sqs_json_body_containing_s3_notifications(event: SqsS3EventNotificationModel, _: LambdaContext):
|
12 | 9 | return event
|
13 | 10 |
|
14 |
| -@event_parser(model=MyAdvancedSqsBusiness) |
15 |
| -def generate_sqs_event() -> MyAdvancedSqsBusiness: |
16 |
| - return load_event("sqsEvent.json") |
17 |
| - |
18 |
| -@event_parser(model=S3Model) |
19 |
| -def generate_s3_event_notification() -> S3Model: |
20 |
| - return load_event("s3Event.json") |
21 | 11 |
|
22 | 12 | def test_handle_sqs_json_body_containing_s3_notifications():
|
23 |
| - sqs_event = generate_sqs_event() |
24 |
| - s3_event_notification = generate_s3_event_notification() |
25 |
| - for record in sqs_event.Records: |
26 |
| - record.body = json_serialize(s3_event_notification) |
| 13 | + sqs_event_dict = load_event("sqsEvent.json") |
| 14 | + s3_event_notification_dict = load_event("s3Event.json") |
| 15 | + for record in sqs_event_dict["Records"]: |
| 16 | + record["body"] = json_serialize(s3_event_notification_dict) |
27 | 17 |
|
28 |
| - parsed_event: SqsS3EventNotificationModel = handle_sqs_json_body_containing_s3_notifications(sqs_event, LambdaContext()) |
| 18 | + parsed_event: SqsS3EventNotificationModel = handle_sqs_json_body_containing_s3_notifications(sqs_event_dict, LambdaContext()) |
29 | 19 |
|
30 | 20 | assert len(parsed_event.Records) == 2
|
31 | 21 | for parsed_sqs_record in parsed_event.Records:
|
|
0 commit comments