Skip to content

Commit c273e5d

Browse files
committed
add md5 logic and logs
1 parent 30d635d commit c273e5d

File tree

7 files changed

+222
-17
lines changed

7 files changed

+222
-17
lines changed

powertools-large-messages/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,14 @@
5454
<groupId>software.amazon.payloadoffloading</groupId>
5555
<artifactId>payloadoffloading-common</artifactId>
5656
</dependency>
57+
<dependency>
58+
<groupId>software.amazon.awssdk</groupId>
59+
<artifactId>sdk-core</artifactId>
60+
</dependency>
61+
<dependency>
62+
<groupId>software.amazon.awssdk</groupId>
63+
<artifactId>utils</artifactId>
64+
</dependency>
5765
<dependency>
5866
<groupId>software.amazon.awssdk</groupId>
5967
<artifactId>s3</artifactId>

powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
package software.amazon.lambda.powertools.largemessages;
22

3-
import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
43
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
54
import software.amazon.awssdk.regions.Region;
65
import software.amazon.awssdk.services.s3.S3Client;
76
import software.amazon.awssdk.services.s3.S3ClientBuilder;
87

9-
import static software.amazon.lambda.powertools.core.internal.LambdaConstants.AWS_LAMBDA_INITIALIZATION_TYPE;
108
import static software.amazon.lambda.powertools.core.internal.LambdaConstants.AWS_REGION_ENV;
11-
import static software.amazon.lambda.powertools.core.internal.LambdaConstants.ON_DEMAND;
129

1310
/**
1411
* Singleton instance for Large Message Config.
@@ -55,14 +52,6 @@ public S3Client getS3Client() {
5552
S3ClientBuilder s3ClientBuilder = S3Client.builder()
5653
.httpClient(UrlConnectionHttpClient.builder().build())
5754
.region(Region.of(System.getenv(AWS_REGION_ENV)));
58-
59-
// AWS_LAMBDA_INITIALIZATION_TYPE has two values on-demand and snap-start
60-
// when using snap-start mode, the env var creds provider isn't used and causes a fatal error if set
61-
// fall back to the default provider chain if the mode is anything other than on-demand.
62-
String initializationType = System.getenv().get(AWS_LAMBDA_INITIALIZATION_TYPE);
63-
if (initializationType != null && initializationType.equals(ON_DEMAND)) {
64-
s3ClientBuilder.credentialsProvider(EnvironmentVariableCredentialsProvider.create());
65-
}
6655
this.s3Client = s3ClientBuilder.build();
6756
}
6857
return this.s3Client;

powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspect.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010

1111
import java.util.Optional;
1212

13-
import static java.lang.String.format;
14-
1513
/**
1614
* Handle {@link LargeMessage} annotations.
1715
*/
@@ -40,7 +38,7 @@ public Object around(ProceedingJoinPoint pjp,
4038
Optional<LargeMessageProcessor<?>> largeMessageProcessor = LargeMessageProcessorFactory.get(message);
4139

4240
if (!largeMessageProcessor.isPresent()) {
43-
LOG.warn(format("@LargeMessage annotation is placed on a method with unsupported message type [%s], proceeding", message.getClass()));
41+
LOG.warn("@LargeMessage annotation is placed on a method with unsupported message type [{}], proceeding", message.getClass());
4442
return pjp.proceed(proceedArgs);
4543
}
4644

powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,33 @@ public Object process(ProceedingJoinPoint pjp, boolean deleteS3Object) throws Th
4343
// legacy attribute (sqs only)
4444
payloadPointer = payloadPointer.replace("com.amazon.sqs.javamessaging.MessageS3Pointer", "software.amazon.payloadoffloading.PayloadS3Pointer");
4545

46+
LOG.info("Large message [{}]: retrieving content from S3", getMessageId(message));
47+
4648
String s3ObjectContent = getS3ObjectContent(payloadPointer);
4749

50+
LOG.debug("Large message [{}]: {}", getMessageId(message), s3ObjectContent);
51+
4852
updateMessageContent(message, s3ObjectContent);
4953
removeLargeMessageAttributes(message);
5054

5155
Object response = pjp.proceed(proceedArgs);
5256

5357
if (deleteS3Object) {
58+
LOG.info("Large message [{}]: deleting object from S3", getMessageId(message));
5459
deleteS3Object(payloadPointer);
5560
}
5661

5762
return response;
5863
}
5964

