11
11
import sys
12
12
from abc import ABC , abstractmethod
13
13
from enum import Enum
14
- from typing import Any , Callable , Dict , List , Optional , Tuple , Type , Union , overload , Awaitable
14
+ from typing import (
15
+ Any ,
16
+ Awaitable ,
17
+ Callable ,
18
+ Dict ,
19
+ List ,
20
+ Optional ,
21
+ Tuple ,
22
+ Type ,
23
+ Union ,
24
+ overload ,
25
+ )
15
26
16
27
from aws_lambda_powertools .middleware_factory import lambda_handler_decorator
17
28
from aws_lambda_powertools .utilities .batch .exceptions import (
@@ -118,7 +129,7 @@ async def async_process():
118
129
return list (await asyncio .gather (* [self ._async_process_record (record ) for record in self .records ]))
119
130
120
131
# WARNING
121
- # Do not use "asyncio.run(async_process())" due to Lambda container thaws/freeze, otherwise we might get "Event Loop is closed"
132
+ # Do not use "asyncio.run(async_process())" due to Lambda container thaws/freeze, otherwise we might get "Event Loop is closed" # noqa: E501
122
133
# Instead, get_event_loop() can also create one if a previous was erroneously closed
123
134
# More: https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtime-environment.html#runtimes-lifecycle-shutdown
124
135
# Extra: just follow how the well-tested mangum library do:
@@ -127,7 +138,7 @@ async def async_process():
127
138
128
139
# Detect environment and create a loop for each one
129
140
coro = async_process ()
130
- if os .environ .get (' AWS_LAMBDA_RUNTIME_API' ):
141
+ if os .environ .get (" AWS_LAMBDA_RUNTIME_API" ):
131
142
# Running in lambda server
132
143
loop = asyncio .get_event_loop ()
133
144
task_instance = loop .create_task (coro )
@@ -393,7 +404,7 @@ def _clean(self):
393
404
if self ._entire_batch_failed ():
394
405
raise BatchProcessingError (
395
406
msg = f"All records failed processing. { len (self .exceptions )} individual errors logged "
396
- f"separately below." ,
407
+ f"separately below." ,
397
408
child_exceptions = self .exceptions ,
398
409
)
399
410
@@ -480,7 +491,7 @@ def _process_record(self, record: dict) -> Union[SuccessResponse, FailureRespons
480
491
481
492
@lambda_handler_decorator
482
493
def batch_processor (
483
- handler : Callable , event : Dict , context : LambdaContext , record_handler : Callable , processor : BatchProcessor
494
+ handler : Callable , event : Dict , context : LambdaContext , record_handler : Callable , processor : BatchProcessor
484
495
):
485
496
"""
486
497
Middleware to handle batch event processing
@@ -524,7 +535,6 @@ def batch_processor(
524
535
525
536
526
537
class AsyncBatchProcessor (BasePartialBatchProcessor ):
527
-
528
538
def _process_record (self , record : dict ):
529
539
raise NotImplementedError ()
530
540
@@ -551,7 +561,11 @@ async def _async_process_record(self, record: dict) -> Union[SuccessResponse, Fa
551
561
552
562
@lambda_handler_decorator
553
563
def async_batch_processor (
554
- handler : Callable , event : Dict , context : LambdaContext , record_handler : Callable [..., Awaitable [Any ]], processor : AsyncBatchProcessor
564
+ handler : Callable ,
565
+ event : Dict ,
566
+ context : LambdaContext ,
567
+ record_handler : Callable [..., Awaitable [Any ]],
568
+ processor : AsyncBatchProcessor ,
555
569
):
556
570
"""
557
571
Middleware to handle batch event processing
0 commit comments