Skip to content

Commit 3add524

Browse files
author
Michael Brewer
committed
feat(data-classes): Amazon MQ as an event source for AWS Lambda
1 parent 7a7ba0a commit 3add524

File tree

4 files changed

+193
-0
lines changed

4 files changed

+193
-0
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import base64
2+
import json
3+
from typing import Any, Iterator
4+
5+
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
6+
7+
8+
class ActiveMQMessage(DictWrapper):
9+
@property
10+
def message_id(self) -> str:
11+
return self["messageID"]
12+
13+
@property
14+
def message_type(self) -> str:
15+
return self["messageType"]
16+
17+
@property
18+
def data(self) -> str:
19+
return self["data"]
20+
21+
@property
22+
def decoded_data(self) -> str:
23+
"""Decodes the data as a str"""
24+
return base64.b64decode(self.data.encode()).decode()
25+
26+
@property
27+
def json_data(self) -> Any:
28+
"""Parses the data as json"""
29+
return json.loads(self.decoded_data)
30+
31+
@property
32+
def connection_id(self) -> str:
33+
return self["connectionId"]
34+
35+
@property
36+
def redelivered(self) -> bool:
37+
return self["redelivered"]
38+
39+
@property
40+
def timestamp(self) -> int:
41+
return self["timestamp"]
42+
43+
@property
44+
def broker_in_time(self) -> int:
45+
return self["brokerInTime"]
46+
47+
48+
class ActiveMQEvent(DictWrapper):
49+
"""Represents an Active MQ event sent to Lambda
50+
51+
Documentation:
52+
--------------
53+
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
54+
- https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/
55+
"""
56+
57+
@property
58+
def event_source(self) -> str:
59+
return self["eventSource"]
60+
61+
@property
62+
def event_source_arn(self) -> str:
63+
return self["eventSourceArn"]
64+
65+
@property
66+
def messages(self) -> Iterator[ActiveMQMessage]:
67+
for record in self["messages"]:
68+
yield ActiveMQMessage(record)
69+
70+
@property
71+
def message(self) -> ActiveMQMessage:
72+
return next(self.messages)

tests/events/activeMQEvent.json

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
{
2+
"eventSource": "aws:amq",
3+
"eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",
4+
"messages": [
5+
{
6+
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
7+
"messageType": "jms/text-message",
8+
"data": "QUJDOkFBQUE=",
9+
"connectionId": "myJMSCoID",
10+
"redelivered": false,
11+
"destination": {
12+
"physicalname": "testQueue"
13+
},
14+
"timestamp": 1598827811958,
15+
"brokerInTime": 1598827811958,
16+
"brokerOutTime": 1598827811959
17+
},
18+
{
19+
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
20+
"messageType": "jms/text-message",
21+
"data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==",
22+
"connectionId": "myJMSCoID2",
23+
"redelivered": false,
24+
"destination": {
25+
"physicalname": "testQueue"
26+
},
27+
"timestamp": 1598827811958,
28+
"brokerInTime": 1598827811958,
29+
"brokerOutTime": 1598827811959
30+
},
31+
{
32+
"messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
33+
"messageType": "jms/bytes-message",
34+
"data": "3DTOOW7crj51prgVLQaGQ82S48k=",
35+
"connectionId": "myJMSCoID1",
36+
"persistent": false,
37+
"destination": {
38+
"physicalname": "testQueue"
39+
},
40+
"timestamp": 1598827811958,
41+
"brokerInTime": 1598827811958,
42+
"brokerOutTime": 1598827811959
43+
}
44+
]
45+
}

tests/events/rabbitMQEvent.json

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
{
2+
"eventSource": "aws:rmq",
3+
"eventSourceArn": "arn:aws:mq:us-west-2:112556298976:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8",
4+
"rmqMessagesByQueue": {
5+
"pizzaQueue::/": [
6+
{
7+
"basicProperties": {
8+
"contentType": "text/plain",
9+
"contentEncoding": null,
10+
"headers": {
11+
"header1": {
12+
"bytes": [
13+
118,
14+
97,
15+
108,
16+
117,
17+
101,
18+
49
19+
]
20+
},
21+
"header2": {
22+
"bytes": [
23+
118,
24+
97,
25+
108,
26+
117,
27+
101,
28+
50
29+
]
30+
},
31+
"numberInHeader": 10
32+
},
33+
"deliveryMode": 1,
34+
"priority": 34,
35+
"correlationId": null,
36+
"replyTo": null,
37+
"expiration": "60000",
38+
"messageId": null,
39+
"timestamp": "Jan 1, 1970, 12:33:41 AM",
40+
"type": null,
41+
"userId": "AIDACKCEVSQ6C2EXAMPLE",
42+
"appId": null,
43+
"clusterId": null,
44+
"bodySize": 80
45+
},
46+
"redelivered": false,
47+
"data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ=="
48+
}
49+
]
50+
}
51+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from aws_lambda_powertools.utilities.data_classes.active_mq import ActiveMQEvent, ActiveMQMessage
2+
from tests.functional.utils import load_event
3+
4+
5+
def test_cloud_watch_trigger_event():
6+
event = ActiveMQEvent(load_event("activeMQEvent.json"))
7+
8+
assert event.event_source == "aws:amq"
9+
assert event.event_source_arn is not None
10+
assert len(list(event.messages)) == 3
11+
12+
message = event.message
13+
assert isinstance(message, ActiveMQMessage)
14+
assert message.message_id is not None
15+
assert message.message_type is not None
16+
assert message.data is not None
17+
assert message.decoded_data is not None
18+
assert message.connection_id is not None
19+
assert message.redelivered is False
20+
assert message.timestamp is not None
21+
assert message.broker_in_time is not None
22+
23+
messages = list(event.messages)
24+
message = messages[1]
25+
assert message.json_data["timeout"] == 0

0 commit comments

Comments
 (0)