Description
Is this related to an existing feature request or issue?
No response
Which AWS Lambda Powertools utility does this relate to?
New utility
Summary
Implement an S3 streaming utility so we can process S3 files that are larger than the available memory or temporary storage.
Use case
As a user processing S3 files inside my Lambda, I would like to be able to consume big S3 files in a streaming fashion. I want Powertools to handle all the streaming and IO for me, so I can focus on writing the logic that handles the data.
Ideally, I would be able to enable plugins to further process the streaming data: for instance the data on S3 could be compressed, or could be formatted as JSON lines. I want Powertools to handle all the unpacking/deserialization for me.
Current experience
Currently, one can tap into the `StreamingBody` response from boto3 in order to stream bytes from S3:

```python
stream = s3.Object(bucket_name=bucket, key=key).get()["Body"]
```
One can also compose this stream with data transformations, like inflating a gzip stream:
```python
from gzip import GzipFile

gz = GzipFile(fileobj=stream)
for line in gz:
    ...
```
However, this raw solution has the following problems/limitations:

- The raw stream from boto3 is not seekable, which is necessary for things like `zipfile` (see the snippet below)
- Poor DX when plugging multiple data transformations
- No transparent error handling for retriable exceptions
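For illustration, here is a minimal snippet (bucket and key names are placeholders) showing the seekability limitation:

```python
import zipfile

import boto3

s3 = boto3.resource("s3")
body = s3.Object(bucket_name="my-bucket", key="archive.zip").get()["Body"]

# zipfile needs to seek to the end of the object to locate the central
# directory, but the raw StreamingBody cannot seek, so this fails
# (the exact exception raised depends on the botocore version).
try:
    zipfile.ZipFile(body)
except Exception as exc:
    print(f"cannot open a zip archive from a raw S3 stream: {exc}")
```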
Proposal
Introduce a new utility called `streaming` and a new class `S3Object` that would implement the `RawIOBase` interface.
```python
class S3Object(io.RawIOBase):
    def __init__(self, bucket: str, key: str, version_id: Optional[str] = None, boto3_client: Optional[boto3.Client] = None):
        ...
```
The code would implement all the necessary methods to satisfy the `RawIOBase` protocol.
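To make the mechanism concrete, below is a rough sketch (not the proposed implementation; the class name and method bodies are illustrative) of how the `RawIOBase` methods could be backed by ranged `GetObject` calls, which is what would make the object seekable without downloading it whole:

```python
import io

import boto3


class _RangedS3Reader(io.RawIOBase):
    """Illustrative sketch: seek/read implemented with ranged GetObject calls."""

    def __init__(self, bucket: str, key: str, boto3_client=None):
        self._client = boto3_client or boto3.client("s3")
        self._bucket = bucket
        self._key = key
        # Total object size is needed to honor SEEK_END and detect EOF
        self._size = self._client.head_object(Bucket=bucket, Key=key)["ContentLength"]
        self._position = 0

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return True

    def tell(self) -> int:
        return self._position

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        if whence == io.SEEK_SET:
            self._position = offset
        elif whence == io.SEEK_CUR:
            self._position += offset
        elif whence == io.SEEK_END:
            self._position = self._size + offset
        return self._position

    def readinto(self, buffer) -> int:
        if not len(buffer) or self._position >= self._size:
            return 0  # nothing requested, or EOF
        stop = min(self._position + len(buffer), self._size) - 1
        response = self._client.get_object(
            Bucket=self._bucket,
            Key=self._key,
            Range=f"bytes={self._position}-{stop}",
        )
        data = response["Body"].read()
        buffer[: len(data)] = data
        self._position += len(data)
        return len(data)
```

A real implementation would also want to buffer reads (e.g. by wrapping the raw reader in `io.BufferedReader`) so that iterating line by line does not issue one HTTP request per tiny read.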
Additionally, create a plug-in system/middleware to plug additional data transformations on the stream. Common transformations could be included directly on the class constructor:
```python
S3Object(..., gunzip=True, json=True)
```
While still providing a way to implement custom data transformations:
```python
class CustomTransform(BaseTransform):
    def transform(self, source: io.BufferedIOBase) -> io.BufferedIOBase:
        ...


s3 = S3Object(...)
s3.add_transform(CustomTransform())
```
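As a sanity check that this interface composes, a built-in gunzip transform could likely be written in the same shape (the class name is illustrative, and it would subclass the proposed `BaseTransform`):

```python
import gzip
import io


class GunzipTransform:  # would subclass the proposed BaseTransform
    def transform(self, source: io.BufferedIOBase) -> io.BufferedIOBase:
        # GzipFile wraps any readable file-like object and inflates on read,
        # so the transform only swaps one stream for another
        return gzip.GzipFile(fileobj=source, mode="rb")
```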
User experience
A user can use the utility from the Lambda handler:
```python
def lambda_handler(event: dict, context: LambdaContext):
    s3_obj = S3Object(bucket=event.get("bucket"), key=event.get("key"), gunzip=True)

    for line in s3_obj:
        # do something with line
        ...
```
Under the hood, the utility would take care of fetching only the necessary bytes as the data is consumed, and of retrying when connections time out.
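As a rough illustration of the retry behaviour (the exception type, attempt count, and helper name are assumptions, not the proposed API), the utility could re-issue a read when the connection times out, since a ranged reader can simply request the same byte range again:

```python
from botocore.exceptions import ReadTimeoutError

MAX_ATTEMPTS = 3  # illustrative value


def _read_with_retry(reader, size: int) -> bytes:
    """Hypothetical helper: retry a read that failed on a connection timeout."""
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            return reader.read(size)
        except ReadTimeoutError:
            if attempt == MAX_ATTEMPTS:
                raise
            # otherwise loop and re-issue the read for the same byte range
```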
Out of scope
- Staging the data into temporary storage (Lambda's Ephemeral storage, EFS, etc), before processing: can be added later if there's interest.
- Processing parallelization: it would increase the complexity of the solution and can be added later.
Future work
Once the basics are in place, we could further optimize the IO under the hood by implementing read-ahead, async and parallel IO, etc. This could result in improved performance in future versions of Powertools, with no breaking API changes.
Potential challenges
Implementation is not trivial, so we need a good set of tests, including E2E tests.
Dependencies and Integrations
No response
Alternative solutions
When fetching an S3 object with boto3, there's a private accessor to the underlying `urllib3.response.HTTPResponse`. This could be used to stream the results, and could be enough if we're not implementing `seek`.

```python
>>> s3_obj["Body"]._raw_stream
<urllib3.response.HTTPResponse object at 0x1040e8460>
```
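For context, reaching that private attribute looks like this (bucket and key are placeholders; `_raw_stream` is a botocore internal and may change between releases):

```python
import boto3

s3 = boto3.client("s3")
s3_obj = s3.get_object(Bucket="my-bucket", Key="my-key")

# Private attribute of botocore's StreamingBody
raw = s3_obj["Body"]._raw_stream
print(type(raw))  # <class 'urllib3.response.HTTPResponse'>
```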
Acknowledgment
- This feature request meets Lambda Powertools Tenets
- Should this be considered in other Lambda Powertools languages? i.e. Java, TypeScript