Skip to content

Commit 7af50e5

Browse files
feat(streaming): add new s3 streaming utility (#1719)
Co-authored-by: heitorlessa <lessa@amazon.co.uk>
1 parent 539375d commit 7af50e5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2107
-102
lines changed

.github/boring-cyborg.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ labelPRBasedOnFilePath:
4242
typing:
4343
- aws_lambda_powertools/utilities/typing/*
4444
- mypy.ini
45+
streaming:
46+
- aws_lambda_powertools/utilities/streaming/*
4547
commons:
4648
- aws_lambda_powertools/shared/*
4749

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ A suite of Python utilities for AWS Lambda functions to ease adopting best pract
1515
1616
![hero-image](https://user-images.githubusercontent.com/3340292/198254617-d0fdb672-86a6-4988-8a40-adf437135e0a.png)
1717

18-
1918
## Features
2019

2120
* **[Tracing](https://awslabs.github.io/aws-lambda-powertools-python/latest/core/tracer/)** - Decorators and utilities to trace Lambda function handlers, and both synchronous and asynchronous functions
@@ -32,6 +31,7 @@ A suite of Python utilities for AWS Lambda functions to ease adopting best pract
3231
* **[Parser](https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/parser/)** - Data parsing and deep validation using Pydantic
3332
* **[Idempotency](https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/idempotency/)** - Convert your Lambda functions into idempotent operations which are safe to retry
3433
* **[Feature Flags](https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/feature_flags/)** - A simple rule engine to evaluate when one or multiple features should be enabled depending on the input
34+
* **[Streaming](https://awslabs.github.io/aws-lambda-powertools-python/latest/utilities/streaming/)** - Streams datasets larger than the available memory as streaming data.
3535

3636
### Installation
3737

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from aws_lambda_powertools.utilities.streaming.s3_object import S3Object
2+
3+
__all__ = ["S3Object"]
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
import io
2+
import logging
3+
from typing import IO, TYPE_CHECKING, AnyStr, Iterable, List, Optional
4+
5+
import boto3
6+
7+
from aws_lambda_powertools.utilities.streaming.compat import PowertoolsStreamingBody
8+
9+
if TYPE_CHECKING:
10+
from mypy_boto3_s3 import Client
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class _S3SeekableIO(IO[bytes]):
16+
"""
17+
_S3SeekableIO wraps boto3.StreamingBody to allow for seeking. Seeking is achieved by closing the
18+
existing connection and re-opening a new one, passing the correct HTTP Range header.
19+
20+
Parameters
21+
----------
22+
bucket: str
23+
The S3 bucket
24+
key: str
25+
The S3 key
26+
version_id: str, optional
27+
A version ID of the object, when the S3 bucket is versioned
28+
boto3_client: boto3 S3 Client, optional
29+
An optional boto3 S3 client. If missing, a new one will be created.
30+
sdk_options: dict, optional
31+
Dictionary of options that will be passed to the S3 Client get_object API call
32+
"""
33+
34+
def __init__(
35+
self, bucket: str, key: str, version_id: Optional[str] = None, boto3_client=Optional["Client"], **sdk_options
36+
):
37+
self.bucket = bucket
38+
self.key = key
39+
40+
# Holds the current position in the stream
41+
self._position = 0
42+
43+
# Stores the closed state of the stream
44+
self._closed: bool = False
45+
46+
# Caches the size of the object
47+
self._size: Optional[int] = None
48+
49+
self._s3_client: Optional["Client"] = boto3_client
50+
self._raw_stream: Optional[PowertoolsStreamingBody] = None
51+
52+
self._sdk_options = sdk_options
53+
self._sdk_options["Bucket"] = bucket
54+
self._sdk_options["Key"] = key
55+
if version_id is not None:
56+
self._sdk_options["VersionId"] = version_id
57+
58+
@property
59+
def s3_client(self) -> "Client":
60+
"""
61+
Returns a boto3 S3 client
62+
"""
63+
if self._s3_client is None:
64+
self._s3_client = boto3.client("s3")
65+
return self._s3_client
66+
67+
@property
68+
def size(self) -> int:
69+
"""
70+
Retrieves the size of the S3 object
71+
"""
72+
if self._size is None:
73+
logger.debug("Getting size of S3 object")
74+
self._size = self.s3_client.head_object(**self._sdk_options).get("ContentLength", 0)
75+
return self._size
76+
77+
@property
78+
def raw_stream(self) -> PowertoolsStreamingBody:
79+
"""
80+
Returns the boto3 StreamingBody, starting the stream from the seeked position.
81+
"""
82+
if self._raw_stream is None:
83+
range_header = f"bytes={self._position}-"
84+
logger.debug(f"Starting new stream at {range_header}")
85+
self._raw_stream = self.s3_client.get_object(Range=range_header, **self._sdk_options).get("Body")
86+
self._closed = False
87+
88+
return self._raw_stream
89+
90+
def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
91+
"""
92+
Seeks the current object, invalidating the underlying stream if the position changes.
93+
"""
94+
current_position = self._position
95+
96+
if whence == io.SEEK_SET:
97+
self._position = offset
98+
elif whence == io.SEEK_CUR:
99+
self._position += offset
100+
elif whence == io.SEEK_END:
101+
self._position = self.size + offset
102+
else:
103+
raise ValueError(f"invalid whence ({whence}, should be {io.SEEK_SET}, {io.SEEK_CUR}, {io.SEEK_END})")
104+
105+
# Invalidate the existing stream, so a new one will be open on the next IO operation.
106+
#
107+
# Some consumers of this class might call seek multiple times, without affecting the net position.
108+
# zipfile.ZipFile does this often. If we just blindly invalidated the stream, we would have to re-open
109+
# an S3 HTTP connection just to continue reading on the same position as before, which would be inefficient.
110+
#
111+
# So we only invalidate it if there's a net position change after seeking, and we have an existing S3 connection
112+
if current_position != self._position and self._raw_stream is not None:
113+
self._raw_stream.close()
114+
self._raw_stream = None
115+
116+
return self._position
117+
118+
def seekable(self) -> bool:
119+
return True
120+
121+
def readable(self) -> bool:
122+
return True
123+
124+
def writable(self) -> bool:
125+
return False
126+
127+
def tell(self) -> int:
128+
return self._position
129+
130+
def read(self, size: Optional[int] = -1) -> bytes:
131+
size = None if size == -1 else size
132+
data = self.raw_stream.read(size)
133+
if data is not None:
134+
self._position += len(data)
135+
return data
136+
137+
def readline(self, size: Optional[int] = None) -> bytes:
138+
data = self.raw_stream.readline(size)
139+
self._position += len(data)
140+
return data
141+
142+
def readlines(self, hint: int = -1) -> List[bytes]:
143+
# boto3's StreamingResponse doesn't implement the "hint" parameter
144+
data = self.raw_stream.readlines()
145+
self._position += sum(len(line) for line in data)
146+
return data
147+
148+
@property
149+
def closed(self) -> bool:
150+
return self._closed
151+
152+
def __next__(self):
153+
return self.raw_stream.__next__()
154+
155+
def __iter__(self):
156+
return self.raw_stream.__iter__()
157+
158+
def __enter__(self):
159+
return self
160+
161+
def __exit__(self, *kwargs):
162+
self.close()
163+
164+
def close(self) -> None:
165+
self.raw_stream.close()
166+
self._closed = True
167+
168+
def fileno(self) -> int:
169+
raise NotImplementedError("this stream is not backed by a file descriptor")
170+
171+
def flush(self) -> None:
172+
raise NotImplementedError("this stream is not writable")
173+
174+
def isatty(self) -> bool:
175+
return False
176+
177+
def truncate(self, size: Optional[int] = 0) -> int:
178+
raise NotImplementedError("this stream is not writable")
179+
180+
def write(self, data: AnyStr) -> int:
181+
raise NotImplementedError("this stream is not writable")
182+
183+
def writelines(self, lines: Iterable[AnyStr]) -> None:
184+
raise NotImplementedError("this stream is not writable")

0 commit comments

Comments
 (0)