Feature request: Async BatchProcessor (use case: slow processing of each item) #1708

Closed
@BakasuraRCE

Description

Use case

I would like to process the messages that reach a BatchProcessor asynchronously. Processing messages from an SQS queue sometimes depends on HTTP calls or similar operations that can take some time; if they are all run concurrently, the batch does not accumulate delay.
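The accumulated-delay point can be illustrated with plain asyncio, independent of Powertools (`fetch` here is a hypothetical stand-in for the slow HTTP call made per record):

```python
import asyncio
import time

async def fetch(i: int) -> int:
    # Stand-in for a slow HTTP call made while handling one SQS record.
    await asyncio.sleep(0.1)
    return i

async def main() -> list:
    start = time.perf_counter()
    # All ten "calls" run concurrently, so total wall time stays near a
    # single call's latency (~0.1 s) instead of the ~1.0 s a sequential
    # loop over the batch would accumulate.
    results = await asyncio.gather(*[fetch(i) for i in range(10)])
    assert time.perf_counter() - start < 0.5
    return results

print(asyncio.run(main()))
```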

Solution/User Experience

I have made a slight hack to allow this by extending the BatchProcessor class:

import asyncio
import sys
from typing import List, Tuple, Union

from aws_lambda_powertools.utilities.batch import BatchProcessor, SuccessResponse, FailureResponse


class AsyncBatchProcessor(BatchProcessor):

    async def _aprocess_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
        """
        Process a record with instance's handler

        Parameters
        ----------
        record: dict
            A batch record to be processed.
        """
        data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model)
        try:
            if self._handler_accepts_lambda_context:
                result = await self.handler(record=data, lambda_context=self.lambda_context)
            else:
                result = await self.handler(record=data)

            return self.success_handler(record=record, result=result)
        except Exception:
            return self.failure_handler(record=data, exception=sys.exc_info())

    async def aprocess(self) -> List[Tuple]:
        """Process all records concurrently and return their result tuples."""
        return list(await asyncio.gather(*[self._aprocess_record(record) for record in self.records]))
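The key property the subclass relies on is that each per-record coroutine catches its own exception, so `asyncio.gather()` never raises and one failing record cannot cancel its siblings. A self-contained sketch of that pattern, with hypothetical `process_record`/`flaky_handler` names standing in for `_aprocess_record` and the user's handler:

```python
import asyncio
import sys
from typing import Dict, List, Tuple

async def process_record(handler, record: Dict) -> Tuple:
    # Mirrors _aprocess_record: the exception is converted into a
    # failure tuple inside the coroutine, so gather() sees a normal
    # result for every record.
    try:
        result = await handler(record)
        return ("success", result, record)
    except Exception:
        return ("fail", sys.exc_info(), record)

async def process_all(handler, records: List[Dict]) -> List[Tuple]:
    # gather() preserves input order, matching self.records.
    return list(await asyncio.gather(*[process_record(handler, r) for r in records]))

async def flaky_handler(record: Dict) -> int:
    if record["id"] == 2:
        raise ValueError("boom")
    return record["id"] * 10

records = [{"id": 1}, {"id": 2}, {"id": 3}]
results = asyncio.run(process_all(flaky_handler, records))
print([status for status, *_ in results])  # ['success', 'fail', 'success']
```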

And the main code that runs it:

import asyncio
import json

from aws_lambda_powertools.utilities.batch import EventType
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

from aws_lambda_powertools import Tracer, Logger
from .async_batch_preprocessor import AsyncBatchProcessor

tracer = Tracer()
logger = Logger()

aprocessor = AsyncBatchProcessor(event_type=EventType.SQS)

a = 1  # demo counter so each record sleeps a little longer than the last


async def record_handler(record: SQSRecord):
    """Process each record here."""
    global a  # the docstring must precede this statement, or it is not a docstring
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
        print(item)
        a += 1
        print(f'sleeping for {a} s')
        await asyncio.sleep(a)
        print('awaited!')
        # code code code...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    return asyncio.run(alambda_handler(event, context))


async def alambda_handler(event, context: LambdaContext):
    batch = event["Records"]
    with aprocessor(records=batch, handler=record_handler):
        await aprocessor.aprocess()  # kick off processing, return list[tuple]
    return aprocessor.response()
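For context, the `response()` call at the end is what reports partial failures back to Lambda; its payload follows the SQS partial batch response contract, roughly of this shape (the message ID below is hypothetical):

```python
# Illustration only: the partial-batch-failure payload that Lambda
# expects reports the message IDs of failed records, so only those
# messages are returned to the queue for retry.
response = {
    "batchItemFailures": [
        {"itemIdentifier": "msg-id-of-a-failed-record"},  # hypothetical ID
    ]
}
print(response)
```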

Alternative solutions

No response