65+
/**
66+
* Retrieve the message id
67+
*
68+
* @param message the message itself
69+
* @return the id of the message (String format)
70+
*/
71+
protected abstract String getMessageId(T message);
72+
6073
/**
6174
* Retrieve the content of the message (ex: body of an SQSMessage)
6275
*
@@ -75,13 +88,15 @@ public Object process(ProceedingJoinPoint pjp, boolean deleteS3Object) throws Th
7588

7689
/**
7790
* Check if the message is actually a large message (indicator in message attributes)
91+
*
7892
* @param message the message itself
7993
* @return true if the message is a large message
8094
*/
8195
protected abstract boolean isLargeMessage(T message);
8296

8397
/**
8498
* Remove the large message indicator (in message attributes)
99+
*
85100
* @param message the message itself
86101
*/
87102
protected abstract void removeLargeMessageAttributes(T message);

powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeSNSMessageProcessor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ public class LargeSNSMessageProcessor extends LargeMessageProcessor<SNSRecord> {
1111
public LargeSNSMessageProcessor() {
1212
}
1313

14+
@Override
15+
protected String getMessageId(SNSRecord message) {
16+
return message.getSNS().getMessageId();
17+
}
18+
1419
@Override
1520
protected String getMessageContent(SNSRecord message) {
1621
return message.getSNS().getMessage();

powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeSQSMessageProcessor.java

Lines changed: 123 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,39 @@
22

33
import com.amazonaws.services.lambda.runtime.events.SQSEvent.MessageAttribute;
44
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
import software.amazon.awssdk.utils.BinaryUtils;
8+
import software.amazon.awssdk.utils.Md5Utils;
59

10+
import java.nio.ByteBuffer;
11+
import java.nio.charset.StandardCharsets;
12+
import java.security.MessageDigest;
13+
import java.util.ArrayList;
14+
import java.util.Collections;
615
import java.util.HashMap;
16+
import java.util.List;
717
import java.util.Map;
18+
import java.util.Optional;
819

920
public class LargeSQSMessageProcessor extends LargeMessageProcessor<SQSMessage> {
1021

11-
protected static final String LEGACY_RESERVED_ATTRIBUTE_NAME = "SQSLargePayloadSize";
22+
private static final Logger LOG = LoggerFactory.getLogger(LargeSQSMessageProcessor.class);
23+
private static final String LEGACY_RESERVED_ATTRIBUTE_NAME = "SQSLargePayloadSize";
24+
private static final int INTEGER_SIZE_IN_BYTES = 4;
25+
private static final byte STRING_TYPE_FIELD_INDEX = 1;
26+
private static final byte BINARY_TYPE_FIELD_INDEX = 2;
27+
private static final byte STRING_LIST_TYPE_FIELD_INDEX = 3;
28+
private static final byte BINARY_LIST_TYPE_FIELD_INDEX = 4;
1229

1330
public LargeSQSMessageProcessor() {
1431
}
1532

33+
@Override
34+
protected String getMessageId(SQSMessage message) {
35+
return message.getMessageId();
36+
}
37+
1638
@Override
1739
protected String getMessageContent(SQSMessage message) {
1840
return message.getBody();
@@ -21,6 +43,8 @@ protected String getMessageContent(SQSMessage message) {
2143
@Override
2244
protected void updateMessageContent(SQSMessage message, String messageContent) {
2345
message.setBody(messageContent);
46+
// we update the MD5 digest so it doesn't look tempered
47+
message.setMd5OfBody(calculateMessageBodyMd5(messageContent).orElse(message.getMd5OfBody()));
2448
}
2549

2650
@Override
@@ -36,5 +60,103 @@ protected void removeLargeMessageAttributes(SQSMessage message) {
3660
newAttributes.remove(RESERVED_ATTRIBUTE_NAME);
3761
newAttributes.remove(LEGACY_RESERVED_ATTRIBUTE_NAME);
3862
message.setMessageAttributes(newAttributes);
63+
// we update the MD5 digest so it doesn't look tempered
64+
message.setMd5OfMessageAttributes(calculateMessageAttributesMd5(newAttributes).orElse(message.getMd5OfMessageAttributes()));
65+
}
66+
67+
/**
68+
* Compute the MD5 of the message body.<br/>
69+
* Inspired from {@code software.amazon.awssdk.services.sqs.internal.MessageMD5ChecksumInterceptor}.<br/>
70+
* package protected for testing purpose.
71+
*
72+
* @param messageBody body of the SQS Message
73+
* @return the MD5 digest of the SQS Message body (or empty in case of error)
74+
*/
75+
static Optional<String> calculateMessageBodyMd5(String messageBody) {
76+
byte[] expectedMd5;
77+
try {
78+
expectedMd5 = Md5Utils.computeMD5Hash(messageBody.getBytes(StandardCharsets.UTF_8));
79+
} catch (Exception e) {
80+
LOG.warn("Unable to calculate the MD5 hash of the message body. ", e);
81+
return Optional.empty();
82+
}
83+
return Optional.of(BinaryUtils.toHex(expectedMd5));
84+
}
85+
86+
/**
87+
* Compute the MD5 of the message attributes.<br/>
88+
* Inspired from {@code software.amazon.awssdk.services.sqs.internal.MessageMD5ChecksumInterceptor}.<br/>
89+
* package protected for testing purpose.
90+
*
91+
* @param messageAttributes attributes of the SQS Message
92+
* @return the MD5 digest of the SQS Message attributes (or empty in case of error)
93+
*/
94+
static Optional<String> calculateMessageAttributesMd5(final Map<String, MessageAttribute> messageAttributes) {
95+
List<String> sortedAttributeNames = new ArrayList<>(messageAttributes.keySet());
96+
Collections.sort(sortedAttributeNames);
97+
98+
MessageDigest md5Digest;
99+
try {
100+
md5Digest = MessageDigest.getInstance("MD5");
101+
102+
for (String attrName : sortedAttributeNames) {
103+
MessageAttribute attrValue = messageAttributes.get(attrName);
104+
105+
// Encoded Name
106+
updateLengthAndBytes(md5Digest, attrName);
107+
108+
// Encoded Type
109+
updateLengthAndBytes(md5Digest, attrValue.getDataType());
110+
111+
// Encoded Value
112+
if (attrValue.getStringValue() != null) {
113+
md5Digest.update(STRING_TYPE_FIELD_INDEX);
114+
updateLengthAndBytes(md5Digest, attrValue.getStringValue());
115+
} else if (attrValue.getBinaryValue() != null) {
116+
md5Digest.update(BINARY_TYPE_FIELD_INDEX);
117+
updateLengthAndBytes(md5Digest, attrValue.getBinaryValue());
118+
} else if (attrValue.getStringListValues() != null &&
119+
attrValue.getStringListValues().size() > 0) {
120+
md5Digest.update(STRING_LIST_TYPE_FIELD_INDEX);
121+
for (String strListMember : attrValue.getStringListValues()) {
122+
updateLengthAndBytes(md5Digest, strListMember);
123+
}
124+
} else if (attrValue.getBinaryListValues() != null &&
125+
attrValue.getBinaryListValues().size() > 0) {
126+
md5Digest.update(BINARY_LIST_TYPE_FIELD_INDEX);
127+
for (ByteBuffer byteListMember : attrValue.getBinaryListValues()) {
128+
updateLengthAndBytes(md5Digest, byteListMember);
129+
}
130+
}
131+
}
132+
} catch (Exception e) {
133+
LOG.warn("Unable to calculate the MD5 hash of the message attributes. ", e);
134+
return Optional.empty();
135+
}
136+
137+
return Optional.of(BinaryUtils.toHex(md5Digest.digest()));
138+
}
139+
140+
/**
141+
* Update the digest using a sequence of bytes that consists of the length (in 4 bytes) of the
142+
* input String and the actual utf8-encoded byte values.
143+
*/
144+
private static void updateLengthAndBytes(MessageDigest digest, String str) {
145+
byte[] utf8Encoded = str.getBytes(StandardCharsets.UTF_8);
146+
ByteBuffer lengthBytes = ByteBuffer.allocate(INTEGER_SIZE_IN_BYTES).putInt(utf8Encoded.length);
147+
digest.update(lengthBytes.array());
148+
digest.update(utf8Encoded);
149+
}
150+
151+
/**
152+
* Update the digest using a sequence of bytes that consists of the length (in 4 bytes) of the
153+
* input ByteBuffer and all the bytes it contains.
154+
*/
155+
private static void updateLengthAndBytes(MessageDigest digest, ByteBuffer binaryValue) {
156+
ByteBuffer readOnlyBuffer = binaryValue.asReadOnlyBuffer();
157+
int size = readOnlyBuffer.remaining();
158+
ByteBuffer lengthBytes = ByteBuffer.allocate(INTEGER_SIZE_IN_BYTES).putInt(size);
159+
digest.update(lengthBytes.array());
160+
digest.update(readOnlyBuffer);
39161
}
40162
}

0 commit comments

Comments
 (0)