Skip to content

feat(parser): add support to decompress Kinesis CloudWatch logs in Kinesis envelope #6656

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 15, 2025
18 changes: 17 additions & 1 deletion aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import zlib
from typing import TYPE_CHECKING, Any, cast

from aws_lambda_powertools.utilities.parser.envelopes.base import BaseEnvelope
Expand Down Expand Up @@ -45,5 +46,20 @@
for record in parsed_envelope.Records:
# We allow either AWS expected contract (bytes) or a custom Model, see #943
data = cast(bytes, record.kinesis.data)
models.append(self._parse(data=data.decode("utf-8"), model=model))
try:
decoded_data = data.decode("utf-8")
# If the Data Stream contains compressed data eg. CloudWatch Logs
# `decode` method will throw a UnicodeDecodeError
# which signals that decompression might be required
except UnicodeDecodeError as ude:
try:
logger.debug(
f"{type(ude).__name__}: {str(ude)} encountered. "
"Data will be decompressed with zlib.decompress()."
)
decompressed_data = zlib.decompress(data, zlib.MAX_WBITS | 32)
decoded_data = decompressed_data.decode("utf-8")
except Exception:
raise ValueError("Unable to decode and/or decompress data.")

Check warning on line 63 in aws_lambda_powertools/utilities/parser/envelopes/kinesis.py

View check run for this annotation

Codecov / codecov/patch

aws_lambda_powertools/utilities/parser/envelopes/kinesis.py#L62-L63

Added lines #L62 - L63 were not covered by tests
models.append(self._parse(data=decoded_data, model=model))
return models
26 changes: 26 additions & 0 deletions tests/unit/parser/_pydantic/test_kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,29 @@ class DummyModel(BaseModel): ...
for record in stream_data.Records:
record.kinesis.data = DummyModel()
record.decompress_zlib_record_data_as_json()


def test_kinesis_stream_event_with_cloud_watch_logs_data_using_envelope():
# GIVEN Kinesis Data Stream event with compressed data
# such as CloudWatch Logs
raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")

# WHEN parsing using KinesisDataStreamEvelope to CloudWatchLogsDecode
logs = envelopes.KinesisDataStreamEnvelope().parse(raw_event, CloudWatchLogsDecode)

# THEN logs should be extracted as CloudWatchLogsDecode objects
assert isinstance(logs[0], CloudWatchLogsDecode)


def test_kinesis_stream_event_with_cloud_watch_logs_data_fails_using_envelope():
# GIVEN Kinesis Data Stream event with corrupted compressed data
# such as CloudWatch Logs
raw_event = load_event("kinesisStreamCloudWatchLogsEvent.json")

# WHEN parsing using KinesisDataStreamEvelope to CloudWatchLogsDecode
# and the data is corrupted
raw_event["Records"][0]["kinesis"]["data"] = "123"

# THEN a ValueError should be thrown
with pytest.raises(ValueError):
envelopes.KinesisDataStreamEnvelope().parse(raw_event, CloudWatchLogsDecode)
Loading