3
3
"""
4
4
Batch SQS utilities
5
5
"""
6
-
7
- from typing import List , Optional
6
+ from typing import List , Optional , Tuple
8
7
9
8
import boto3
10
9
from botocore .config import Config
11
10
12
- from aws_lambda_powertools .middleware_factory import lambda_handler_decorator
13
-
14
11
from .base import BasePartialProcessor
15
12
16
13
17
14
class PartialSQSProcessor (BasePartialProcessor ):
15
+ """
16
+ Amazon SQS batch processor to delete successes from the Queue.
17
+
18
+ Only the **special** case of partial failure is handled, thus a batch in
19
+ which all records failed is **not** going to be removed from the queue, and
20
+ the same is valid for a full success.
21
+
22
+ Parameters
23
+ ----------
24
+ config: Config
25
+ botocore config object
26
+
27
+ Example
28
+ -------
29
+ **Process batch triggered by SQS**
30
+
31
+ >>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
32
+ >>>
33
+ >>> def record_handler(record):
34
+ >>> return record["body"]
35
+ >>>
36
+ >>> def handler(event, context):
37
+ >>> records = event["Records"]
38
+ >>> processor = PartialSQSProcessor()
39
+ >>>
40
+ >>> with processor(records=records, handler=record_handler) as ctx:
41
+ >>> result = ctx.process()
42
+ >>>
43
+ >>> # Case a partial failure occurred, all successful executions
44
+ >>> # have been deleted from the queue after context's exit.
45
+ >>>
46
+ >>> return result
47
+ """
48
+
18
49
def __init__ (self , config : Optional [Config ] = None ):
50
+ """
51
+ Initializes sqs client and also success and failure lists
52
+ to keep track of record's execution status.
53
+ """
19
54
config = config or Config ()
20
55
self .client = boto3 .client ("sqs" , config = config )
56
+
21
57
self .success_messages : List = []
22
58
self .fail_messages : List = []
23
59
24
60
super ().__init__ ()
25
61
26
- def _get_queue_url (self ):
62
+ def _get_queue_url (self ) -> str :
27
63
"""
28
64
Format QueueUrl from first records entry
29
65
"""
@@ -33,13 +69,21 @@ def _get_queue_url(self):
33
69
* _ , account_id , queue_name = self .records [0 ]["eventSourceARN" ].split (":" )
34
70
return f"{ self .client ._endpoint .host } /{ account_id } /{ queue_name } "
35
71
36
- def _get_entries_to_clean (self ):
72
+ def _get_entries_to_clean (self ) -> List :
37
73
"""
38
74
Format messages to use in batch deletion
39
75
"""
40
76
return [{"Id" : msg ["messageId" ], "ReceiptHandle" : msg ["receiptHandle" ]} for msg in self .success_messages ]
41
77
42
- def _process_record (self , record ):
78
+ def _process_record (self , record ) -> Tuple :
79
+ """
80
+ Process a record with instance's handler
81
+
82
+ Parameters
83
+ ----------
84
+ record: Any
85
+ An object to be processed.
86
+ """
43
87
try :
44
88
result = self .handler (record )
45
89
return self .success_handler (record , result )
@@ -48,7 +92,7 @@ def _process_record(self, record):
48
92
49
93
def _prepare (self ):
50
94
"""
51
- Remove results from previous executions .
95
+ Remove results from previous execution .
52
96
"""
53
97
self .success_messages .clear ()
54
98
self .fail_messages .clear ()
@@ -64,14 +108,3 @@ def _clean(self):
64
108
entries_to_remove = self ._get_entries_to_clean ()
65
109
66
110
return self .client .delete_message_batch (QueueUrl = queue_url , Entries = entries_to_remove )
67
-
68
-
69
- @lambda_handler_decorator
70
- def partial_sqs_processor (handler , event , context , record_handler , processor = None ):
71
- records = event ["Records" ]
72
- processor = processor or PartialSQSProcessor ()
73
-
74
- with processor (records , record_handler ) as ctx :
75
- ctx .process ()
76
-
77
- return handler (event , context )
0 commit comments