From 45c2c02f1956f9349382dc49c789d2297856836c Mon Sep 17 00:00:00 2001
From: Jerome Van Der Linden Use this annotation to handle large messages (> 256 KB) from SQS or SNS.
+ * When large messages are sent to an SQS Queue or SNS Topic, they are offloaded to S3 and only a reference is passed in the message/record. {@code @LargeMessage} automatically retrieves and deletes messages
+ * which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} or {@code amazon-sns-java-extended-client-lib}
+ * client libraries. This version of the {@code @LargeMessage} is compatible with version
+ * 1.1.0+ of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}. Put this annotation on a method where the first parameter is either a {@link com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage} or {@link com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord}.
+ *
+ * Optional: Use it in your Lambda constructor to pass a custom {@link S3Client} to the {@link software.amazon.lambda.powertools.largemessages.internal.LargeMessageProcessor}
+ *
+ * If you don't use this, a default S3Client will be created.
+ *
+ * public MyLambdaHandler() {
+ * LargeMessageConfig.builder().withS3Client(S3Client.create()).build();
+ * }
+ *
+ */
+public class LargeMessageConfig {
+
+ private static final LargeMessageConfig INSTANCE = new LargeMessageConfig();
+ private S3Client s3Client;
+
+ private LargeMessageConfig() {}
+
+ public static LargeMessageConfig get() {
+ return INSTANCE;
+ }
+
+ public static LargeMessageConfig init() {
+ return INSTANCE;
+ }
+
+ public void withS3Client(S3Client s3Client) {
+ if (this.s3Client == null) {
+ this.s3Client = s3Client;
+ }
+ }
+
+ // For tests purpose
+ void setS3Client(S3Client s3Client) {
+ this.s3Client = s3Client;
+ }
+
+ // Getter needs to initialize if not done with setter
+ public S3Client getS3Client() {
+ if (this.s3Client == null) {
+ S3ClientBuilder s3ClientBuilder = S3Client.builder()
+ .httpClient(UrlConnectionHttpClient.builder().build())
+ .region(Region.of(System.getenv(AWS_REGION_ENV)));
+
+ // AWS_LAMBDA_INITIALIZATION_TYPE has two values on-demand and snap-start
+ // when using snap-start mode, the env var creds provider isn't used and causes a fatal error if set
+ // fall back to the default provider chain if the mode is anything other than on-demand.
+ String initializationType = System.getenv().get(AWS_LAMBDA_INITIALIZATION_TYPE);
+ if (initializationType != null && initializationType.equals(ON_DEMAND)) {
+ s3ClientBuilder.credentialsProvider(EnvironmentVariableCredentialsProvider.create());
+ }
+ this.s3Client = s3ClientBuilder.build();
+ }
+ return this.s3Client;
+ }
+}
diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageProcessingException.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageProcessingException.java
new file mode 100644
index 000000000..e8143617d
--- /dev/null
+++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageProcessingException.java
@@ -0,0 +1,11 @@
+package software.amazon.lambda.powertools.largemessages;
+
+public class LargeMessageProcessingException extends RuntimeException {
+ public LargeMessageProcessingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public LargeMessageProcessingException(String message) {
+ super(message);
+ }
+}
diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspect.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspect.java
new file mode 100644
index 000000000..a53d21fee
--- /dev/null
+++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspect.java
@@ -0,0 +1,48 @@
+package software.amazon.lambda.powertools.largemessages.internal;
+
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.lambda.powertools.largemessages.LargeMessage;
+
+import java.util.Optional;
+
+import static java.lang.String.format;
+
+
+@Aspect
+public class LargeMessageAspect {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LargeMessageAspect.class);
+
+ @SuppressWarnings({"EmptyMethod"})
+ @Pointcut("@annotation(largeMessage)")
+ public void callAt(LargeMessage largeMessage) {
+ }
+
+ @Around(value = "callAt(largeMessage) && execution(@LargeMessage * *.*(..))", argNames = "pjp,largeMessage")
+ public Object around(ProceedingJoinPoint pjp,
+ LargeMessage largeMessage) throws Throwable {
+ Object[] proceedArgs = pjp.getArgs();
+
+ // we need a message to process
+ if (proceedArgs.length == 0) {
+ LOG.warn("@LargeMessage annotation is placed on a method without any message to process, proceeding");
+ return pjp.proceed(proceedArgs);
+ }
+
+ Object message = proceedArgs[0];
+ Optional
+ *
+ * SQS:
+ *
+ * @LargeMessage
+ * private void processRawMessage(SQSMessage sqsMessage, Context context) {
+ * // sqsMessage.getBody() will contain the content of the S3 Object
+ * }
+ *
+ * SNS:
+ *
+ * @LargeMessage
+ * private void processMessage(SNSRecord snsRecord) {
+ * // snsRecord.getSNS().getMessage() will contain the content of the S3 Object
+ * }
+ *
+ *
To disable the deletion of S3 objects, you can configure the {@code deleteS3Object} option to false (default is true): + *
+ * @LargeMessage(deleteS3Object = false) + *+ * + * + *
Note 1: Retrieving payloads and deleting objects from S3 will increase the duration of the + * Lambda function.
+ *Note 2: Make sure to configure your function with enough memory to be able to retrieve S3 objects.
+ */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface LargeMessage { - boolean deleteS3Objects() default true; + /** + * Specify if S3 objects must be deleted after being processed (default = true) + */ + boolean deleteS3Object() default true; } diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java index 0d167e2c1..51fb8002a 100644 --- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java +++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java @@ -18,7 +18,7 @@ * If you don't use this, a default S3Client will be created. ** public MyLambdaHandler() { - * LargeMessageConfig.builder().withS3Client(S3Client.create()).build(); + * LargeMessageConfig.init().withS3Client(S3Client.create()); * } **/ @@ -44,8 +44,8 @@ public void withS3Client(S3Client s3Client) { } // For tests purpose - void setS3Client(S3Client s3Client) { - this.s3Client = s3Client; + void resetS3Client() { + this.s3Client = null; } // Getter needs to initialize if not done with setter diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspect.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspect.java index a53d21fee..51e80f030 100644 --- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspect.java +++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspect.java @@ -12,7 +12,9 @@ import static java.lang.String.format; - +/** + * Handle {@link LargeMessage} annotations. + */ @Aspect public class LargeMessageAspect { @@ -42,7 +44,7 @@ public Object around(ProceedingJoinPoint pjp, return pjp.proceed(proceedArgs); } - return largeMessageProcessor.get().process(pjp, largeMessage.deleteS3Objects()); + return largeMessageProcessor.get().process(pjp, largeMessage.deleteS3Object()); } } diff --git a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfigTest.java b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfigTest.java index b6642186d..1049e7860 100644 --- a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfigTest.java +++ b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfigTest.java @@ -12,12 +12,12 @@ public class LargeMessageConfigTest { @BeforeEach public void setup() { - LargeMessageConfig.get().setS3Client(null); + LargeMessageConfig.get().resetS3Client(); } @AfterEach public void tearDown() { - LargeMessageConfig.get().setS3Client(null); + LargeMessageConfig.get().resetS3Client(); } @Test diff --git a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java index 63132a1df..9e88fcab3 100644 --- a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java +++ b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java @@ -71,7 +71,7 @@ private String processSNSMessageWithoutContext(SNSRecord snsRecord) { return snsRecord.getSNS().getMessage(); } - @LargeMessage(deleteS3Objects = false) + @LargeMessage(deleteS3Object = false) private String processSQSMessageNoDelete(SQSMessage sqsMessage, Context context) { return sqsMessage.getBody(); } From f4308cacc982bc5f850cfe3234a8e4f7b9549df0 Mon Sep 17 00:00:00 2001 From: Jerome Van Der Linden
{@code SqsLargeMessage} is used to signal that the annotated method * should be extended to handle large SQS messages which have been offloaded - * to S3 + * to S3
* *{@code SqsLargeMessage} automatically retrieves and deletes messages * which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} @@ -73,6 +76,7 @@ *
To disable deletion of payloads setting the following annotation parameter * {@code @SqsLargeMessage(deletePayloads=false)}
*/ +@Deprecated @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface SqsLargeMessage { diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java index 8c06a6291..ace2a42d6 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java @@ -36,7 +36,11 @@ /** * A class of helper functions to add additional functionality to {@link SQSEvent} processing. + * + * @deprecated Batch processing is now handled in powertools-batch and large messages in powertools-large-messages. + * This class will no longer be available in version 2. */ +@Deprecated public final class SqsUtils { private static final Logger LOG = LoggerFactory.getLogger(SqsUtils.class); From 9b39cbbbc0c008207f984733bcfe896533dd23d8 Mon Sep 17 00:00:00 2001 From: Jerome Van Der LindenThis version of the {@code @LargeMessage} is compatible with version - * 1.1.0+ of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}.
+ * 1.1.0+ of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}. *Put this annotation on a method where the first parameter is either a {@link com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage} or {@link com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord}.
*
diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java
index 51fb8002a..a1c66ed69 100644
--- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java
+++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessageConfig.java
@@ -27,7 +27,8 @@ public class LargeMessageConfig {
private static final LargeMessageConfig INSTANCE = new LargeMessageConfig();
private S3Client s3Client;
- private LargeMessageConfig() {}
+ private LargeMessageConfig() {
+ }
public static LargeMessageConfig get() {
return INSTANCE;
diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java
index 36a433844..979ab50e3 100644
--- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java
+++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java
@@ -3,20 +3,12 @@
import org.aspectj.lang.ProceedingJoinPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
-import software.amazon.awssdk.services.s3.model.GetObjectRequest;
-import software.amazon.awssdk.services.s3.model.GetObjectResponse;
-import software.amazon.awssdk.utils.IoUtils;
import software.amazon.lambda.powertools.largemessages.LargeMessageConfig;
import software.amazon.lambda.powertools.largemessages.LargeMessageProcessingException;
-import software.amazon.payloadoffloading.PayloadS3Pointer;
-
-import java.io.IOException;
-import java.util.Optional;
-import java.util.function.Function;
+import software.amazon.payloadoffloading.S3BackedPayloadStore;
+import software.amazon.payloadoffloading.S3Dao;
import static java.lang.String.format;
@@ -25,35 +17,33 @@
* of the S3 Object leveraging the payloadoffloading library.
*
* @param
+ * Inspired from {@code software.amazon.awssdk.services.sqs.internal.MessageMD5ChecksumInterceptor}.
+ * package protected for testing purpose.
+ *
+ * @param messageBody body of the SQS Message
+ * @return the MD5 digest of the SQS Message body (or empty in case of error)
+ */
+ static Optional
+ * Inspired from {@code software.amazon.awssdk.services.sqs.internal.MessageMD5ChecksumInterceptor}.
+ * package protected for testing purpose.
+ *
+ * @param messageAttributes attributes of the SQS Message
+ * @return the MD5 digest of the SQS Message attributes (or empty in case of error)
+ */
+ static Optional