Description
Use case
I would like to process the messages that reach a BatchProcessor asynchronously. Processing messages from an SQS queue sometimes depends on HTTP calls or similar operations that can take some time. If all the records were processed at the same time, those delays would not accumulate.
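To make the accumulated-delay point concrete, here is a minimal standard-library sketch (it does not use Powertools). `fake_http_call` is a hypothetical stand-in for a slow HTTP request, simulated with `asyncio.sleep`, and the delays are arbitrary values chosen for illustration.

```python
import asyncio
import time
from typing import Callable, List, Tuple


async def fake_http_call(delay: float) -> float:
    # Hypothetical stand-in for a slow HTTP call made while handling one record.
    await asyncio.sleep(delay)
    return delay


async def process_sequentially(delays: List[float]) -> List[float]:
    # Each record waits for the previous one, so the delays add up.
    return [await fake_http_call(d) for d in delays]


async def process_concurrently(delays: List[float]) -> List[float]:
    # All records are awaited together, so the total is roughly the slowest one.
    return list(await asyncio.gather(*(fake_http_call(d) for d in delays)))


def timed(coro_fn: Callable, delays: List[float]) -> Tuple[List[float], float]:
    # Run one of the coroutines above and measure wall-clock time.
    start = time.perf_counter()
    results = asyncio.run(coro_fn(delays))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    delays = [0.1, 0.2, 0.3]
    _, seq_time = timed(process_sequentially, delays)
    _, con_time = timed(process_concurrently, delays)
    print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

With sequential awaits the total time is about the sum of the delays, while with `asyncio.gather` it should be close to the longest single delay.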
Solution/User Experience
I have made a slight hack to allow this, extending the BatchProcessor class:
import asyncio
import sys
from typing import List, Tuple, Union

from aws_lambda_powertools.utilities.batch import BatchProcessor, SuccessResponse, FailureResponse


class AsyncBatchProcessor(BatchProcessor):
    async def _aprocess_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: dict
            A batch record to be processed.
        """
        data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
        try:
            if self._handler_accepts_lambda_context:
                result = await self.handler(record=data, lambda_context=self.lambda_context)
            else:
                result = await self.handler(record=data)
            return self.success_handler(record=record, result=result)
        except Exception:
            return self.failure_handler(record=data, exception=sys.exc_info())

    async def aprocess(self) -> List[Tuple]:
        return list(await asyncio.gather(*[self._aprocess_record(record) for record in self.records]))
and the main code to run:
import asyncio
import json

from aws_lambda_powertools import Tracer, Logger
from aws_lambda_powertools.utilities.batch import EventType
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

from .async_batch_preprocessor import AsyncBatchProcessor

tracer = Tracer()
logger = Logger()
aprocessor = AsyncBatchProcessor(event_type=EventType.SQS)

a = 1  # shared counter, only used to give each record a different sleep time


async def record_handler(record: SQSRecord):
    """
    Process here each record
    """
    global a
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
        print(item)
    a += 1
    print(f'sleeping for {a} s')
    await asyncio.sleep(a)  # simulates a slow call such as an HTTP request
    print('awaited!')
    # code code code...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    # Lambda calls this synchronous entry point; it drives the async handler.
    return asyncio.run(alambda_handler(event, context))


async def alambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    with aprocessor(records=batch, handler=record_handler):
        await aprocessor.aprocess()  # kick off processing, return list[tuple]

    return aprocessor.response()
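For anyone who wants to try the pattern without deploying anything, below is a standard-library sketch of the same idea: each record is wrapped in its own try/except, so one failing record does not stop the others awaited by `asyncio.gather`. The `process_one`, `process_all`, and `demo_handler` names and the `("success" | "fail", value, record)` tuple shape are made up for this example and are not the Powertools `SuccessResponse`/`FailureResponse` types.

```python
import asyncio
import sys
from typing import Any, Awaitable, Callable, List, Tuple

Outcome = Tuple[str, Any, dict]


async def process_one(handler: Callable[[dict], Awaitable[Any]], record: dict) -> Outcome:
    # Same idea as the try/except in _aprocess_record above: an exception is
    # turned into a "fail" tuple instead of propagating out of asyncio.gather.
    try:
        result = await handler(record)
        return ("success", result, record)
    except Exception:
        return ("fail", sys.exc_info()[1], record)


async def process_all(handler: Callable[[dict], Awaitable[Any]], records: List[dict]) -> List[Outcome]:
    # gather returns results in the same order as the input records.
    return list(await asyncio.gather(*(process_one(handler, r) for r in records)))


async def demo_handler(record: dict) -> str:
    # Hypothetical handler: rejects records that have an empty body.
    await asyncio.sleep(0.01)
    if not record.get("body"):
        raise ValueError("empty body")
    return record["body"].upper()


if __name__ == "__main__":
    records = [
        {"messageId": "1", "body": "ok"},
        {"messageId": "2", "body": ""},
        {"messageId": "3", "body": "fine"},
    ]
    for status, value, record in asyncio.run(process_all(demo_handler, records)):
        print(record["messageId"], status, repr(value))
```

Because the failure is captured per record, the caller can still report which message IDs failed, which is what a partial batch response needs.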
Alternative solutions
No response
Acknowledgment
- This feature request meets Lambda Powertools Tenets
- Should this be considered in other Lambda Powertools languages? i.e. Java, TypeScript