-
Notifications
You must be signed in to change notification settings - Fork 429
feat: SQS Partial failure #100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
heitorlessa
merged 26 commits into
aws-powertools:develop
from
gmcrocetti:partial-sqs-batch
Sep 1, 2020
Merged
Changes from all commits
Commits
Show all changes
26 commits
Select commit
Hold shift + click to select a range
2e00554
feat: add batch module
gmcrocetti 9015d43
feat: include base processors
gmcrocetti 48a7687
feat: add sqs failure processors
gmcrocetti 9c7324b
fix: include proposed suggestions
gmcrocetti e7e36a9
feat(sqs): improve validation for queue_url
gmcrocetti 8dbaf67
feat: add package level import for batch utility
gmcrocetti 524d27f
refactor: change return for failure/success handlers
gmcrocetti 7d32e8c
test: add unit tests for partial sqs processor
gmcrocetti f9c53e0
test: add unit tests for partial sqs processor
gmcrocetti 680ee69
test: functional tests for partial sqs processor and its middleware
gmcrocetti 6cddf76
feat(sqs): add optional config parameter
gmcrocetti 13e3132
refactor(tests): processor using default config
gmcrocetti 4275fb2
refactor(sqs): change methods to protected
gmcrocetti 54a76df
fix(base-partial): append record instead of entry
gmcrocetti 438ecd5
docs(sqs): docstrings for PartialSQS
gmcrocetti c7582f3
docs(sqs-base): docstring for base class
gmcrocetti f419ef6
refactor(sqs): add module middlewares
gmcrocetti c999f96
refactor: changes partial_sqs middleware in favor of a generic interf…
gmcrocetti 5cb6b89
refactor(tests): update tests to new batch processor middleware
gmcrocetti be06149
refactor: batch middleware
gmcrocetti 8ba81b4
docs(partial-processor): add simple docstrings to success/failure han…
gmcrocetti edcc14a
fix: typo in example
gmcrocetti ded3d75
docs: user specific documentation
gmcrocetti 14cc383
docs: fix suggestions made by @heitorlessa
gmcrocetti 25d67ca
refactor: remove references to BaseProcessor. Left BasePartialProcessor
gmcrocetti 9caf3d1
docs: refactor example; improve docs about creating your own processor
gmcrocetti File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
""" | ||
Batch processing utility | ||
""" | ||
|
||
from .base import BasePartialProcessor | ||
from .middlewares import batch_processor | ||
from .sqs import PartialSQSProcessor | ||
|
||
__all__ = ( | ||
"BasePartialProcessor", | ||
"PartialSQSProcessor", | ||
"batch_processor", | ||
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
""" | ||
Batch processing utilities | ||
""" | ||
|
||
from abc import ABC, abstractmethod | ||
from typing import Any, Callable, Iterable, List, Tuple | ||
|
||
|
||
class BasePartialProcessor(ABC): | ||
""" | ||
Abstract class for batch processors. | ||
""" | ||
|
||
def __init__(self): | ||
self.success_messages: List = [] | ||
self.fail_messages: List = [] | ||
|
||
@abstractmethod | ||
def _prepare(self): | ||
""" | ||
Prepare context manager. | ||
""" | ||
raise NotImplementedError() | ||
|
||
@abstractmethod | ||
def _clean(self): | ||
""" | ||
Clear context manager. | ||
""" | ||
raise NotImplementedError() | ||
|
||
@abstractmethod | ||
def _process_record(self, record: Any): | ||
""" | ||
Process record with handler. | ||
""" | ||
raise NotImplementedError() | ||
|
||
def process(self) -> List[Tuple]: | ||
""" | ||
Call instance's handler for each record. | ||
""" | ||
return [self._process_record(record) for record in self.records] | ||
|
||
def __enter__(self): | ||
self._prepare() | ||
return self | ||
|
||
def __exit__(self, exception_type, exception_value, traceback): | ||
self._clean() | ||
|
||
def __call__(self, records: Iterable[Any], handler: Callable): | ||
""" | ||
Set instance attributes before execution | ||
|
||
Parameters | ||
---------- | ||
records: Iterable[Any] | ||
Iterable with objects to be processed. | ||
handler: Callable | ||
Callable to process "records" entries. | ||
""" | ||
self.records = records | ||
self.handler = handler | ||
return self | ||
|
||
def success_handler(self, record: Any, result: Any): | ||
""" | ||
Success callback | ||
|
||
Returns | ||
------- | ||
tuple | ||
"success", result, original record | ||
""" | ||
entry = ("success", result, record) | ||
self.success_messages.append(record) | ||
return entry | ||
|
||
def failure_handler(self, record: Any, exception: Exception): | ||
""" | ||
Failure callback | ||
|
||
Returns | ||
------- | ||
tuple | ||
"fail", exceptions args, original record | ||
""" | ||
entry = ("fail", exception.args, record) | ||
self.fail_messages.append(record) | ||
return entry |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
""" | ||
Middlewares for batch utilities | ||
""" | ||
|
||
from typing import Callable, Dict | ||
|
||
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator | ||
|
||
from .base import BasePartialProcessor | ||
|
||
|
||
@lambda_handler_decorator | ||
def batch_processor( | ||
handler: Callable, event: Dict, context: Dict, record_handler: Callable, processor: BasePartialProcessor = None | ||
): | ||
""" | ||
Middleware to handle batch event processing | ||
|
||
Parameters | ||
---------- | ||
handler: Callable | ||
Lambda's handler | ||
event: Dict | ||
Lambda's Event | ||
context: Dict | ||
Lambda's Context | ||
record_handler: Callable | ||
Callable to process each record from the batch | ||
processor: PartialSQSProcessor | ||
Batch Processor to handle partial failure cases | ||
|
||
Examples | ||
-------- | ||
**Processes Lambda's event with PartialSQSProcessor** | ||
>>> from aws_lambda_powertools.utilities.batch import batch_processor | ||
>>> | ||
>>> def record_handler(record): | ||
>>> return record["body"] | ||
>>> | ||
>>> @batch_processor(record_handler=record_handler, processor=PartialSQSProcessor()) | ||
>>> def handler(event, context): | ||
>>> return {"StatusCode": 200} | ||
|
||
Limitations | ||
----------- | ||
* Async batch processors | ||
|
||
""" | ||
records = event["Records"] | ||
|
||
with processor(records, record_handler): | ||
processor.process() | ||
|
||
return handler(event, context) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
""" | ||
Batch SQS utilities | ||
""" | ||
from typing import List, Optional, Tuple | ||
|
||
import boto3 | ||
from botocore.config import Config | ||
|
||
from .base import BasePartialProcessor | ||
|
||
|
||
class PartialSQSProcessor(BasePartialProcessor): | ||
""" | ||
Amazon SQS batch processor to delete successes from the Queue. | ||
|
||
Only the **special** case of partial failure is handled, thus a batch in | ||
which all records failed is **not** going to be removed from the queue, and | ||
the same is valid for a full success. | ||
|
||
Parameters | ||
---------- | ||
config: Config | ||
botocore config object | ||
|
||
Example | ||
------- | ||
**Process batch triggered by SQS** | ||
|
||
>>> from aws_lambda_powertools.utilities.batch import PartialSQSProcessor | ||
>>> | ||
>>> def record_handler(record): | ||
>>> return record["body"] | ||
>>> | ||
>>> def handler(event, context): | ||
>>> records = event["Records"] | ||
>>> processor = PartialSQSProcessor() | ||
>>> | ||
>>> with processor(records=records, handler=record_handler): | ||
>>> result = processor.process() | ||
>>> | ||
>>> # Case a partial failure occurred, all successful executions | ||
>>> # have been deleted from the queue after context's exit. | ||
>>> | ||
>>> return result | ||
""" | ||
|
||
def __init__(self, config: Optional[Config] = None): | ||
""" | ||
Initializes sqs client. | ||
""" | ||
config = config or Config() | ||
self.client = boto3.client("sqs", config=config) | ||
|
||
super().__init__() | ||
|
||
def _get_queue_url(self) -> str: | ||
""" | ||
Format QueueUrl from first records entry | ||
""" | ||
if not getattr(self, "records", None): | ||
return | ||
|
||
*_, account_id, queue_name = self.records[0]["eventSourceARN"].split(":") | ||
return f"{self.client._endpoint.host}/{account_id}/{queue_name}" | ||
|
||
def _get_entries_to_clean(self) -> List: | ||
""" | ||
Format messages to use in batch deletion | ||
""" | ||
return [{"Id": msg["messageId"], "ReceiptHandle": msg["receiptHandle"]} for msg in self.success_messages] | ||
|
||
def _process_record(self, record) -> Tuple: | ||
""" | ||
Process a record with instance's handler | ||
|
||
Parameters | ||
---------- | ||
record: Any | ||
An object to be processed. | ||
""" | ||
try: | ||
result = self.handler(record) | ||
return self.success_handler(record, result) | ||
except Exception as exc: | ||
return self.failure_handler(record, exc) | ||
|
||
def _prepare(self): | ||
""" | ||
Remove results from previous execution. | ||
""" | ||
self.success_messages.clear() | ||
self.fail_messages.clear() | ||
|
||
def _clean(self): | ||
""" | ||
Delete messages from Queue in case of partial failure. | ||
""" | ||
if not (self.fail_messages and self.success_messages): | ||
return | ||
|
||
queue_url = self._get_queue_url() | ||
entries_to_remove = self._get_entries_to_clean() | ||
|
||
return self.client.delete_message_batch(QueueUrl=queue_url, Entries=entries_to_remove) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.