Skip to content

Commit a2c02b8

Browse files
committed
chore: custom partial processor
1 parent 10238c4 commit a2c02b8

File tree

2 files changed

+72
-60
lines changed

2 files changed

+72
-60
lines changed

docs/utilities/batch.md

Lines changed: 2 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -327,66 +327,8 @@ You can create your own partial batch processor from scratch by inheriting the `
327327

328328
You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.
329329

330-
```python hl_lines="3 9 24 30 37 57" title="Creating a custom batch processor"
331-
from random import randint
332-
333-
from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
334-
import boto3
335-
import os
336-
337-
table_name = os.getenv("TABLE_NAME", "table_not_found")
338-
339-
class MyPartialProcessor(BasePartialProcessor):
340-
"""
341-
Process a record and store successful results in an Amazon DynamoDB table
342-
343-
Parameters
344-
----------
345-
table_name: str
346-
DynamoDB table name to write results to
347-
"""
348-
349-
def __init__(self, table_name: str):
350-
self.table_name = table_name
351-
352-
super().__init__()
353-
354-
def _prepare(self):
355-
# It's called once, *before* processing
356-
# Creates table resource and clean previous results
357-
self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
358-
self.success_messages.clear()
359-
360-
def _clean(self):
361-
# It's called once, *after* processing all records (when the context manager closes)
362-
# Here we're sending, at once, all successful messages to a ddb table
363-
with self.ddb_table.batch_writer() as batch:
364-
for result in self.success_messages:
365-
batch.put_item(Item=result)
366-
367-
def _process_record(self, record):
368-
# It handles how your record is processed
369-
# Here we're keeping the status of each run
370-
# where self.handler is the record_handler function passed as an argument
371-
try:
372-
result = self.handler(record) # record_handler passed to decorator/context manager
373-
return self.success_handler(record, result)
374-
except Exception as exc:
375-
return self.failure_handler(record, exc)
376-
377-
def success_handler(self, record):
378-
entry = ("success", result, record)
379-
message = {"age": result}
380-
self.success_messages.append(message)
381-
return entry
382-
383-
384-
def record_handler(record):
385-
return randint(0, 100)
386-
387-
@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
388-
def lambda_handler(event, context):
389-
return {"statusCode": 200}
330+
```python hl_lines="9 16 31 37 44 55 68" title="Creating a custom batch processor"
331+
--8<-- "examples/batch_processing/src/custom_partial_processor.py"
390332
```
391333

392334
### Caveats
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import os
2+
import sys
3+
from random import randint
4+
from typing import Any
5+
6+
import boto3
7+
8+
from aws_lambda_powertools import Logger
9+
from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
10+
11+
table_name = os.getenv("TABLE_NAME", "table_not_found")
12+
13+
logger = Logger()
14+
15+
16+
class MyPartialProcessor(BasePartialProcessor):
    """
    Process records and store successful results in an Amazon DynamoDB table.

    Parameters
    ----------
    table_name: str
        DynamoDB table name to write results to
    """

    def __init__(self, table_name: str):
        # Only the table name is captured here; the boto3 resource itself is
        # created lazily in _prepare(), once per batch run.
        self.table_name = table_name
        super().__init__()

    def _prepare(self):
        # Runs once, *before* any record is processed: build the DynamoDB
        # table resource and discard results left over from a previous batch.
        self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
        self.success_messages.clear()

    def _clean(self):
        # Runs once, *after* every record has been processed (i.e. when the
        # context manager closes): flush all successful records to DynamoDB
        # in a single batch write.
        with self.ddb_table.batch_writer() as writer:
            for item in self.success_messages:
                writer.put_item(Item=item)

    def _process_record(self, record):
        # Defines how a single record is handled. self.handler is the
        # record_handler function passed to the decorator/context manager;
        # each record's outcome is routed to success_handler/failure_handler.
        try:
            outcome = self.handler(record)  # record_handler passed to decorator/context manager
            return self.success_handler(record, outcome)
        except Exception as exc:
            logger.error(exc)
            # failure_handler expects the (type, value, traceback) triple.
            return self.failure_handler(record, sys.exc_info())

    def success_handler(self, record, result: Any):
        # Keep the raw record so _clean() can write it to DynamoDB, and
        # return the ("success", result, record) tuple the base class expects.
        self.success_messages.append(record)
        return ("success", result, record)

    async def _async_process_record(self, record: dict):
        # Async processing is not supported by this example processor.
        raise NotImplementedError()
62+
63+
64+
def record_handler(record):
    """Example record handler: ignore the record and return a random age (0-100)."""
    return randint(0, 100)
66+
67+
68+
@batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
def lambda_handler(event, context):
    """Lambda entry point: batch processing is fully delegated to the decorator."""
    response = {"statusCode": 200}
    return response

0 commit comments

Comments
 (0)