Description
Expected Behaviour
When provided with an event and a Pydantic model, the envelope's parse()
method should correctly parse the data, including decompression of it (as as far as I am aware, it is compressed by the Kinesis itself).
Current Behaviour
My Lambda function receives CloudWatch Logs via a Kinesis Data Stream. When using event parser with the KinesisDataStreamEnvelope
either as a annotation above lambda_handler
or explicitly calling envelope's parse
method, I keep getting a UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
.
After debugging it appears as if there is a decompression step missing in the envelope's parse
method between casting to bytes and decoding using 'utf-8'.
Code snippet
# Pydantic models for CloudWatch Logs representation
class CloudWatchLogEvent(BaseModel):
model_config = ConfigDict(alias_generator=to_camel)
id: str
timestamp: str | int
message: str
class CloudWatchLog(BaseModel):
model_config = ConfigDict(alias_generator=to_camel)
message_type: str
owner: str = Field(pattern=r"^\d{12}$") # 12 digits pattern
log_group: str
log_stream: str
subscription_filter: list[str]
log_events: list[CloudWatchLogEvent] = Field(min_length=1)
# Version 1 - not working
@event_parser(model=CloudWatchLog, envelope=envelopes.KinesisDataStreamEnvelope)
def lambda_handler(event: list[CloudWatchLog], context: LambdaContext) -> dict:
...
# Version 2 - also not working
def lambda_handler(event: dict, context: LambdaContext) -> dict:
log: list[MyLogModel] = envelopes.KinesisDataStreamEnvelope().parse(
event, CloudWatchLog
)
...
Possible Solution
When manually parsing Kinesis Data Stream input in a Lambda function and locally (using a saved event), the following code works:
record_data: bytes = base64.b64decode(record['kinesis']['data'])
decompressed_data: bytes = gzip.decompress(record_data)
decoded_data: str = record_data_decode('utf-8')
Fixing the envelope
To fix the envelope itself, importing gzip
and adding a line:
data = gzip.decompress(data)
in utilities/parser/envelopes/kinesis.py file between casting to bytes and decoding using 'utf-8' fixes the issue.
Since this could be an edge case, it should also be fine to wrap the models.append(self._parse(data=data.decode('utf-8')...
line in a try
/except
clause catching the UnicodeDecodeError
exception and then performing the decompression before trying again to decode using 'utf-8'.
Steps to Reproduce
- Have a Kinesis Data Stream moving CloudWatch Logs to a Lambda function (or have a saved event and use it locally)
- Use the CloudWatchLog Pydantic model provided in the code snippet
- Use the KinesisDataStream envelope with an event parser either as an annotation above the Lambda handler, or explicitly call the
parse
method
Thank you
Powertools for AWS Lambda (Python) version
3.9.0
AWS Lambda function runtime
3.12
Packaging format used
PyPi
Debugging logs
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
Metadata
Metadata
Assignees
Labels
Type
Projects
Status