diff --git a/powertools-sqs/pom.xml b/powertools-sqs/pom.xml
new file mode 100644
index 000000000..d87d78765
--- /dev/null
+++ b/powertools-sqs/pom.xml
@@ -0,0 +1,104 @@
+
+
+ 4.0.0
+
+ powertools-sqs
+ jar
+
+
+ powertools-parent
+ software.amazon.lambda
+ 0.1.0-beta
+
+
+ AWS Lambda Powertools Java library SQS
+
+ A suite of utilities for AWS Lambda Functions that makes tracing with AWS X-Ray, structured logging and creating custom metrics asynchronously easier.
+
+ https://aws.amazon.com/lambda/
+
+ GitHub Issues
+ https://github.com/awslabs/aws-lambda-powertools-java/issues
+
+
+ https://github.com/awslabs/aws-lambda-powertools-java.git
+
+
+
+ AWS Lambda Powertools team
+ Amazon Web Services
+ https://aws.amazon.com/
+
+
+
+
+
+ ossrh
+ https://aws.oss.sonatype.org/content/repositories/snapshots
+
+
+
+
+
+ software.amazon.lambda
+ powertools-core
+
+
+ com.amazonaws
+ aws-lambda-java-core
+
+
+ com.amazonaws
+ aws-lambda-java-events
+
+
+ software.amazon.payloadoffloading
+ payloadoffloading-common
+
+
+
+ org.aspectj
+ aspectjrt
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ test
+
+
+ org.apache.commons
+ commons-lang3
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+ org.aspectj
+ aspectjweaver
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+
\ No newline at end of file
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java
new file mode 100644
index 000000000..9f67d2906
--- /dev/null
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java
@@ -0,0 +1,68 @@
+package software.amazon.lambda.powertools.sqs;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * {@code LargeMessageHandler} is used to signal that the annotated method
+ * should be extended to handle large SQS messages which have been offloaded
+ * to S3
+ *
+ * {@code LargeMessageHandler} automatically retrieves and deletes messages
+ * which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib}
+ * client library.
+ *
+ * This version of the {@code LargeMessageHandler} is compatible with version
+ * 1.1.0+ of {@code amazon-sqs-java-extended-client-lib}.
+ *
+ *
+ * <dependency>
+ * <groupId>com.amazonaws</groupId>
+ * <artifactId>amazon-sqs-java-extended-client-lib</artifactId>
+ * <version>1.1.0</version>
+ * </dependency>
+ *
+ *
+ * {@code LargeMessageHandler} should be used with the handleRequest method of a class
+ * which implements {@code com.amazonaws.services.lambda.runtime.RequestHandler} with
+ * {@code com.amazonaws.services.lambda.runtime.events.SQSEvent} as the first parameter.
+ *
+ *
+ * public class SqsMessageHandler implements RequestHandler {
+ *
+ * {@literal @}Override
+ * {@literal @}LargeMessageHandler
+ * public String handleRequest(SQSEvent sqsEvent, Context context) {
+ *
+ * // process messages
+ *
+ * return "ok";
+ * }
+ *
+ * ...
+ *
+ *
+ * Using the default S3 Client {@code AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();}
+ * each record received in the SQSEvent {@code LargeMessageHandler} will checked
+ * to see if it's body contains a payload which has been offloaded to S3. If it
+ * does then {@code getObject(bucket, key)} will be called and the payload
+ * retrieved.
+ *
+ * Note: Retreiving payloads from S3 will increase the duration of the
+ * Lambda function.
+ *
+ * If the request handler method returns then each payload will be deleted
+ * from S3 using {@code deleteObject(bucket, key)}
+ *
+ * To disable deletion of payloads setting the following annotation parameter
+ * {@code @LargeMessageHandler(deletePayloads=false)}
+ *
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface LargeMessageHandler {
+
+ boolean deletePayloads() default true;
+}
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
new file mode 100644
index 000000000..508a3ca77
--- /dev/null
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
@@ -0,0 +1,127 @@
+package software.amazon.lambda.powertools.sqs.internal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.SdkClientException;
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.util.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import software.amazon.lambda.powertools.sqs.LargeMessageHandler;
+import software.amazon.payloadoffloading.PayloadS3Pointer;
+
+import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
+import static java.lang.String.format;
+import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod;
+
+@Aspect
+public class SqsMessageAspect {
+
+ private static final Log LOG = LogFactory.getLog(SqsMessageAspect.class);
+ private static AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();
+
+ @SuppressWarnings({"EmptyMethod"})
+ @Pointcut("@annotation(largeMessageHandler)")
+ public void callAt(LargeMessageHandler largeMessageHandler) {
+ }
+
+ @Around(value = "callAt(largeMessageHandler) && execution(@LargeMessageHandler * *.*(..))", argNames = "pjp,largeMessageHandler")
+ public Object around(ProceedingJoinPoint pjp,
+ LargeMessageHandler largeMessageHandler) throws Throwable {
+ Object[] proceedArgs = pjp.getArgs();
+
+ if (isHandlerMethod(pjp)
+ && placedOnSqsEventRequestHandler(pjp)) {
+ List pointersToDelete = rewriteMessages((SQSEvent) proceedArgs[0]);
+
+ Object proceed = pjp.proceed(proceedArgs);
+
+ if (largeMessageHandler.deletePayloads()) {
+ pointersToDelete.forEach(this::deleteMessageFromS3);
+ }
+ return proceed;
+ }
+
+ return pjp.proceed(proceedArgs);
+ }
+
+ private List rewriteMessages(SQSEvent sqsEvent) {
+ List s3Pointers = new ArrayList<>();
+
+ for (SQSMessage sqsMessage : sqsEvent.getRecords()) {
+ if (isBodyLargeMessagePointer(sqsMessage.getBody())) {
+ PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody());
+
+ S3Object s3Object = callS3Gracefully(s3Pointer, pointer -> {
+ S3Object object = amazonS3.getObject(pointer.getS3BucketName(), pointer.getS3Key());
+ LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key());
+ return object;
+ });
+
+ sqsMessage.setBody(readStringFromS3Object(s3Object));
+ s3Pointers.add(s3Pointer);
+ }
+ }
+
+ return s3Pointers;
+ }
+
+ private boolean isBodyLargeMessagePointer(String record) {
+ return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\"");
+ }
+
+ private String readStringFromS3Object(S3Object object) {
+ try (S3ObjectInputStream is = object.getObjectContent()) {
+ return IOUtils.toString(is);
+ } catch (IOException e) {
+ LOG.error("Error converting S3 object to String", e);
+ throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", object.getBucketName(), object.getKey()), e);
+ }
+ }
+
+ private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
+ callS3Gracefully(s3Pointer, pointer -> {
+ amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
+ LOG.info("Message deleted from S3: " + s3Pointer.toJson());
+ return null;
+ });
+ }
+
+ private R callS3Gracefully(final PayloadS3Pointer pointer,
+ final Function function) {
+ try {
+ return function.apply(pointer);
+ } catch (AmazonServiceException e) {
+ LOG.error("A service exception", e);
+ throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e);
+ } catch (SdkClientException e) {
+ LOG.error("Some sort of client exception", e);
+ throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e);
+ }
+ }
+
+ public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) {
+ return pjp.getArgs().length == 2
+ && pjp.getArgs()[0] instanceof SQSEvent
+ && pjp.getArgs()[1] instanceof Context;
+ }
+
+ static class FailedProcessingLargePayloadException extends RuntimeException {
+ public FailedProcessingLargePayloadException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java
new file mode 100644
index 000000000..0bf8260a4
--- /dev/null
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java
@@ -0,0 +1,16 @@
+package software.amazon.lambda.powertools.sqs.handlers;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
+import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
+import software.amazon.lambda.powertools.sqs.LargeMessageHandler;
+
+public class LambdaHandlerApiGateway implements RequestHandler {
+
+ @Override
+ @LargeMessageHandler
+ public String handleRequest(APIGatewayProxyRequestEvent sqsEvent, Context context) {
+ return sqsEvent.getBody();
+ }
+}
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java
new file mode 100644
index 000000000..b3d7d4af7
--- /dev/null
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java
@@ -0,0 +1,15 @@
+package software.amazon.lambda.powertools.sqs.handlers;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import software.amazon.lambda.powertools.sqs.LargeMessageHandler;
+
+public class SqsMessageHandler implements RequestHandler {
+
+ @Override
+ @LargeMessageHandler
+ public String handleRequest(SQSEvent sqsEvent, Context context) {
+ return sqsEvent.getRecords().get(0).getBody();
+ }
+}
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java
new file mode 100644
index 000000000..a301a2a65
--- /dev/null
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java
@@ -0,0 +1,15 @@
+package software.amazon.lambda.powertools.sqs.handlers;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import software.amazon.lambda.powertools.sqs.LargeMessageHandler;
+
+public class SqsNoDeleteMessageHandler implements RequestHandler {
+
+ @Override
+ @LargeMessageHandler(deletePayloads = false)
+ public String handleRequest(SQSEvent sqsEvent, Context context) {
+ return sqsEvent.getRecords().get(0).getBody();
+ }
+}
\ No newline at end of file
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
new file mode 100644
index 000000000..cafec7eef
--- /dev/null
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
@@ -0,0 +1,184 @@
+package software.amazon.lambda.powertools.sqs.internal;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.SdkClientException;
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.util.StringInputStream;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.Mock;
+import software.amazon.lambda.powertools.sqs.handlers.LambdaHandlerApiGateway;
+import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandler;
+import software.amazon.lambda.powertools.sqs.handlers.SqsNoDeleteMessageHandler;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.FailedProcessingLargePayloadException;
+
+public class SqsMessageAspectTest {
+
+ private RequestHandler requestHandler;
+
+ @Mock
+ private Context context;
+
+ @Mock
+ private AmazonS3 amazonS3;
+
+ private static final String BUCKET_NAME = "bucketname";
+ private static final String BUCKET_KEY = "c71eb2ae-37e0-4265-8909-32f4153faddf";
+
+ @BeforeEach
+ void setUp() throws IllegalAccessException {
+ initMocks(this);
+ setupContext();
+ writeStaticField(SqsMessageAspect.class, "amazonS3", amazonS3, true);
+ requestHandler = new SqsMessageHandler();
+ }
+
+ @Test
+ public void testLargeMessage() {
+ when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3ObjectWithLargeMessage());
+ SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]");
+
+ String response = requestHandler.handleRequest(sqsEvent, context);
+
+ assertThat(response)
+ .isEqualTo("A big message");
+
+ verify(amazonS3).deleteObject(BUCKET_NAME, BUCKET_KEY);
+ }
+
+ @Test
+ public void shouldNotProcessSmallMessageBody() {
+ S3Object s3Response = new S3Object();
+ s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
+
+ when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
+ SQSEvent sqsEvent = messageWithBody("This is small message");
+
+ String response = requestHandler.handleRequest(sqsEvent, context);
+
+ assertThat(response)
+ .isEqualTo("This is small message");
+
+ verifyNoInteractions(amazonS3);
+ }
+
+ @ParameterizedTest
+ @MethodSource("exception")
+ public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exception) {
+ when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenThrow(exception);
+
+ String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
+ SQSEvent sqsEvent = messageWithBody(messageBody);
+
+ assertThatExceptionOfType(FailedProcessingLargePayloadException.class)
+ .isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context))
+ .withCause(exception);
+
+ verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
+ }
+
+ @Test
+ public void testLargeMessageWithDeletionOff() {
+ requestHandler = new SqsNoDeleteMessageHandler();
+
+ when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3ObjectWithLargeMessage());
+ SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]");
+
+ String response = requestHandler.handleRequest(sqsEvent, context);
+
+ assertThat(response).isEqualTo("A big message");
+
+ verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
+ }
+
+
+ @Test
+ public void shouldFailEntireBatchIfFailedProcessingDownloadMessageFromS3() throws IOException {
+ S3Object s3Response = new S3Object();
+ s3Response.setObjectContent(new S3ObjectInputStream(new StringInputStream("test") {
+ @Override
+ public void close() throws IOException {
+ throw new IOException("Failed");
+ }
+ }, mock(HttpRequestBase.class)));
+
+ when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
+
+ String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
+ SQSEvent sqsEvent = messageWithBody(messageBody);
+
+ assertThatExceptionOfType(FailedProcessingLargePayloadException.class)
+ .isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context))
+ .withCauseInstanceOf(IOException.class);
+
+ verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
+ }
+
+ @Test
+ public void shouldNotDoAnyProcessingWhenNotSqsEvent() {
+ LambdaHandlerApiGateway handler = new LambdaHandlerApiGateway();
+
+ String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
+
+ APIGatewayProxyRequestEvent event = new APIGatewayProxyRequestEvent();
+ event.setBody(messageBody);
+ String response = handler.handleRequest(event, context);
+
+ assertThat(response)
+ .isEqualTo(messageBody);
+
+ verifyNoInteractions(amazonS3);
+ }
+
+ private S3Object s3ObjectWithLargeMessage() {
+ S3Object s3Response = new S3Object();
+ s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
+ return s3Response;
+ }
+
+ private static Stream exception() {
+ return Stream.of(Arguments.of(new AmazonServiceException("Service Exception")),
+ Arguments.of(new SdkClientException("Client Exception")));
+ }
+
+ private SQSEvent messageWithBody(String messageBody) {
+ SQSMessage sqsMessage = new SQSMessage();
+ sqsMessage.setBody(messageBody);
+ SQSEvent sqsEvent = new SQSEvent();
+ sqsEvent.setRecords(singletonList(sqsMessage));
+ return sqsEvent;
+ }
+
+ private void setupContext() {
+ when(context.getFunctionName()).thenReturn("testFunction");
+ when(context.getInvokedFunctionArn()).thenReturn("testArn");
+ when(context.getFunctionVersion()).thenReturn("1");
+ when(context.getMemoryLimitInMB()).thenReturn(10);
+ }
+}
\ No newline at end of file