Feature request: KinesisDataStreamEnvelope().parse() method seems to be missing data decompression step #6625

@Artur-T-Malas

Description

Expected Behaviour

When given an event and a Pydantic model, the envelope's parse() method should parse the data correctly, including decompressing it (as far as I am aware, CloudWatch Logs data delivered through Kinesis arrives gzip-compressed).

Current Behaviour

My Lambda function receives CloudWatch Logs via a Kinesis Data Stream. When I use the event parser with KinesisDataStreamEnvelope, either as a decorator on lambda_handler or by calling the envelope's parse method explicitly, I keep getting UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte.

After debugging, it appears that a decompression step is missing in the envelope's parse method, between casting the data to bytes and decoding it as UTF-8.
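
The byte in the error message is consistent with gzip data: a gzip stream starts with the magic bytes 0x1f 0x8b, and 0x1f is valid ASCII, so UTF-8 decoding only fails on the second byte. A minimal sketch to illustrate this, using a made-up payload rather than a real event:

```python
import gzip

# Hypothetical payload standing in for a CloudWatch Logs message.
payload = b'{"messageType": "DATA_MESSAGE"}'
compressed = gzip.compress(payload)

# Every gzip stream begins with the two magic bytes 0x1f 0x8b.
magic = compressed[:2]

try:
    compressed.decode("utf-8")
    error_position = None
except UnicodeDecodeError as exc:
    # 0x1f is a valid ASCII control character, so decoding fails on the
    # second byte (index 1), which is 0x8b.
    error_position = exc.start

print(magic.hex(), error_position)
```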

Code snippet

from aws_lambda_powertools.utilities.parser import envelopes, event_parser
from aws_lambda_powertools.utilities.typing import LambdaContext
from pydantic import BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel


# Pydantic models for CloudWatch Logs representation
class CloudWatchLogEvent(BaseModel):
  model_config = ConfigDict(alias_generator=to_camel)

  id: str
  timestamp: str | int
  message: str


class CloudWatchLog(BaseModel):
  model_config = ConfigDict(alias_generator=to_camel)

  message_type: str
  owner: str = Field(pattern=r"^\d{12}$")  # 12 digits pattern
  log_group: str
  log_stream: str
  # Plural, so the camelCase alias matches the "subscriptionFilters" key
  subscription_filters: list[str]
  log_events: list[CloudWatchLogEvent] = Field(min_length=1)


# Version 1 - not working
@event_parser(model=CloudWatchLog, envelope=envelopes.KinesisDataStreamEnvelope)
def lambda_handler(event: list[CloudWatchLog], context: LambdaContext) -> dict:
  ...


# Version 2 - also not working
def lambda_handler(event: dict, context: LambdaContext) -> dict:
  log: list[CloudWatchLog] = envelopes.KinesisDataStreamEnvelope().parse(
    event, CloudWatchLog
  )
  ...

Possible Solution

When manually parsing Kinesis Data Stream input in a Lambda function and locally (using a saved event), the following code works:

import base64
import gzip

record_data: bytes = base64.b64decode(record['kinesis']['data'])
decompressed_data: bytes = gzip.decompress(record_data)
decoded_data: str = decompressed_data.decode('utf-8')
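
For completeness, the same three steps can be wrapped in a small helper and tried against a synthetic record. This is only a sketch: decode_cloudwatch_record, sample_log and sample_record are names made up for illustration, and the record contains only the kinesis.data field rather than a full Kinesis event.

```python
import base64
import gzip
import json


def decode_cloudwatch_record(record: dict) -> dict:
    """Decode one Kinesis record carrying a gzip-compressed JSON payload."""
    raw = base64.b64decode(record["kinesis"]["data"])  # base64 -> bytes
    decompressed = gzip.decompress(raw)                # gzip -> JSON bytes
    return json.loads(decompressed.decode("utf-8"))    # JSON bytes -> dict


# Synthetic stand-in for a saved event record (not real log data).
sample_log = {
    "messageType": "DATA_MESSAGE",
    "logEvents": [{"id": "1", "timestamp": 0, "message": "hello"}],
}
sample_record = {
    "kinesis": {
        "data": base64.b64encode(
            gzip.compress(json.dumps(sample_log).encode("utf-8"))
        ).decode("ascii")
    }
}

decoded = decode_cloudwatch_record(sample_record)
print(decoded["logEvents"][0]["message"])
```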

Fixing the envelope

To fix the envelope itself, import gzip and add the line:
data = gzip.decompress(data)
to the utilities/parser/envelopes/kinesis.py file, between casting to bytes and decoding using 'utf-8'. This fixes the issue.

Since this could be an edge case, it should also be fine to wrap the models.append(self._parse(data=data.decode('utf-8')... line in a try/except block that catches UnicodeDecodeError, decompresses the data, and then retries the 'utf-8' decode.
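
The try/except idea could look roughly like the standalone function below. It is not the envelope's actual code: decode_payload is a hypothetical helper that only illustrates the fallback, assuming the data has already been cast to bytes.

```python
import gzip


def decode_payload(data: bytes) -> str:
    """Decode bytes as UTF-8, falling back to gzip decompression first."""
    try:
        # Common case: the record data is plain UTF-8 text.
        return data.decode("utf-8")
    except UnicodeDecodeError:
        # Fallback: assume the data is gzip-compressed and retry the decode.
        # gzip.decompress raises gzip.BadGzipFile if it is not gzip at all.
        return gzip.decompress(data).decode("utf-8")


plain = decode_payload(b'{"source": "plain"}')
compressed = decode_payload(gzip.compress(b'{"source": "gzip"}'))
print(plain, compressed)
```

An alternative with the same effect would be to check for the gzip magic bytes (0x1f 0x8b) up front instead of relying on the exception.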

Steps to Reproduce

  1. Have a Kinesis Data Stream moving CloudWatch Logs to a Lambda function (or have a saved event and use it locally)
  2. Use the CloudWatchLog Pydantic model provided in the code snippet
  3. Use KinesisDataStreamEnvelope with the event parser, either as a decorator on the Lambda handler or by explicitly calling the parse method
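
If no saved event is at hand for step 1, a synthetic one can be built locally. The sketch below assumes the documented shape of a Kinesis event delivered to Lambda; build_kinesis_event is a hypothetical helper, and every identifier, ARN, and log value in it is a placeholder rather than data from a real stream.

```python
import base64
import gzip
import json


def build_kinesis_event(log_payload: dict) -> dict:
    """Build a one-record Kinesis event with a gzip-compressed payload."""
    data = base64.b64encode(gzip.compress(json.dumps(log_payload).encode("utf-8")))
    sequence = "49590338271490256608559692538361571095921575989136588898"
    return {
        "Records": [
            {
                "kinesis": {
                    "kinesisSchemaVersion": "1.0",
                    "partitionKey": "example-partition-key",
                    "sequenceNumber": sequence,
                    "data": data.decode("ascii"),
                    "approximateArrivalTimestamp": 1545084650.987,
                },
                "eventSource": "aws:kinesis",
                "eventVersion": "1.0",
                "eventID": f"shardId-000000000006:{sequence}",
                "eventName": "aws:kinesis:record",
                "invokeIdentityArn": "arn:aws:iam::123456789012:role/example-role",
                "awsRegion": "us-east-2",
                "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/example-stream",
            }
        ]
    }


# Placeholder CloudWatch Logs payload using the documented camelCase keys.
log_payload = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "/example/log-group",
    "logStream": "example-stream",
    "subscriptionFilters": ["example-filter"],
    "logEvents": [{"id": "1", "timestamp": 1545084650987, "message": "hello"}],
}
event = build_kinesis_event(log_payload)
```

Passing the resulting event to the handler locally should exercise the same decode path as an event received from the stream.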

Thank you

Powertools for AWS Lambda (Python) version

3.9.0

AWS Lambda function runtime

3.12

Packaging format used

PyPI

Debugging logs

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
