Skip to content

Commit 28239ad

Browse files
authored
feat(event_sources): extract CloudWatch Logs in Kinesis streams (#1710)
1 parent 7978eea commit 28239ad

File tree

4 files changed

+155
-43
lines changed

4 files changed

+155
-43
lines changed

aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import base64
22
import json
3-
from typing import Iterator
3+
import zlib
4+
from typing import Iterator, List
45

6+
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import (
7+
CloudWatchLogsDecodedData,
8+
)
59
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
610

711

@@ -43,6 +47,11 @@ def data_as_json(self) -> dict:
4347
"""Decode binary encoded data as json"""
4448
return json.loads(self.data_as_text())
4549

50+
def data_zlib_compressed_as_json(self) -> dict:
51+
"""Decode binary encoded data as bytes"""
52+
decompressed = zlib.decompress(self.data_as_bytes(), zlib.MAX_WBITS | 32)
53+
return json.loads(decompressed)
54+
4655

4756
class KinesisStreamRecord(DictWrapper):
4857
@property
@@ -98,3 +107,11 @@ class KinesisStreamEvent(DictWrapper):
98107
def records(self) -> Iterator[KinesisStreamRecord]:
99108
for record in self["Records"]:
100109
yield KinesisStreamRecord(record)
110+
111+
112+
def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> List[CloudWatchLogsDecodedData]:
113+
return [CloudWatchLogsDecodedData(record.kinesis.data_zlib_compressed_as_json()) for record in event.records]
114+
115+
116+
def extract_cloudwatch_logs_from_record(record: KinesisStreamRecord) -> CloudWatchLogsDecodedData:
117+
return CloudWatchLogsDecodedData(data=record.kinesis.data_zlib_compressed_as_json())

docs/utilities/data_classes.md

Lines changed: 89 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -58,33 +58,33 @@ Same example as above, but using the `event_source` decorator
5858

5959
## Supported event sources
6060

61-
Event Source | Data_class
62-
------------------------------------------------- | ---------------------------------------------------------------------------------
63-
[Active MQ](#active-mq) | `ActiveMQEvent`
64-
[API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent`
65-
[API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2`
66-
[API Gateway Proxy](#api-gateway-proxy) | `APIGatewayProxyEvent`
67-
[API Gateway Proxy V2](#api-gateway-proxy-v2) | `APIGatewayProxyEventV2`
68-
[Application Load Balancer](#application-load-balancer) | `ALBEvent`
69-
[AppSync Authorizer](#appsync-authorizer) | `AppSyncAuthorizerEvent`
70-
[AppSync Resolver](#appsync-resolver) | `AppSyncResolverEvent`
71-
[CloudWatch Dashboard Custom Widget](#cloudwatch-dashboard-custom-widget) | `CloudWatchDashboardCustomWidgetEvent`
72-
[CloudWatch Logs](#cloudwatch-logs) | `CloudWatchLogsEvent`
73-
[CodePipeline Job Event](#codepipeline-job) | `CodePipelineJobEvent`
74-
[Cognito User Pool](#cognito-user-pool) | Multiple available under `cognito_user_pool_event`
75-
[Connect Contact Flow](#connect-contact-flow) | `ConnectContactFlowEvent`
76-
[DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName`
77-
[EventBridge](#eventbridge) | `EventBridgeEvent`
78-
[Kafka](#kafka) | `KafkaEvent`
79-
[Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent`
80-
[Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent`
81-
[Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent`
82-
[Rabbit MQ](#rabbit-mq) | `RabbitMQEvent`
83-
[S3](#s3) | `S3Event`
84-
[S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent`
85-
[SES](#ses) | `SESEvent`
86-
[SNS](#sns) | `SNSEvent`
87-
[SQS](#sqs) | `SQSEvent`
61+
| Event Source | Data_class |
62+
| ------------------------------------------------------------------------- | -------------------------------------------------- |
63+
| [Active MQ](#active-mq) | `ActiveMQEvent` |
64+
| [API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` |
65+
| [API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` |
66+
| [API Gateway Proxy](#api-gateway-proxy) | `APIGatewayProxyEvent` |
67+
| [API Gateway Proxy V2](#api-gateway-proxy-v2) | `APIGatewayProxyEventV2` |
68+
| [Application Load Balancer](#application-load-balancer) | `ALBEvent` |
69+
| [AppSync Authorizer](#appsync-authorizer) | `AppSyncAuthorizerEvent` |
70+
| [AppSync Resolver](#appsync-resolver) | `AppSyncResolverEvent` |
71+
| [CloudWatch Dashboard Custom Widget](#cloudwatch-dashboard-custom-widget) | `CloudWatchDashboardCustomWidgetEvent` |
72+
| [CloudWatch Logs](#cloudwatch-logs) | `CloudWatchLogsEvent` |
73+
| [CodePipeline Job Event](#codepipeline-job) | `CodePipelineJobEvent` |
74+
| [Cognito User Pool](#cognito-user-pool) | Multiple available under `cognito_user_pool_event` |
75+
| [Connect Contact Flow](#connect-contact-flow) | `ConnectContactFlowEvent` |
76+
| [DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName` |
77+
| [EventBridge](#eventbridge) | `EventBridgeEvent` |
78+
| [Kafka](#kafka) | `KafkaEvent` |
79+
| [Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent` |
80+
| [Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent` |
81+
| [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` |
82+
| [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` |
83+
| [S3](#s3) | `S3Event` |
84+
| [S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent` |
85+
| [SES](#ses) | `SESEvent` |
86+
| [SNS](#sns) | `SNSEvent` |
87+
| [SQS](#sqs) | `SQSEvent` |
8888

8989
???+ info
9090
The examples provided below are far from exhaustive - the data classes themselves are designed to provide a form of
@@ -456,9 +456,9 @@ In this example, we also use the new Logger `correlation_id` and built-in `corre
456456
A simple echo script. Anything passed in \`\`\`echo\`\`\` parameter is returned as the content of custom widget.
457457

458458
### Widget parameters
459-
Param | Description
460-
---|---
461-
**echo** | The content to echo back
459+
| Param | Description |
460+
| -------- | ------------------------ |
461+
| **echo** | The content to echo back |
462462

463463
### Example parameters
464464
\`\`\` yaml
@@ -497,6 +497,53 @@ decompress and parse json data from the event.
497497
do_something_with(event.timestamp, event.message)
498498
```
499499

500+
#### Kinesis integration
501+
502+
[When streaming CloudWatch Logs to a Kinesis Data Stream](https://aws.amazon.com/premiumsupport/knowledge-center/streaming-cloudwatch-logs/){target="_blank"} (cross-account or not), you can use `extract_cloudwatch_logs_from_event` to decode, decompress and extract logs as `CloudWatchLogsDecodedData` to ease log processing.
503+
504+
=== "app.py"
505+
506+
```python hl_lines="5-6 11"
507+
from typing import List
508+
509+
from aws_lambda_powertools.utilities.data_classes import event_source
510+
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData
511+
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
512+
KinesisStreamEvent, extract_cloudwatch_logs_from_event)
513+
514+
515+
@event_source(data_class=KinesisStreamEvent)
516+
def simple_handler(event: KinesisStreamEvent, context):
517+
logs: List[CloudWatchLogsDecodedData] = extract_cloudwatch_logs_from_event(event)
518+
for log in logs:
519+
if log.message_type == "DATA_MESSAGE":
520+
return "success"
521+
return "nothing to be processed"
522+
```
523+
524+
Alternatively, you can use `extract_cloudwatch_logs_from_record` to seamless integrate with the [Batch utility](./batch.md) for more robust log processing.
525+
526+
=== "app.py"
527+
528+
```python hl_lines="3-4 10"
529+
from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType,
530+
batch_processor)
531+
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
532+
KinesisStreamRecord, extract_cloudwatch_logs_from_record)
533+
534+
processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
535+
536+
537+
def record_handler(record: KinesisStreamRecord):
538+
log = extract_cloudwatch_logs_from_record(record)
539+
return log.message_type == "DATA_MESSAGE"
540+
541+
542+
@batch_processor(record_handler=record_handler, processor=processor)
543+
def lambda_handler(event, context):
544+
return processor.response()
545+
```
546+
500547
### CodePipeline Job
501548

502549
Data classes and utility functions to help create continuous delivery pipelines tasks with AWS Lambda
@@ -553,18 +600,18 @@ Data classes and utility functions to help create continuous delivery pipelines
553600
Cognito User Pools have several [different Lambda trigger sources](https://docs.aws.amazon.com/cognito/latest/developerguide/cognito-user-identity-pools-working-with-aws-lambda-triggers.html#cognito-user-identity-pools-working-with-aws-lambda-trigger-sources), all of which map to a different data class, which
554601
can be imported from `aws_lambda_powertools.data_classes.cognito_user_pool_event`:
555602

556-
Trigger/Event Source | Data Class
557-
------------------------------------------------- | -------------------------------------------------
558-
Custom message event | `data_classes.cognito_user_pool_event.CustomMessageTriggerEvent`
559-
Post authentication | `data_classes.cognito_user_pool_event.PostAuthenticationTriggerEvent`
560-
Post confirmation | `data_classes.cognito_user_pool_event.PostConfirmationTriggerEvent`
561-
Pre authentication | `data_classes.cognito_user_pool_event.PreAuthenticationTriggerEvent`
562-
Pre sign-up | `data_classes.cognito_user_pool_event.PreSignUpTriggerEvent`
563-
Pre token generation | `data_classes.cognito_user_pool_event.PreTokenGenerationTriggerEvent`
564-
User migration | `data_classes.cognito_user_pool_event.UserMigrationTriggerEvent`
565-
Define Auth Challenge | `data_classes.cognito_user_pool_event.DefineAuthChallengeTriggerEvent`
566-
Create Auth Challenge | `data_classes.cognito_user_pool_event.CreateAuthChallengeTriggerEvent`
567-
Verify Auth Challenge | `data_classes.cognito_user_pool_event.VerifyAuthChallengeResponseTriggerEvent`
603+
| Trigger/Event Source | Data Class |
604+
| --------------------- | ------------------------------------------------------------------------------ |
605+
| Custom message event | `data_classes.cognito_user_pool_event.CustomMessageTriggerEvent` |
606+
| Post authentication | `data_classes.cognito_user_pool_event.PostAuthenticationTriggerEvent` |
607+
| Post confirmation | `data_classes.cognito_user_pool_event.PostConfirmationTriggerEvent` |
608+
| Pre authentication | `data_classes.cognito_user_pool_event.PreAuthenticationTriggerEvent` |
609+
| Pre sign-up | `data_classes.cognito_user_pool_event.PreSignUpTriggerEvent` |
610+
| Pre token generation | `data_classes.cognito_user_pool_event.PreTokenGenerationTriggerEvent` |
611+
| User migration | `data_classes.cognito_user_pool_event.UserMigrationTriggerEvent` |
612+
| Define Auth Challenge | `data_classes.cognito_user_pool_event.DefineAuthChallengeTriggerEvent` |
613+
| Create Auth Challenge | `data_classes.cognito_user_pool_event.CreateAuthChallengeTriggerEvent` |
614+
| Verify Auth Challenge | `data_classes.cognito_user_pool_event.VerifyAuthChallengeResponseTriggerEvent` |
568615

569616
#### Post Confirmation Example
570617

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
{
2+
"Records": [
3+
{
4+
"kinesis": {
5+
"kinesisSchemaVersion": "1.0",
6+
"partitionKey": "da10bf66b1f54bff5d96eae99149ad1f",
7+
"sequenceNumber": "49635052289529725553291405521504870233219489715332317186",
8+
"data": "H4sIAAAAAAAAAK2Sa2vbMBSG/4ox+xg3Oror39IlvaztVmJv7WjCUGwl8+ZLZstts5L/vuOsZYUyWGEgJHiP9J7nvOghLF3b2rVLthsXjsLJOBl/uZjG8fh4Gg7C+q5yDcqUAWcSONHEoFzU6+Om7jZYGdq7dljYcpnZ4cZHwLWOJl1Zbs/r9cR6e9RVqc/rKlpXV9eXt+fy27vt8W+L2DfOlr07oXQIMAQyvHlzPk6mcbKgciktF5lQfMU5dZZqzrShLF2uFC60aLtlmzb5prc/ygvvmjYc3YRPFG+LusuurE+/Ikqb1Gd55dq8jV+8isT6+317Rk42J5PTcLFnm966yvd2D2GeISJTYIwCJSQ1BE9OtWZCABWaKMIJAMdDMyU5MYZLhmkxBhQxfY4Re1tiWiAlBsgIVQTE4Cl6tI+T8SwJZu5Hh1dPs1FApOMSDI9WVKmIC+4irTMWQZYpx7QkztrgE06MU4yCx9DmVbgbvABmQJTGtkYAB0NwEwyYQUBpqEFuSbkGrThTRKi/AlP+HHj6fvJa3P9Ap/+Rbja9/PD6POd+0jXW7xM1B8CDsp37w7woXBb8qQDZ6xeurJttEOc/HWpUBxeHKNr74LHwsXXYlsm9flrl/rmFIQeS7m3m1fVs/DlIGpu6nhMiyWQGXNKIMbcCIgkhElKbaZnZpYJUz33s1iV+z/6+StMlR3yphHNcCyxiNEXf2zed6xuEu8XuF2wb6krnAwAA",
9+
"approximateArrivalTimestamp": 1668093033.744
10+
},
11+
"eventSource": "aws:kinesis",
12+
"eventVersion": "1.0",
13+
"eventID": "shardId-000000000000:49635052289529725553291405521504870233219489715332317186",
14+
"eventName": "aws:kinesis:record",
15+
"invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49",
16+
"awsRegion": "eu-west-1",
17+
"eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG"
18+
},
19+
{
20+
"kinesis": {
21+
"kinesisSchemaVersion": "1.0",
22+
"partitionKey": "cf4c4c2c9a49bdfaf58d7dbbc2b06081",
23+
"sequenceNumber": "49635052289529725553291405520881064510298312199003701250",
24+
"data": "H4sIAAAAAAAAAK2SW2/TQBCF/4pl8ViTvc7u5i0laVraQhUbWtREaG1PgsGXYK/bhqr/nXVoBRIgUYnXc2bPfHO092GFXWc3mOy2GI7D6SSZfDyfxfFkPgsPwua2xtbLjFPBgQqiifFy2WzmbdNvvTOyt92otFWa29HWRVRoHU37qtqdNZupdfaorzNXNHW0qS+vLm7O4PPr3fxHROxatNWQThgbUTqiZHT94mySzOJkBUqYLOWY8ZQLbaTRkEvDciUYzWzKfETXp13WFtsh/qgoHbZdOL4OnyhelU2fX1qXffIoXdKcFjV2RRf/9iqSmy933Sk53h5PT8LVnm12g7Ub4u7DIveIXFFjFNGUKUlAaMY0EUJKLjkQbxhKGCWeknMKoAGUkYoJ7TFd4St2tvJtDRYxDAg3VB08Ve/j42SySIIFfu396Ek+DkS+xkwAiYhM00isgUV6jXmEMrM5EmMsh+C9v9hfMQ4eS1vW4cPBH4CZVpoTJkEIAp5RUMo8vGFae3JNCCdUccMVgPw7sP4VePZm+lzc/0AH/0i3mF28fX6fSzftW+v2jZKXRgVVt3SHRVliHvx06F4+x6ppd0FcfEMvMR2cH3rR3gWPxrsO/Vau9vqyvlpMPgRJazMcYGgEHHLKBhLGJaBA0JLxNc0JppoS9Cwxbir/B4d5QDBAQSnfFFGp8aa/vxw2uLbHYUH4sHr4Dj5RJxfMAwAA",
25+
"approximateArrivalTimestamp": 1668092612.992
26+
},
27+
"eventSource": "aws:kinesis",
28+
"eventVersion": "1.0",
29+
"eventID": "shardId-000000000000:49635052289529725553291405520881064510298312199003701250",
30+
"eventName": "aws:kinesis:record",
31+
"invokeIdentityArn": "arn:aws:iam::231436140809:role/pt-1488-CloudWatchKinesisLogsFunctionRole-1M4G2TIWIE49",
32+
"awsRegion": "eu-west-1",
33+
"eventSourceARN": "arn:aws:kinesis:eu-west-1:231436140809:stream/pt-1488-KinesisStreamCloudWatchLogs-D8tHs0im0aJG"
34+
}
35+
]
36+
}

tests/functional/test_data_classes.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@
8383
StreamViewType,
8484
)
8585
from aws_lambda_powertools.utilities.data_classes.event_source import event_source
86+
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
87+
extract_cloudwatch_logs_from_event,
88+
extract_cloudwatch_logs_from_record,
89+
)
8690
from aws_lambda_powertools.utilities.data_classes.s3_object_event import (
8791
S3ObjectLambdaEvent,
8892
)
@@ -1267,6 +1271,14 @@ def test_kinesis_stream_event_json_data():
12671271
assert record.kinesis.data_as_json() == json_value
12681272

12691273

1274+
def test_kinesis_stream_event_cloudwatch_logs_data_extraction():
1275+
event = KinesisStreamEvent(load_event("kinesisStreamCloudWatchLogsEvent.json"))
1276+
extracted_logs = extract_cloudwatch_logs_from_event(event)
1277+
individual_logs = [extract_cloudwatch_logs_from_record(record) for record in event.records]
1278+
1279+
assert len(extracted_logs) == len(individual_logs)
1280+
1281+
12701282
def test_alb_event():
12711283
event = ALBEvent(load_event("albEvent.json"))
12721284
assert event.request_context.elb_target_group_arn == event["requestContext"]["elb"]["targetGroupArn"]

0 commit comments

Comments
 (0)