@@ -790,51 +790,49 @@ Use the context manager to access a list of all returned values from your `record_handler` function.

* **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record

- === "app.py"
-
-     ```python hl_lines="31-38"
-     import json
+ ```python hl_lines="31-38" title="Accessing processed messages via context manager"
+ import json

-     from typing import Any, List, Literal, Union
+ from typing import Any, List, Literal, Union

-     from aws_lambda_powertools import Logger, Tracer
-     from aws_lambda_powertools.utilities.batch import (BatchProcessor,
-                                                        EventType,
-                                                        FailureResponse,
-                                                        SuccessResponse,
-                                                        batch_processor)
-     from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
-     from aws_lambda_powertools.utilities.typing import LambdaContext
+ from aws_lambda_powertools import Logger, Tracer
+ from aws_lambda_powertools.utilities.batch import (BatchProcessor,
+                                                    EventType,
+                                                    FailureResponse,
+                                                    SuccessResponse,
+                                                    batch_processor)
+ from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+ from aws_lambda_powertools.utilities.typing import LambdaContext


-     processor = BatchProcessor(event_type=EventType.SQS)
-     tracer = Tracer()
-     logger = Logger()
+ processor = BatchProcessor(event_type=EventType.SQS)
+ tracer = Tracer()
+ logger = Logger()


-     @tracer.capture_method
-     def record_handler(record: SQSRecord):
-         payload: str = record.body
-         if payload:
-             item: dict = json.loads(payload)
-             ...
+ @tracer.capture_method
+ def record_handler(record: SQSRecord):
+     payload: str = record.body
+     if payload:
+         item: dict = json.loads(payload)
+         ...

-     @logger.inject_lambda_context
-     @tracer.capture_lambda_handler
-     def lambda_handler(event, context: LambdaContext):
-         batch = event["Records"]
-         with processor(records=batch, processor=processor):
-             processed_messages: List[Union[SuccessResponse, FailureResponse]] = processor.process()
+ @logger.inject_lambda_context
+ @tracer.capture_lambda_handler
+ def lambda_handler(event, context: LambdaContext):
+     batch = event["Records"]
+     with processor(records=batch, handler=record_handler):
+         processed_messages: List[Union[SuccessResponse, FailureResponse]] = processor.process()

-         for messages in processed_messages:
-             for message in messages:
-                 status: Union[Literal["success"], Literal["fail"]] = message[0]
-                 result: Any = message[1]
-                 record: SQSRecord = message[2]
+     # each entry is a ("success" | "fail", result, record) tuple
+     for message in processed_messages:
+         status: Union[Literal["success"], Literal["fail"]] = message[0]
+         result: Any = message[1]
+         record: SQSRecord = message[2]


-         return processor.response()
-     ```
+     return processor.response()
+ ```
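To make the returned tuple layout concrete: a minimal sketch (not part of the diff above; `collect_failures` is a hypothetical helper) that filters the processed messages down to the failed records:

```python
from typing import Any, List, Tuple

ProcessedMessage = Tuple[str, Any, Any]  # (status, result or exception string, batch record)

def collect_failures(processed_messages: List[ProcessedMessage]) -> List[ProcessedMessage]:
    # "fail" entries carry the exception serialized as a string in position 1
    return [message for message in processed_messages if message[0] == "fail"]

# Example: one success, one failure
messages = [
    ("success", 42, {"messageId": "1"}),
    ("fail", "ValueError: bad payload", {"messageId": "2"}),
]
assert collect_failures(messages) == [("fail", "ValueError: bad payload", {"messageId": "2"})]
```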
### Extending BatchProcessor
@@ -846,43 +844,40 @@ For these scenarios, you can subclass `BatchProcessor` and quickly override `success_handler` and `failure_handler` methods:

* **`success_handler()`** – Keeps track of successful batch records
* **`failure_handler()`** – Keeps track of failed batch records

- **Example**
-
- Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing:
-
- === "app.py"
-
-     ```python
-
-     from typing import Tuple
-
-     from aws_lambda_powertools import Metrics
-     from aws_lambda_powertools.metrics import MetricUnit
-     from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, ExceptionInfo, EventType, FailureResponse
-     from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+ ???+ example
+     Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing
+
+ ```python title="Extending failure handling mechanism in BatchProcessor"
+ import json
+
+ from aws_lambda_powertools import Metrics, Tracer
+ from aws_lambda_powertools.metrics import MetricUnit
+ from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, ExceptionInfo, EventType, FailureResponse
+ from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+ from aws_lambda_powertools.utilities.typing import LambdaContext


-     class MyProcessor(BatchProcessor):
-         def failure_handler(self, record: SQSRecord, exception: ExceptionInfo) -> FailureResponse:
-             metrics.add_metric(name="BatchRecordFailures", unit=MetricUnit.Count, value=1)
-             return super().failure_handler(record, exception)
+ class MyProcessor(BatchProcessor):
+     def failure_handler(self, record: SQSRecord, exception: ExceptionInfo) -> FailureResponse:
+         metrics.add_metric(name="BatchRecordFailures", unit=MetricUnit.Count, value=1)
+         return super().failure_handler(record, exception)

-     processor = MyProcessor(event_type=EventType.SQS)
-     metrics = Metrics(namespace="test")
+ processor = MyProcessor(event_type=EventType.SQS)
+ metrics = Metrics(namespace="test")
+ tracer = Tracer()


-     @tracer.capture_method
-     def record_handler(record: SQSRecord):
-         payload: str = record.body
-         if payload:
-             item: dict = json.loads(payload)
-             ...
+ @tracer.capture_method
+ def record_handler(record: SQSRecord):
+     payload: str = record.body
+     if payload:
+         item: dict = json.loads(payload)
+         ...

-     @metrics.log_metrics(capture_cold_start_metric=True)
-     @batch_processor(record_handler=record_handler, processor=processor)
-     def lambda_handler(event, context: LambdaContext):
-         return processor.response()
-     ```
+ @metrics.log_metrics(capture_cold_start_metric=True)
+ @batch_processor(record_handler=record_handler, processor=processor)
+ def lambda_handler(event, context: LambdaContext):
+     return processor.response()
+ ```
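The same override pattern works for the success path. A minimal sketch (not from the diff; `BatchRecordSuccesses` is a hypothetical metric name), assuming the same `metrics` setup as above:

```python
from typing import Any

from aws_lambda_powertools import Metrics
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, SuccessResponse
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

metrics = Metrics(namespace="test")


class MyProcessor(BatchProcessor):
    def success_handler(self, record: SQSRecord, result: Any) -> SuccessResponse:
        # Mirror of the failure_handler override: count records that processed cleanly
        metrics.add_metric(name="BatchRecordSuccesses", unit=MetricUnit.Count, value=1)
        return super().success_handler(record, result)


processor = MyProcessor(event_type=EventType.SQS)
```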
### Create your own partial processor
@@ -894,69 +889,67 @@ You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`.

You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.

- === "custom_processor.py"
-
-     ```python hl_lines="3 9 24 30 37 57"
-     from random import randint
-
-     from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
-     import boto3
-     import os
-
-     table_name = os.getenv("TABLE_NAME", "table_not_found")
-
-     class MyPartialProcessor(BasePartialProcessor):
-         """
-         Process a record and stores successful results at a Amazon DynamoDB Table
-
-         Parameters
-         ----------
-         table_name: str
-             DynamoDB table name to write results to
-         """
-
-         def __init__(self, table_name: str):
-             self.table_name = table_name
-
-             super().__init__()
-
-         def _prepare(self):
-             # It's called once, *before* processing
-             # Creates table resource and clean previous results
-             self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
-             self.success_messages.clear()
-
-         def _clean(self):
-             # It's called once, *after* closing processing all records (closing the context manager)
-             # Here we're sending, at once, all successful messages to a ddb table
-             with ddb_table.batch_writer() as batch:
-                 for result in self.success_messages:
-                     batch.put_item(Item=result)
-
-         def _process_record(self, record):
-             # It handles how your record is processed
-             # Here we're keeping the status of each run
-             # where self.handler is the record_handler function passed as an argument
-             try:
-                 result = self.handler(record) # record_handler passed to decorator/context manager
-                 return self.success_handler(record, result)
-             except Exception as exc:
-                 return self.failure_handler(record, exc)
-
-         def success_handler(self, record):
-             entry = ("success", result, record)
-             message = {"age": result}
-             self.success_messages.append(message)
-             return entry
-
-
-     def record_handler(record):
-         return randint(0, 100)
-
-     @batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
-     def lambda_handler(event, context):
-         return {"statusCode": 200}
-     ```
+ ```python hl_lines="3 10 25 31 38 58" title="Creating a custom batch processor"
+ from random import randint
+
+ from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
+ import boto3
+ import os
+ import sys
+
+ table_name = os.getenv("TABLE_NAME", "table_not_found")
+
+ class MyPartialProcessor(BasePartialProcessor):
+     """
+     Process a record and store successful results in an Amazon DynamoDB table
+
+     Parameters
+     ----------
+     table_name: str
+         DynamoDB table name to write results to
+     """
+
+     def __init__(self, table_name: str):
+         self.table_name = table_name
+
+         super().__init__()
+
+     def _prepare(self):
+         # It's called once, *before* processing
+         # Creates the table resource and clears previous results
+         self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
+         self.success_messages.clear()
+
+     def _clean(self):
+         # It's called once, *after* processing all records (when the context manager closes)
+         # Here we're sending, at once, all successful messages to a DynamoDB table
+         with self.ddb_table.batch_writer() as batch:
+             for result in self.success_messages:
+                 batch.put_item(Item=result)
+
+     def _process_record(self, record):
+         # It handles how your record is processed
+         # Here we're keeping the status of each run
+         # where self.handler is the record_handler function passed as an argument
+         try:
+             result = self.handler(record)  # record_handler passed to decorator/context manager
+             return self.success_handler(record, result)
+         except Exception:
+             return self.failure_handler(record, sys.exc_info())
+
+     def success_handler(self, record, result):
+         entry = ("success", result, record)
+         message = {"age": result}
+         self.success_messages.append(message)
+         return entry
+
+
+ def record_handler(record):
+     return randint(0, 100)
+
+ @batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
+ def lambda_handler(event, context):
+     return {"statusCode": 200}
+ ```
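As the paragraph above notes, the custom processor also works as a context manager. A minimal usage sketch (not from the diff; it assumes `MyPartialProcessor`, `record_handler`, and `table_name` from the block above):

```python
processor = MyPartialProcessor(table_name)

def lambda_handler(event, context):
    # Calling the processor binds the batch and the record handler;
    # entering the context runs _prepare(), exiting it runs _clean()
    with processor(records=event["Records"], handler=record_handler):
        processor.process()  # invokes _process_record() for every record
    return {"statusCode": 200}
```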
### Caveats
@@ -1145,20 +1138,18 @@ When using Sentry.io for error monitoring, you can override `failure_handler` to capture each processing exception with the Sentry SDK:

> Credits to [Charles-Axel Dein](https://github.com/awslabs/aws-lambda-powertools-python/issues/293#issuecomment-781961732)

- === "sentry_integration.py"
-
-     ```python hl_lines="4 7-8"
-     from typing import Tuple
-
-     from aws_lambda_powertools.utilities.batch import BatchProcessor, FailureResponse
-     from sentry_sdk import capture_exception
+ ```python hl_lines="2 5-6" title="Integrating error tracking with Sentry.io"
+ from aws_lambda_powertools.utilities.batch import BatchProcessor, FailureResponse
+ from sentry_sdk import capture_exception


-     class MyProcessor(BatchProcessor):
-         def failure_handler(self, record, exception) -> FailureResponse:
-             capture_exception()  # send exception to Sentry
-             return super().failure_handler(record, exception)
-     ```
+ class MyProcessor(BatchProcessor):
+     def failure_handler(self, record, exception) -> FailureResponse:
+         capture_exception()  # send exception to Sentry
+         return super().failure_handler(record, exception)
+ ```
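One possible variation (an assumption worth verifying against your Sentry SDK version, not part of the diff): `failure_handler` receives the exception as a `sys.exc_info()`-style tuple (`ExceptionInfo`), and `sentry_sdk.capture_exception` also accepts an exc_info tuple, so it can be forwarded explicitly rather than relying on the implicit current-exception lookup:

```python
from aws_lambda_powertools.utilities.batch import BatchProcessor, ExceptionInfo, FailureResponse
from sentry_sdk import capture_exception


class MyProcessor(BatchProcessor):
    def failure_handler(self, record, exception: ExceptionInfo) -> FailureResponse:
        capture_exception(exception)  # forward the exc_info tuple explicitly
        return super().failure_handler(record, exception)
```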
## Legacy