From 6892f45ec4b688ce39869306c15ce370c6da85f7 Mon Sep 17 00:00:00 2001
From: msailes
Date: Fri, 28 Aug 2020 12:03:17 +0100
Subject: [PATCH 01/21] first working version
---
pom.xml | 14 +++
powertools-sqs/pom.xml | 107 +++++++++++++++++
.../powertools/sqs/LargeMessageHandler.aj | 11 ++
.../sqs/internal/SqsMessageAspect.java | 109 ++++++++++++++++++
.../sqs/handlers/SqsMessageHandler.java | 15 +++
.../sqs/internal/SqsMessageAspectTest.java | 47 ++++++++
.../src/test/resources/large-message.json | 0
.../src/test/resources/small-message.json | 1 +
8 files changed, 304 insertions(+)
create mode 100644 powertools-sqs/pom.xml
create mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj
create mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java
create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
create mode 100644 powertools-sqs/src/test/resources/large-message.json
create mode 100644 powertools-sqs/src/test/resources/small-message.json
diff --git a/pom.xml b/pom.xml
index 1ae0030c8..b58289c69 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,7 @@
powertools-core
powertools-logging
powertools-tracing
+ powertools-sqs
@@ -50,9 +51,12 @@
2.13.3
2.11.0
1.9.6
+ 2.14.4
2.4.0
+ 1.0.0
UTF-8
1.2.1
+ 3.1.0
3.8.1
1.12.1
2.22.2
@@ -83,6 +87,16 @@
aws-lambda-java-core
${lambda.core.version}
+
+ com.amazonaws
+ aws-lambda-java-events
+ ${lambda.events.version}
+
+
+ software.amazon.payloadoffloading
+ payloadoffloading-common
+ ${payloadoffloading-common.version}
+
org.aspectj
aspectjrt
diff --git a/powertools-sqs/pom.xml b/powertools-sqs/pom.xml
new file mode 100644
index 000000000..e2c2d6270
--- /dev/null
+++ b/powertools-sqs/pom.xml
@@ -0,0 +1,107 @@
+
+
+ 4.0.0
+
+ powertools-sqs
+ jar
+
+
+ powertools-parent
+ software.amazon.lambda
+ 0.1.0-SNAPSHOT
+
+
+ AWS Lambda Powertools Java library SQS
+
+ A suite of utilities for AWS Lambda Functions that makes tracing with AWS X-Ray, structured logging and creating custom metrics asynchronously easier.
+
+ https://aws.amazon.com/lambda/
+
+ GitHub Issues
+ https://github.com/awslabs/aws-lambda-powertools-java/issues
+
+
+ https://github.com/awslabs/aws-lambda-powertools-java.git
+
+
+
+ AWS Lambda Powertools team
+ Amazon Web Services
+ https://aws.amazon.com/
+
+
+
+
+
+ ossrh
+ https://aws.oss.sonatype.org/content/repositories/snapshots
+
+
+
+
+
+ software.amazon.lambda
+ powertools-core
+
+
+ com.amazonaws
+ aws-lambda-java-core
+
+
+ com.amazonaws
+ aws-lambda-java-events
+
+
+ software.amazon.payloadoffloading
+ payloadoffloading-common
+
+
+
+ org.apache.logging.log4j
+ log4j-core
+
+
+ org.apache.logging.log4j
+ log4j-api
+
+
+ org.aspectj
+ aspectjrt
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+ org.apache.commons
+ commons-lang3
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+ org.aspectj
+ aspectjweaver
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+
\ No newline at end of file
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj
new file mode 100644
index 000000000..39e32c93c
--- /dev/null
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj
@@ -0,0 +1,11 @@
+package software.amazon.lambda.powertools.sqs;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface LargeMessageHandler {
+}
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
new file mode 100644
index 000000000..9ef701240
--- /dev/null
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
@@ -0,0 +1,109 @@
+package software.amazon.lambda.powertools.sqs.internal;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.SdkClientException;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.util.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import software.amazon.lambda.powertools.sqs.LargeMessageHandler;
+import software.amazon.payloadoffloading.PayloadS3Pointer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Aspect
+public class SqsMessageAspect {
+
+ private static final Log LOG = LogFactory.getLog(SqsMessageAspect.class);
+ private AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();
+
+ @SuppressWarnings({"EmptyMethod"})
+ @Pointcut("@annotation(largeMessageHandler)")
+ public void callAt(LargeMessageHandler largeMessageHandler) {
+ }
+
+ @Around(value = "callAt(largeMessageHandler) && execution(@LargeMessageHandler * *.*(..))", argNames = "pjp,largeMessageHandler")
+ public Object around(ProceedingJoinPoint pjp,
+ LargeMessageHandler largeMessageHandler) throws Throwable {
+ Object[] proceedArgs = pjp.getArgs();
+ Object[] rewrittenArgs = rewriteMessages(proceedArgs);
+
+ Object proceed = pjp.proceed(rewrittenArgs);
+
+ List payloadS3Pointers = collectPayloadPointers(proceedArgs);
+ payloadS3Pointers.forEach(this::deleteMessageFromS3);
+
+ return proceed;
+ }
+
+ private List collectPayloadPointers(Object[] args) {
+ SQSEvent sqsEvent = (SQSEvent) args[0];
+
+ return sqsEvent.getRecords().stream()
+ .filter(record -> isBodyLargeMessagePointer(record.getBody()))
+ .map(record -> PayloadS3Pointer.fromJson(record.getBody()))
+ .collect(Collectors.toList());
+ }
+
+ private Object[] rewriteMessages(Object[] args) {
+ SQSEvent sqsEvent = (SQSEvent) args[0];
+
+ for (SQSEvent.SQSMessage sqsMessage : sqsEvent.getRecords()) {
+ if (isBodyLargeMessagePointer(sqsMessage.getBody())) {
+ PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody());
+
+ try {
+ S3Object object = amazonS3.getObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
+ sqsMessage.setBody(readStringFromS3Object(object));
+ LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key());
+ } catch (AmazonServiceException e) {
+ LOG.error("A service exception", e);
+ } catch (SdkClientException e) {
+ LOG.error("Some sort of client exception", e);
+ }
+ }
+ }
+
+ args[0] = sqsEvent;
+
+ return args;
+ }
+
+ private boolean isBodyLargeMessagePointer(String record) {
+ return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\"");
+ }
+
+ private String readStringFromS3Object(S3Object object) {
+ S3ObjectInputStream is = object.getObjectContent();
+ String s3Body = null;
+ try {
+ s3Body = IOUtils.toString(is);
+ } catch (IOException e) {
+ LOG.error("Error converting S3 object to String", e);
+ } finally {
+ IOUtils.closeQuietly(is, LOG);
+ }
+ return s3Body;
+ }
+
+ private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
+ try {
+ amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
+ LOG.info("Message deleted from S3: " + s3Pointer.toJson());
+ } catch (AmazonServiceException e) {
+ LOG.error("A service exception", e);
+ } catch (SdkClientException e) {
+ LOG.error("Some sort of client exception", e);
+ }
+ }
+}
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java
new file mode 100644
index 000000000..b3d7d4af7
--- /dev/null
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java
@@ -0,0 +1,15 @@
+package software.amazon.lambda.powertools.sqs.handlers;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import software.amazon.lambda.powertools.sqs.LargeMessageHandler;
+
+public class SqsMessageHandler implements RequestHandler {
+
+ @Override
+ @LargeMessageHandler
+ public String handleRequest(SQSEvent sqsEvent, Context context) {
+ return sqsEvent.getRecords().get(0).getBody();
+ }
+}
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
new file mode 100644
index 000000000..d4d0c0ed2
--- /dev/null
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
@@ -0,0 +1,47 @@
+package software.amazon.lambda.powertools.sqs.internal;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandler;
+
+import java.util.Arrays;
+
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+public class SqsMessageAspectTest {
+
+ private RequestHandler requestHandler;
+
+ @Mock
+ private Context context;
+
+ @BeforeEach
+ void setUp() {
+ initMocks(this);
+ setupContext();
+ requestHandler = new SqsMessageHandler();
+ }
+
+ @Test
+ public void testLargeMessage() {
+ SQSEvent.SQSMessage sqsMessage = new SQSEvent.SQSMessage();
+ sqsMessage.setBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"ms-extended-sqs-client\",\"s3Key\":\"c71eb2ae-37e0-4265-8909-32f4153faddf\"}]");
+ SQSEvent sqsEvent = new SQSEvent();
+ sqsEvent.setRecords(Arrays.asList(sqsMessage));
+ String response = requestHandler.handleRequest(sqsEvent, context);
+
+// assertThat(response).hasToString("newValueFromS3");
+ }
+
+ private void setupContext() {
+ when(context.getFunctionName()).thenReturn("testFunction");
+ when(context.getInvokedFunctionArn()).thenReturn("testArn");
+ when(context.getFunctionVersion()).thenReturn("1");
+ when(context.getMemoryLimitInMB()).thenReturn(10);
+ }
+}
\ No newline at end of file
diff --git a/powertools-sqs/src/test/resources/large-message.json b/powertools-sqs/src/test/resources/large-message.json
new file mode 100644
index 000000000..e69de29bb
diff --git a/powertools-sqs/src/test/resources/small-message.json b/powertools-sqs/src/test/resources/small-message.json
new file mode 100644
index 000000000..bc0356cef
--- /dev/null
+++ b/powertools-sqs/src/test/resources/small-message.json
@@ -0,0 +1 @@
+{Records: [{messageId: bf73ff26-91ef-4da7-95d2-a78459803354,receiptHandle: AQEBTHZU0y5EmTwswCwnxSNYUOx0RC6S4XtPQndT6CI4kVzIcAi4pTcZyyPGCSLvxIlcn1kqIMdlqPx/Y8pW253CtPH3k5kQcoi9jrx5gbciiNJt9RNxOOxgSnWXGmLOumVsxq3mIk4udDeqKQmEzfJ4NmAno1eTIGxYa2OT9I0gCEf63fQPJyAFGfdbTPRW8MBzehkuHEwT5IWnSIPCi6IiiJSP2lpMDTtvVTMkUUGEeWr6xae7kQdklXi4Qfc4HuqCVDGobyG0/a/GR4rEoZIsf1GVvhWt7Kt5+XP1fIt08q6VlINO1gWfzoZrEGE1SfC+76gyjFMiWz0kzPDxSWymY80HFEE2cVaFQAjLeTBWlzYjKrELdRSX+VSJEuonKyi6jWz/7Jq4SvDca4S6N5Fo7Pedb5/ycDSzMSzCx31Juwru96yoTfYAZc+tOe/T0+Pr,eventSourceARN: arn:aws:sqs:eu-west-1:719169216310:large-payload-handler-LargeMessagesQueue-1NHWQ9486PN1T,eventSource: aws:sqs,awsRegion: eu-west-1,body: a small message,md5OfBody: aa608debbb18ac56aa5309265665c5f2,attributes: {ApproximateReceiveCount=1, SentTimestamp=1598611408346, SenderId=AIDA2O4OTL43OERPSY2OR, ApproximateFirstReceiveTimestamp=1598611408427},messageAttributes: {}}]}
\ No newline at end of file
From d714d734ac0acb7b4831a5cd6249e128ff7a5c03 Mon Sep 17 00:00:00 2001
From: Pankaj Agrawal
Date: Fri, 28 Aug 2020 15:55:19 +0200
Subject: [PATCH 02/21] Minor refactoring for java8 specific constructs
---
.../sqs/internal/SqsMessageAspect.java | 82 ++++++++++++-------
1 file changed, 51 insertions(+), 31 deletions(-)
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
index 9ef701240..0eb4869ef 100644
--- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
@@ -1,7 +1,12 @@
package software.amazon.lambda.powertools.sqs.internal;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.function.Function;
+
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
+import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
@@ -17,15 +22,16 @@
import software.amazon.lambda.powertools.sqs.LargeMessageHandler;
import software.amazon.payloadoffloading.PayloadS3Pointer;
-import java.io.IOException;
-import java.util.List;
-import java.util.stream.Collectors;
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+import static java.util.Optional.ofNullable;
+import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod;
@Aspect
public class SqsMessageAspect {
private static final Log LOG = LogFactory.getLog(SqsMessageAspect.class);
- private AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();
+ private static AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();
@SuppressWarnings({"EmptyMethod"})
@Pointcut("@annotation(largeMessageHandler)")
@@ -36,23 +42,25 @@ public void callAt(LargeMessageHandler largeMessageHandler) {
public Object around(ProceedingJoinPoint pjp,
LargeMessageHandler largeMessageHandler) throws Throwable {
Object[] proceedArgs = pjp.getArgs();
- Object[] rewrittenArgs = rewriteMessages(proceedArgs);
-
- Object proceed = pjp.proceed(rewrittenArgs);
- List payloadS3Pointers = collectPayloadPointers(proceedArgs);
- payloadS3Pointers.forEach(this::deleteMessageFromS3);
+ if (isHandlerMethod(pjp)
+ && placedOnSqsEventRequestHandler(pjp)) {
+ Object[] rewrittenArgs = rewriteMessages(proceedArgs);
+ Object proceed = pjp.proceed(rewrittenArgs);
+ deletePayloadPointers(proceedArgs);
+ return proceed;
+ }
- return proceed;
+ return pjp.proceed(proceedArgs);
}
- private List collectPayloadPointers(Object[] args) {
+ private void deletePayloadPointers(Object[] args) {
SQSEvent sqsEvent = (SQSEvent) args[0];
- return sqsEvent.getRecords().stream()
+ sqsEvent.getRecords().stream()
.filter(record -> isBodyLargeMessagePointer(record.getBody()))
.map(record -> PayloadS3Pointer.fromJson(record.getBody()))
- .collect(Collectors.toList());
+ .forEach(this::deleteMessageFromS3);
}
private Object[] rewriteMessages(Object[] args) {
@@ -62,15 +70,14 @@ private Object[] rewriteMessages(Object[] args) {
if (isBodyLargeMessagePointer(sqsMessage.getBody())) {
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody());
- try {
- S3Object object = amazonS3.getObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
- sqsMessage.setBody(readStringFromS3Object(object));
+ Optional s3Object = callS3Gracefully(s3Pointer, pointer -> {
+ S3Object object = amazonS3.getObject(pointer.getS3BucketName(), pointer.getS3Key());
LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key());
- } catch (AmazonServiceException e) {
- LOG.error("A service exception", e);
- } catch (SdkClientException e) {
- LOG.error("Some sort of client exception", e);
- }
+ return object;
+ });
+
+ s3Object.flatMap(this::readStringFromS3Object)
+ .ifPresent(sqsMessage::setBody);
}
}
@@ -83,27 +90,40 @@ private boolean isBodyLargeMessagePointer(String record) {
return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\"");
}
- private String readStringFromS3Object(S3Object object) {
- S3ObjectInputStream is = object.getObjectContent();
- String s3Body = null;
- try {
- s3Body = IOUtils.toString(is);
+ private Optional readStringFromS3Object(S3Object object) {
+ try (S3ObjectInputStream is = object.getObjectContent()) {
+ return of(IOUtils.toString(is));
} catch (IOException e) {
LOG.error("Error converting S3 object to String", e);
- } finally {
- IOUtils.closeQuietly(is, LOG);
}
- return s3Body;
+
+ return empty();
}
private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
- try {
+ callS3Gracefully(s3Pointer, pointer -> {
amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
- LOG.info("Message deleted from S3: " + s3Pointer.toJson());
+ LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key());
+ return null;
+ });
+ }
+
+ private Optional callS3Gracefully(final PayloadS3Pointer pointer,
+ final Function function) {
+ try {
+ return ofNullable(function.apply(pointer));
} catch (AmazonServiceException e) {
LOG.error("A service exception", e);
} catch (SdkClientException e) {
LOG.error("Some sort of client exception", e);
}
+
+ return empty();
+ }
+
+ public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) {
+ return pjp.getArgs().length == 2
+ && pjp.getArgs()[0] instanceof SQSEvent
+ && pjp.getArgs()[1] instanceof Context;
}
}
From 1e71bdca2409d6e036f10ac2bbde60cfc918962b Mon Sep 17 00:00:00 2001
From: Pankaj Agrawal
Date: Fri, 28 Aug 2020 16:01:28 +0200
Subject: [PATCH 03/21] Fix logs for deletion
---
.../amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
index 0eb4869ef..83e30f18a 100644
--- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
@@ -103,7 +103,7 @@ private Optional readStringFromS3Object(S3Object object) {
private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
callS3Gracefully(s3Pointer, pointer -> {
amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
- LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key());
+ LOG.info("Message deleted from S3: " + s3Pointer.toJson());
return null;
});
}
From e3edebe05745cf3ed0fe7cb46b300cc8c58f3c42 Mon Sep 17 00:00:00 2001
From: Pankaj Agrawal
Date: Fri, 28 Aug 2020 16:15:15 +0200
Subject: [PATCH 04/21] Initial test cases to remove remote interaction
---
pom.xml | 6 ++
powertools-sqs/pom.xml | 5 ++
.../sqs/internal/SqsMessageAspect.java | 16 ++--
.../sqs/internal/SqsMessageAspectTest.java | 76 +++++++++++++++++--
4 files changed, 90 insertions(+), 13 deletions(-)
diff --git a/pom.xml b/pom.xml
index b58289c69..68ce03c16 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,6 +156,12 @@
5.6.2
test
+
+ org.junit.jupiter
+ junit-jupiter-params
+ 5.6.2
+ test
+
org.apache.commons
commons-lang3
diff --git a/powertools-sqs/pom.xml b/powertools-sqs/pom.xml
index e2c2d6270..23cb66835 100644
--- a/powertools-sqs/pom.xml
+++ b/powertools-sqs/pom.xml
@@ -82,6 +82,11 @@
junit-jupiter-engine
test
+
+ org.junit.jupiter
+ junit-jupiter-params
+ test
+
org.apache.commons
commons-lang3
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
index 83e30f18a..e9953371d 100644
--- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
@@ -1,8 +1,10 @@
package software.amazon.lambda.powertools.sqs.internal;
import java.io.IOException;
+import java.util.List;
import java.util.Optional;
import java.util.function.Function;
+import java.util.stream.Collectors;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
@@ -22,6 +24,7 @@
import software.amazon.lambda.powertools.sqs.LargeMessageHandler;
import software.amazon.payloadoffloading.PayloadS3Pointer;
+import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static java.util.Optional.empty;
import static java.util.Optional.of;
import static java.util.Optional.ofNullable;
@@ -45,28 +48,31 @@ public Object around(ProceedingJoinPoint pjp,
if (isHandlerMethod(pjp)
&& placedOnSqsEventRequestHandler(pjp)) {
+ List payloadS3Pointers = collectPayloadPointers(proceedArgs);
+
Object[] rewrittenArgs = rewriteMessages(proceedArgs);
Object proceed = pjp.proceed(rewrittenArgs);
- deletePayloadPointers(proceedArgs);
+
+ payloadS3Pointers.forEach(this::deleteMessageFromS3);
return proceed;
}
return pjp.proceed(proceedArgs);
}
- private void deletePayloadPointers(Object[] args) {
+ private List collectPayloadPointers(Object[] args) {
SQSEvent sqsEvent = (SQSEvent) args[0];
- sqsEvent.getRecords().stream()
+ return sqsEvent.getRecords().stream()
.filter(record -> isBodyLargeMessagePointer(record.getBody()))
.map(record -> PayloadS3Pointer.fromJson(record.getBody()))
- .forEach(this::deleteMessageFromS3);
+ .collect(Collectors.toList());
}
private Object[] rewriteMessages(Object[] args) {
SQSEvent sqsEvent = (SQSEvent) args[0];
- for (SQSEvent.SQSMessage sqsMessage : sqsEvent.getRecords()) {
+ for (SQSMessage sqsMessage : sqsEvent.getRecords()) {
if (isBodyLargeMessagePointer(sqsMessage.getBody())) {
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody());
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
index d4d0c0ed2..8acaace45 100644
--- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
@@ -1,15 +1,29 @@
package software.amazon.lambda.powertools.sqs.internal;
+import java.io.ByteArrayInputStream;
+import java.util.stream.Stream;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.SdkClientException;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.S3Object;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandler;
-import java.util.Arrays;
-
+import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
@@ -20,22 +34,68 @@ public class SqsMessageAspectTest {
@Mock
private Context context;
+ @Mock
+ private AmazonS3 amazonS3;
+
@BeforeEach
- void setUp() {
+ void setUp() throws IllegalAccessException {
initMocks(this);
setupContext();
+ writeStaticField(SqsMessageAspect.class, "amazonS3", amazonS3, true);
requestHandler = new SqsMessageHandler();
}
@Test
public void testLargeMessage() {
- SQSEvent.SQSMessage sqsMessage = new SQSEvent.SQSMessage();
- sqsMessage.setBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"ms-extended-sqs-client\",\"s3Key\":\"c71eb2ae-37e0-4265-8909-32f4153faddf\"}]");
- SQSEvent sqsEvent = new SQSEvent();
- sqsEvent.setRecords(Arrays.asList(sqsMessage));
+ String bucketName = "ms-extended-sqs-client";
+ String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf";
+ S3Object s3Response = new S3Object();
+ s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
+
+ when(amazonS3.getObject(bucketName, bucketKey)).thenReturn(s3Response);
+ SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]");
+
String response = requestHandler.handleRequest(sqsEvent, context);
-// assertThat(response).hasToString("newValueFromS3");
+ assertThat(response)
+ .isEqualTo("A big message");
+
+ verify(amazonS3).deleteObject(bucketName, bucketKey);
+ }
+
+ @ParameterizedTest
+ @MethodSource("exception")
+ public void shouldKeepOriginalBodyIfFailedDownloadingFromS3(RuntimeException exception) {
+ String bucketName = "ms-extended-sqs-client";
+ String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf";
+ S3Object s3Response = new S3Object();
+ s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
+
+ when(amazonS3.getObject(bucketName, bucketKey)).thenThrow(exception);
+
+ String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]";
+ SQSEvent sqsEvent = messageWithBody(messageBody);
+
+ String response = requestHandler.handleRequest(sqsEvent, context);
+
+ assertThat(response)
+ .isEqualTo(messageBody);
+
+ //TODO we need to fix the logic since object should not be deleted if we never managed to download it from S3 for any reason
+ //verify(amazonS3, never()).deleteObject(bucketName, bucketKey);
+ }
+
+ private static Stream exception() {
+ return Stream.of(Arguments.of(new AmazonServiceException("Service Exception")),
+ Arguments.of(new SdkClientException("Client Exception")));
+ }
+
+ private SQSEvent messageWithBody(String messageBody) {
+ SQSMessage sqsMessage = new SQSMessage();
+ sqsMessage.setBody(messageBody);
+ SQSEvent sqsEvent = new SQSEvent();
+ sqsEvent.setRecords(singletonList(sqsMessage));
+ return sqsEvent;
}
private void setupContext() {
From 3ce91667f93fb09d96ebbbf946fe7067f0fa0d80 Mon Sep 17 00:00:00 2001
From: Pankaj Agrawal
Date: Fri, 28 Aug 2020 19:15:31 +0200
Subject: [PATCH 05/21] Fail hard when failed downloading any message from
batch in event
---
.../sqs/internal/SqsMessageAspect.java | 59 ++++++++-----------
.../sqs/internal/SqsMessageAspectTest.java | 16 +++--
2 files changed, 32 insertions(+), 43 deletions(-)
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
index e9953371d..7e16c4c8e 100644
--- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
@@ -1,10 +1,9 @@
package software.amazon.lambda.powertools.sqs.internal;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import java.util.function.Function;
-import java.util.stream.Collectors;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
@@ -25,9 +24,7 @@
import software.amazon.payloadoffloading.PayloadS3Pointer;
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
-import static java.util.Optional.empty;
-import static java.util.Optional.of;
-import static java.util.Optional.ofNullable;
+import static java.lang.String.format;
import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod;
@Aspect
@@ -48,62 +45,50 @@ public Object around(ProceedingJoinPoint pjp,
if (isHandlerMethod(pjp)
&& placedOnSqsEventRequestHandler(pjp)) {
- List payloadS3Pointers = collectPayloadPointers(proceedArgs);
+ List pointersToDelete = rewriteMessages(proceedArgs);
- Object[] rewrittenArgs = rewriteMessages(proceedArgs);
- Object proceed = pjp.proceed(rewrittenArgs);
+ Object proceed = pjp.proceed(proceedArgs);
- payloadS3Pointers.forEach(this::deleteMessageFromS3);
+ pointersToDelete.forEach(this::deleteMessageFromS3);
return proceed;
}
return pjp.proceed(proceedArgs);
}
- private List collectPayloadPointers(Object[] args) {
- SQSEvent sqsEvent = (SQSEvent) args[0];
-
- return sqsEvent.getRecords().stream()
- .filter(record -> isBodyLargeMessagePointer(record.getBody()))
- .map(record -> PayloadS3Pointer.fromJson(record.getBody()))
- .collect(Collectors.toList());
- }
-
- private Object[] rewriteMessages(Object[] args) {
+ private List rewriteMessages(Object[] args) {
SQSEvent sqsEvent = (SQSEvent) args[0];
+ List s3Pointers = new ArrayList<>();
for (SQSMessage sqsMessage : sqsEvent.getRecords()) {
if (isBodyLargeMessagePointer(sqsMessage.getBody())) {
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody());
- Optional s3Object = callS3Gracefully(s3Pointer, pointer -> {
+ S3Object s3Object = callS3Gracefully(s3Pointer, pointer -> {
S3Object object = amazonS3.getObject(pointer.getS3BucketName(), pointer.getS3Key());
LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key());
return object;
});
- s3Object.flatMap(this::readStringFromS3Object)
- .ifPresent(sqsMessage::setBody);
+ sqsMessage.setBody(readStringFromS3Object(s3Object));
+ s3Pointers.add(s3Pointer);
}
}
- args[0] = sqsEvent;
-
- return args;
+ return s3Pointers;
}
private boolean isBodyLargeMessagePointer(String record) {
return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\"");
}
- private Optional readStringFromS3Object(S3Object object) {
+ private String readStringFromS3Object(S3Object object) {
try (S3ObjectInputStream is = object.getObjectContent()) {
- return of(IOUtils.toString(is));
+ return IOUtils.toString(is);
} catch (IOException e) {
LOG.error("Error converting S3 object to String", e);
+ throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", object.getBucketName(), object.getKey()), e);
}
-
- return empty();
}
private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
@@ -114,17 +99,17 @@ private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
});
}
- private Optional callS3Gracefully(final PayloadS3Pointer pointer,
- final Function function) {
+ private R callS3Gracefully(final PayloadS3Pointer pointer,
+ final Function function) {
try {
- return ofNullable(function.apply(pointer));
+ return function.apply(pointer);
} catch (AmazonServiceException e) {
LOG.error("A service exception", e);
+ throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e);
} catch (SdkClientException e) {
LOG.error("Some sort of client exception", e);
+ throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e);
}
-
- return empty();
}
public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) {
@@ -132,4 +117,10 @@ public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) {
&& pjp.getArgs()[0] instanceof SQSEvent
&& pjp.getArgs()[1] instanceof Context;
}
+
+ static class FailedProcessingLargePayloadException extends RuntimeException {
+ public FailedProcessingLargePayloadException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
}
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
index 8acaace45..d509e4074 100644
--- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
@@ -22,10 +22,12 @@
import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
+import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.FailedProcessingLargePayloadException;
public class SqsMessageAspectTest {
@@ -65,24 +67,20 @@ public void testLargeMessage() {
@ParameterizedTest
@MethodSource("exception")
- public void shouldKeepOriginalBodyIfFailedDownloadingFromS3(RuntimeException exception) {
+ public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exception) {
String bucketName = "ms-extended-sqs-client";
String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf";
- S3Object s3Response = new S3Object();
- s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
when(amazonS3.getObject(bucketName, bucketKey)).thenThrow(exception);
String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]";
SQSEvent sqsEvent = messageWithBody(messageBody);
- String response = requestHandler.handleRequest(sqsEvent, context);
-
- assertThat(response)
- .isEqualTo(messageBody);
+ assertThatExceptionOfType(FailedProcessingLargePayloadException.class)
+ .isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context))
+ .withCause(exception);
- //TODO we need to fix the logic since object should not be deleted if we never managed to download it from S3 for any reason
- //verify(amazonS3, never()).deleteObject(bucketName, bucketKey);
+ verify(amazonS3, never()).deleteObject(bucketName, bucketKey);
}
private static Stream exception() {
From 8405b8131b8ede482dc7ee46640db561a266f753 Mon Sep 17 00:00:00 2001
From: Pankaj Agrawal
Date: Fri, 28 Aug 2020 19:15:31 +0200
Subject: [PATCH 06/21] Fail hard when failed downloading any message from
batch in event
---
.../sqs/internal/SqsMessageAspect.java | 59 ++++++--------
.../sqs/handlers/LambdaHandlerApiGateway.java | 16 ++++
.../sqs/internal/SqsMessageAspectTest.java | 79 +++++++++++++++++--
3 files changed, 114 insertions(+), 40 deletions(-)
create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
index e9953371d..7e16c4c8e 100644
--- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
@@ -1,10 +1,9 @@
package software.amazon.lambda.powertools.sqs.internal;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
import java.util.function.Function;
-import java.util.stream.Collectors;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
@@ -25,9 +24,7 @@
import software.amazon.payloadoffloading.PayloadS3Pointer;
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
-import static java.util.Optional.empty;
-import static java.util.Optional.of;
-import static java.util.Optional.ofNullable;
+import static java.lang.String.format;
import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod;
@Aspect
@@ -48,62 +45,50 @@ public Object around(ProceedingJoinPoint pjp,
if (isHandlerMethod(pjp)
&& placedOnSqsEventRequestHandler(pjp)) {
- List payloadS3Pointers = collectPayloadPointers(proceedArgs);
+ List pointersToDelete = rewriteMessages(proceedArgs);
- Object[] rewrittenArgs = rewriteMessages(proceedArgs);
- Object proceed = pjp.proceed(rewrittenArgs);
+ Object proceed = pjp.proceed(proceedArgs);
- payloadS3Pointers.forEach(this::deleteMessageFromS3);
+ pointersToDelete.forEach(this::deleteMessageFromS3);
return proceed;
}
return pjp.proceed(proceedArgs);
}
- private List collectPayloadPointers(Object[] args) {
- SQSEvent sqsEvent = (SQSEvent) args[0];
-
- return sqsEvent.getRecords().stream()
- .filter(record -> isBodyLargeMessagePointer(record.getBody()))
- .map(record -> PayloadS3Pointer.fromJson(record.getBody()))
- .collect(Collectors.toList());
- }
-
- private Object[] rewriteMessages(Object[] args) {
+ private List rewriteMessages(Object[] args) {
SQSEvent sqsEvent = (SQSEvent) args[0];
+ List s3Pointers = new ArrayList<>();
for (SQSMessage sqsMessage : sqsEvent.getRecords()) {
if (isBodyLargeMessagePointer(sqsMessage.getBody())) {
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody());
- Optional s3Object = callS3Gracefully(s3Pointer, pointer -> {
+ S3Object s3Object = callS3Gracefully(s3Pointer, pointer -> {
S3Object object = amazonS3.getObject(pointer.getS3BucketName(), pointer.getS3Key());
LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key());
return object;
});
- s3Object.flatMap(this::readStringFromS3Object)
- .ifPresent(sqsMessage::setBody);
+ sqsMessage.setBody(readStringFromS3Object(s3Object));
+ s3Pointers.add(s3Pointer);
}
}
- args[0] = sqsEvent;
-
- return args;
+ return s3Pointers;
}
private boolean isBodyLargeMessagePointer(String record) {
return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\"");
}
- private Optional readStringFromS3Object(S3Object object) {
+ private String readStringFromS3Object(S3Object object) {
try (S3ObjectInputStream is = object.getObjectContent()) {
- return of(IOUtils.toString(is));
+ return IOUtils.toString(is);
} catch (IOException e) {
LOG.error("Error converting S3 object to String", e);
+ throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", object.getBucketName(), object.getKey()), e);
}
-
- return empty();
}
private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
@@ -114,17 +99,17 @@ private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
});
}
- private Optional callS3Gracefully(final PayloadS3Pointer pointer,
- final Function function) {
+ private R callS3Gracefully(final PayloadS3Pointer pointer,
+ final Function function) {
try {
- return ofNullable(function.apply(pointer));
+ return function.apply(pointer);
} catch (AmazonServiceException e) {
LOG.error("A service exception", e);
+ throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e);
} catch (SdkClientException e) {
LOG.error("Some sort of client exception", e);
+ throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e);
}
-
- return empty();
}
public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) {
@@ -132,4 +117,10 @@ public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) {
&& pjp.getArgs()[0] instanceof SQSEvent
&& pjp.getArgs()[1] instanceof Context;
}
+
+ static class FailedProcessingLargePayloadException extends RuntimeException {
+ public FailedProcessingLargePayloadException(String message, Throwable cause) {
+ super(message, cause);
+ }
+ }
}
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java
new file mode 100644
index 000000000..0bf8260a4
--- /dev/null
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java
@@ -0,0 +1,16 @@
+package software.amazon.lambda.powertools.sqs.handlers;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
+import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
+import software.amazon.lambda.powertools.sqs.LargeMessageHandler;
+
+public class LambdaHandlerApiGateway implements RequestHandler {
+
+ @Override
+ @LargeMessageHandler
+ public String handleRequest(APIGatewayProxyRequestEvent sqsEvent, Context context) {
+ return sqsEvent.getBody();
+ }
+}
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
index 8acaace45..63fc73695 100644
--- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
@@ -1,31 +1,41 @@
package software.amazon.lambda.powertools.sqs.internal;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.util.stream.Stream;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.util.StringInputStream;
+import org.apache.http.client.methods.HttpRequestBase;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
+import software.amazon.lambda.powertools.sqs.handlers.LambdaHandlerApiGateway;
import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandler;
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
+import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.FailedProcessingLargePayloadException;
public class SqsMessageAspectTest {
@@ -63,26 +73,83 @@ public void testLargeMessage() {
verify(amazonS3).deleteObject(bucketName, bucketKey);
}
- @ParameterizedTest
- @MethodSource("exception")
- public void shouldKeepOriginalBodyIfFailedDownloadingFromS3(RuntimeException exception) {
+ @Test
+ public void shouldNotProcessSmallMessageBody() {
String bucketName = "ms-extended-sqs-client";
String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf";
S3Object s3Response = new S3Object();
s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
+ when(amazonS3.getObject(bucketName, bucketKey)).thenReturn(s3Response);
+ SQSEvent sqsEvent = messageWithBody("This is small message");
+
+ String response = requestHandler.handleRequest(sqsEvent, context);
+
+ assertThat(response)
+ .isEqualTo("This is small message");
+
+ verifyNoInteractions(amazonS3);
+ }
+
+ @ParameterizedTest
+ @MethodSource("exception")
+ public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exception) {
+ String bucketName = "ms-extended-sqs-client";
+ String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf";
+
when(amazonS3.getObject(bucketName, bucketKey)).thenThrow(exception);
String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]";
SQSEvent sqsEvent = messageWithBody(messageBody);
- String response = requestHandler.handleRequest(sqsEvent, context);
+ assertThatExceptionOfType(FailedProcessingLargePayloadException.class)
+ .isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context))
+ .withCause(exception);
+
+ verify(amazonS3, never()).deleteObject(bucketName, bucketKey);
+ }
+
+ @Test
+ public void shouldFailEntireBatchIfFailedProcessingDownloadMessageFromS3() throws IOException {
+ String bucketName = "ms-extended-sqs-client";
+ String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf";
+ S3Object s3Response = new S3Object();
+
+ s3Response.setObjectContent(new S3ObjectInputStream(new StringInputStream("test") {
+ @Override
+ public void close() throws IOException {
+ throw new IOException("Failed");
+ }
+ }, mock(HttpRequestBase.class)));
+
+ when(amazonS3.getObject(bucketName, bucketKey)).thenReturn(s3Response);
+
+ String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]";
+ SQSEvent sqsEvent = messageWithBody(messageBody);
+
+ assertThatExceptionOfType(FailedProcessingLargePayloadException.class)
+ .isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context))
+ .withCauseInstanceOf(IOException.class);
+
+ verify(amazonS3, never()).deleteObject(bucketName, bucketKey);
+ }
+
+ @Test
+ public void shouldNotDoAnyProcessingWhenNotSqsEvent() {
+ String bucketName = "ms-extended-sqs-client";
+ String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf";
+ LambdaHandlerApiGateway handler = new LambdaHandlerApiGateway();
+
+ String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]";
+
+ APIGatewayProxyRequestEvent event = new APIGatewayProxyRequestEvent();
+ event.setBody(messageBody);
+ String response = handler.handleRequest(event, context);
assertThat(response)
.isEqualTo(messageBody);
- //TODO we need to fix the logic since object should not be deleted if we never managed to download it from S3 for any reason
- //verify(amazonS3, never()).deleteObject(bucketName, bucketKey);
+ verifyNoInteractions(amazonS3);
}
private static Stream exception() {
From d1b23696243f7a5eeb23464ad510e5b37ad5b14b Mon Sep 17 00:00:00 2001
From: Pankaj Agrawal
Date: Fri, 28 Aug 2020 21:50:12 +0200
Subject: [PATCH 07/21] Extract junit version as prop
---
pom.xml | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/pom.xml b/pom.xml
index 68ce03c16..1b4d2808f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,6 +66,7 @@
2.10.3
2.4
1.6
+ 5.6.2
@@ -147,19 +148,19 @@
org.junit.jupiter
junit-jupiter-api
- 5.6.2
+ ${junit-jupiter.version}
test
org.junit.jupiter
junit-jupiter-engine
- 5.6.2
+ ${junit-jupiter.version}
test
org.junit.jupiter
junit-jupiter-params
- 5.6.2
+ ${junit-jupiter.version}
test
From 4a049087ccb89bd888c73d29f208a50ed8d437c6 Mon Sep 17 00:00:00 2001
From: Pankaj Agrawal
Date: Fri, 28 Aug 2020 21:52:24 +0200
Subject: [PATCH 08/21] Remove unused log4j deps
---
powertools-sqs/pom.xml | 8 --------
1 file changed, 8 deletions(-)
diff --git a/powertools-sqs/pom.xml b/powertools-sqs/pom.xml
index 23cb66835..daa2eb840 100644
--- a/powertools-sqs/pom.xml
+++ b/powertools-sqs/pom.xml
@@ -58,14 +58,6 @@
payloadoffloading-common
-
- org.apache.logging.log4j
- log4j-core
-
-
- org.apache.logging.log4j
- log4j-api
-
org.aspectj
aspectjrt
From dd176b3e12f389643bc07113ed6d8dfb64b47d5e Mon Sep 17 00:00:00 2001
From: Pankaj Agrawal
Date: Sat, 29 Aug 2020 08:10:29 +0200
Subject: [PATCH 09/21] Remove dependency of args on methods for reuse
---
.../lambda/powertools/sqs/internal/SqsMessageAspect.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
index 7e16c4c8e..ed8b6cd4d 100644
--- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
@@ -45,7 +45,7 @@ public Object around(ProceedingJoinPoint pjp,
if (isHandlerMethod(pjp)
&& placedOnSqsEventRequestHandler(pjp)) {
- List pointersToDelete = rewriteMessages(proceedArgs);
+ List pointersToDelete = rewriteMessages((SQSEvent) proceedArgs[0]);
Object proceed = pjp.proceed(proceedArgs);
@@ -56,8 +56,7 @@ && placedOnSqsEventRequestHandler(pjp)) {
return pjp.proceed(proceedArgs);
}
- private List rewriteMessages(Object[] args) {
- SQSEvent sqsEvent = (SQSEvent) args[0];
+ private List rewriteMessages(SQSEvent sqsEvent) {
List s3Pointers = new ArrayList<>();
for (SQSMessage sqsMessage : sqsEvent.getRecords()) {
From 987bf86c6c101c03e58938a7466e80ef6ec97ff3 Mon Sep 17 00:00:00 2001
From: msailes
Date: Sat, 29 Aug 2020 12:33:25 +0100
Subject: [PATCH 10/21] added java docs and a new feature to disable deletion
of message payloads.
---
.../powertools/sqs/LargeMessageHandler.aj | 57 +++++++++++++++++++
.../sqs/internal/SqsMessageAspect.java | 14 +++--
.../handlers/SqsNoDeleteMessageHandler.java | 15 +++++
.../sqs/internal/SqsMessageAspectTest.java | 50 ++++++++++------
4 files changed, 113 insertions(+), 23 deletions(-)
create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj
index 39e32c93c..694230df2 100644
--- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj
@@ -5,7 +5,64 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+/**
+ * {@code LargeMessageHandler} is used to signal that the annotated method
+ * should be extended to handle large SQS messages which have been offloaded
+ * to S3
+ *
+ * {@code LargeMessageHandler} automatically retrieves and deletes messages
+ * which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib}
+ * client library.
+ *
+ * This version of the {@code LargeMessageHandler} is compatible with version
+ * 1.1.0+ of {@code amazon-sqs-java-extended-client-lib}.
+ *
+ *
+ * <dependency>
+ * <groupId>com.amazonaws</groupId>
+ * <artifactId>amazon-sqs-java-extended-client-lib</artifactId>
+ * <version>1.1.0</version>
+ * </dependency>
+ *
+ *
+ * {@code LargeMessageHandler} should be used with the handleRequest method of a class
+ * which implements {@code com.amazonaws.services.lambda.runtime.RequestHandler} with
+ * {@code com.amazonaws.services.lambda.runtime.events.SQSEvent} as the first parameter.
+ *
+ *
+ * public class SqsMessageHandler implements RequestHandler {
+ *
+ * {@literal @}Override
+ * {@literal @}LargeMessageHandler
+ * public String handleRequest(SQSEvent sqsEvent, Context context) {
+ *
+ * // process messages
+ *
+ * return "ok";
+ * }
+ *
+ * ...
+ *
+ *
+ * Using the default S3 Client {@code AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();}
+ * each record received in the SQSEvent {@code LargeMessageHandler} will checked
+ * to see if it's body contains a payload which has been offloaded to S3. If it
+ * does then {@code getObject(bucket, key)} will be called and the payload
+ * retrieved.
+ *
+ * Note : Retreiving payloads from S3 will increase the duration of the
+ * Lambda function.
+ *
+ * If the request handler method returns then each payload will be deleted
+ * from S3 using {@code deleteObject(bucket, key)}
+ *
+ * To disable deletion of payloads setting the following annotation parameter
+ * {@code LargeMessageHandler(deletePayloads=false)}
+ *
+ */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface LargeMessageHandler {
+
+ boolean deletePayloads() default true;
}
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
index 7e16c4c8e..43ccc874b 100644
--- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java
@@ -1,10 +1,5 @@
package software.amazon.lambda.powertools.sqs.internal;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Function;
-
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.lambda.runtime.Context;
@@ -23,6 +18,11 @@
import software.amazon.lambda.powertools.sqs.LargeMessageHandler;
import software.amazon.payloadoffloading.PayloadS3Pointer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static java.lang.String.format;
import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod;
@@ -49,7 +49,9 @@ && placedOnSqsEventRequestHandler(pjp)) {
Object proceed = pjp.proceed(proceedArgs);
- pointersToDelete.forEach(this::deleteMessageFromS3);
+ if (largeMessageHandler.deletePayloads()) {
+ pointersToDelete.forEach(this::deleteMessageFromS3);
+ }
return proceed;
}
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java
new file mode 100644
index 000000000..a301a2a65
--- /dev/null
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java
@@ -0,0 +1,15 @@
+package software.amazon.lambda.powertools.sqs.handlers;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import software.amazon.lambda.powertools.sqs.LargeMessageHandler;
+
+public class SqsNoDeleteMessageHandler implements RequestHandler {
+
+ @Override
+ @LargeMessageHandler(deletePayloads = false)
+ public String handleRequest(SQSEvent sqsEvent, Context context) {
+ return sqsEvent.getRecords().get(0).getBody();
+ }
+}
\ No newline at end of file
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
index d509e4074..a3aac55af 100644
--- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
@@ -1,8 +1,5 @@
package software.amazon.lambda.powertools.sqs.internal;
-import java.io.ByteArrayInputStream;
-import java.util.stream.Stream;
-
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.lambda.runtime.Context;
@@ -17,6 +14,10 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandler;
+import software.amazon.lambda.powertools.sqs.handlers.SqsNoDeleteMessageHandler;
+
+import java.io.ByteArrayInputStream;
+import java.util.stream.Stream;
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static java.util.Collections.singletonList;
@@ -39,6 +40,9 @@ public class SqsMessageAspectTest {
@Mock
private AmazonS3 amazonS3;
+ private static final String BUCKET_NAME = "bucketname";
+ private static final String BUCKET_KEY = "c71eb2ae-37e0-4265-8909-32f4153faddf";
+
@BeforeEach
void setUp() throws IllegalAccessException {
initMocks(this);
@@ -49,38 +53,50 @@ void setUp() throws IllegalAccessException {
@Test
public void testLargeMessage() {
- String bucketName = "ms-extended-sqs-client";
- String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf";
- S3Object s3Response = new S3Object();
- s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
-
- when(amazonS3.getObject(bucketName, bucketKey)).thenReturn(s3Response);
- SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]");
+ when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3ObjectWithLargeMessage());
+ SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]");
String response = requestHandler.handleRequest(sqsEvent, context);
assertThat(response)
.isEqualTo("A big message");
- verify(amazonS3).deleteObject(bucketName, bucketKey);
+ verify(amazonS3).deleteObject(BUCKET_NAME, BUCKET_KEY);
}
@ParameterizedTest
@MethodSource("exception")
public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exception) {
- String bucketName = "ms-extended-sqs-client";
- String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf";
-
- when(amazonS3.getObject(bucketName, bucketKey)).thenThrow(exception);
+ when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenThrow(exception);
- String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]";
+ String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
SQSEvent sqsEvent = messageWithBody(messageBody);
assertThatExceptionOfType(FailedProcessingLargePayloadException.class)
.isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context))
.withCause(exception);
- verify(amazonS3, never()).deleteObject(bucketName, bucketKey);
+ verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
+ }
+
+ @Test
+ public void testLargeMessageWithDeletionOff() {
+ requestHandler = new SqsNoDeleteMessageHandler();
+
+ when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3ObjectWithLargeMessage());
+ SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]");
+
+ String response = requestHandler.handleRequest(sqsEvent, context);
+
+ assertThat(response).isEqualTo("A big message");
+
+ verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
+ }
+
+ private S3Object s3ObjectWithLargeMessage() {
+ S3Object s3Response = new S3Object();
+ s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
+ return s3Response;
}
private static Stream exception() {
From 2ca2a4218369100831163ed6a502780c26f43a99 Mon Sep 17 00:00:00 2001
From: msailes
Date: Sat, 29 Aug 2020 14:29:18 +0100
Subject: [PATCH 11/21] refactored tests.
---
.../sqs/internal/SqsMessageAspectTest.java | 23 +++++--------------
1 file changed, 6 insertions(+), 17 deletions(-)
diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
index 2a216c546..cafec7eef 100644
--- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
+++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java
@@ -1,9 +1,5 @@
package software.amazon.lambda.powertools.sqs.internal;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.stream.Stream;
-
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.lambda.runtime.Context;
@@ -26,6 +22,7 @@
import software.amazon.lambda.powertools.sqs.handlers.SqsNoDeleteMessageHandler;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.util.stream.Stream;
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
@@ -34,7 +31,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.Mockito.mock;
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
@@ -78,12 +74,10 @@ public void testLargeMessage() {
@Test
public void shouldNotProcessSmallMessageBody() {
- String bucketName = "ms-extended-sqs-client";
- String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf";
S3Object s3Response = new S3Object();
s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
- when(amazonS3.getObject(bucketName, bucketKey)).thenReturn(s3Response);
+ when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
SQSEvent sqsEvent = messageWithBody("This is small message");
String response = requestHandler.handleRequest(sqsEvent, context);
@@ -126,10 +120,7 @@ public void testLargeMessageWithDeletionOff() {
@Test
public void shouldFailEntireBatchIfFailedProcessingDownloadMessageFromS3() throws IOException {
- String bucketName = "ms-extended-sqs-client";
- String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf";
S3Object s3Response = new S3Object();
-
s3Response.setObjectContent(new S3ObjectInputStream(new StringInputStream("test") {
@Override
public void close() throws IOException {
@@ -137,25 +128,23 @@ public void close() throws IOException {
}
}, mock(HttpRequestBase.class)));
- when(amazonS3.getObject(bucketName, bucketKey)).thenReturn(s3Response);
+ when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
- String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]";
+ String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
SQSEvent sqsEvent = messageWithBody(messageBody);
assertThatExceptionOfType(FailedProcessingLargePayloadException.class)
.isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context))
.withCauseInstanceOf(IOException.class);
- verify(amazonS3, never()).deleteObject(bucketName, bucketKey);
+ verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
}
@Test
public void shouldNotDoAnyProcessingWhenNotSqsEvent() {
- String bucketName = "ms-extended-sqs-client";
- String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf";
LambdaHandlerApiGateway handler = new LambdaHandlerApiGateway();
- String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]";
+ String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
APIGatewayProxyRequestEvent event = new APIGatewayProxyRequestEvent();
event.setBody(messageBody);
From f02356c43d51bac13cf70929e862e39530335835 Mon Sep 17 00:00:00 2001
From: Pankaj Agrawal
Date: Sat, 29 Aug 2020 19:18:05 +0200
Subject: [PATCH 12/21] Remove unused test resources
---
powertools-sqs/src/test/resources/large-message.json | 0
powertools-sqs/src/test/resources/small-message.json | 1 -
2 files changed, 1 deletion(-)
delete mode 100644 powertools-sqs/src/test/resources/large-message.json
delete mode 100644 powertools-sqs/src/test/resources/small-message.json
diff --git a/powertools-sqs/src/test/resources/large-message.json b/powertools-sqs/src/test/resources/large-message.json
deleted file mode 100644
index e69de29bb..000000000
diff --git a/powertools-sqs/src/test/resources/small-message.json b/powertools-sqs/src/test/resources/small-message.json
deleted file mode 100644
index bc0356cef..000000000
--- a/powertools-sqs/src/test/resources/small-message.json
+++ /dev/null
@@ -1 +0,0 @@
-{Records: [{messageId: bf73ff26-91ef-4da7-95d2-a78459803354,receiptHandle: AQEBTHZU0y5EmTwswCwnxSNYUOx0RC6S4XtPQndT6CI4kVzIcAi4pTcZyyPGCSLvxIlcn1kqIMdlqPx/Y8pW253CtPH3k5kQcoi9jrx5gbciiNJt9RNxOOxgSnWXGmLOumVsxq3mIk4udDeqKQmEzfJ4NmAno1eTIGxYa2OT9I0gCEf63fQPJyAFGfdbTPRW8MBzehkuHEwT5IWnSIPCi6IiiJSP2lpMDTtvVTMkUUGEeWr6xae7kQdklXi4Qfc4HuqCVDGobyG0/a/GR4rEoZIsf1GVvhWt7Kt5+XP1fIt08q6VlINO1gWfzoZrEGE1SfC+76gyjFMiWz0kzPDxSWymY80HFEE2cVaFQAjLeTBWlzYjKrELdRSX+VSJEuonKyi6jWz/7Jq4SvDca4S6N5Fo7Pedb5/ycDSzMSzCx31Juwru96yoTfYAZc+tOe/T0+Pr,eventSourceARN: arn:aws:sqs:eu-west-1:719169216310:large-payload-handler-LargeMessagesQueue-1NHWQ9486PN1T,eventSource: aws:sqs,awsRegion: eu-west-1,body: a small message,md5OfBody: aa608debbb18ac56aa5309265665c5f2,attributes: {ApproximateReceiveCount=1, SentTimestamp=1598611408346, SenderId=AIDA2O4OTL43OERPSY2OR, ApproximateFirstReceiveTimestamp=1598611408427},messageAttributes: {}}]}
\ No newline at end of file
From a5f56872438deddb2135159bbecc1ffc412e5955 Mon Sep 17 00:00:00 2001
From: msailes
Date: Sat, 29 Aug 2020 20:35:48 +0100
Subject: [PATCH 13/21] rename
---
.../powertools/sqs/LargeMessageHandler.aj | 68 -------------------
1 file changed, 68 deletions(-)
delete mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj
deleted file mode 100644
index 694230df2..000000000
--- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj
+++ /dev/null
@@ -1,68 +0,0 @@
-package software.amazon.lambda.powertools.sqs;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * {@code LargeMessageHandler} is used to signal that the annotated method
- * should be extended to handle large SQS messages which have been offloaded
- * to S3
- *
- * {@code LargeMessageHandler} automatically retrieves and deletes messages
- * which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib}
- * client library.
- *
- * This version of the {@code LargeMessageHandler} is compatible with version
- * 1.1.0+ of {@code amazon-sqs-java-extended-client-lib}.
- *
- *
- * <dependency>
- * <groupId>com.amazonaws</groupId>
- * <artifactId>amazon-sqs-java-extended-client-lib</artifactId>
- * <version>1.1.0</version>
- * </dependency>
- *
- *
- * {@code LargeMessageHandler} should be used with the handleRequest method of a class
- * which implements {@code com.amazonaws.services.lambda.runtime.RequestHandler} with
- * {@code com.amazonaws.services.lambda.runtime.events.SQSEvent} as the first parameter.
- *
- *
- * public class SqsMessageHandler implements RequestHandler {
- *
- * {@literal @}Override
- * {@literal @}LargeMessageHandler
- * public String handleRequest(SQSEvent sqsEvent, Context context) {
- *
- * // process messages
- *
- * return "ok";
- * }
- *
- * ...
- *
- *
- * Using the default S3 Client {@code AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();}
- * each record received in the SQSEvent {@code LargeMessageHandler} will checked
- * to see if it's body contains a payload which has been offloaded to S3. If it
- * does then {@code getObject(bucket, key)} will be called and the payload
- * retrieved.
- *
- * Note : Retreiving payloads from S3 will increase the duration of the
- * Lambda function.
- *
- * If the request handler method returns then each payload will be deleted
- * from S3 using {@code deleteObject(bucket, key)}
- *
- * To disable deletion of payloads setting the following annotation parameter
- * {@code LargeMessageHandler(deletePayloads=false)}
- *
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface LargeMessageHandler {
-
- boolean deletePayloads() default true;
-}
From 4b699f07ba1478c5d7af4413dc76dd321e0ef23f Mon Sep 17 00:00:00 2001
From: msailes
Date: Sat, 29 Aug 2020 20:36:02 +0100
Subject: [PATCH 14/21] rename
---
.../powertools/sqs/LargeMessageHandler.java | 68 +++++++++++++++++++
1 file changed, 68 insertions(+)
create mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java
new file mode 100644
index 000000000..9f67d2906
--- /dev/null
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java
@@ -0,0 +1,68 @@
+package software.amazon.lambda.powertools.sqs;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * {@code LargeMessageHandler} is used to signal that the annotated method
+ * should be extended to handle large SQS messages which have been offloaded
+ * to S3
+ *
+ * {@code LargeMessageHandler} automatically retrieves and deletes messages
+ * which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib}
+ * client library.
+ *
+ * This version of the {@code LargeMessageHandler} is compatible with version
+ * 1.1.0+ of {@code amazon-sqs-java-extended-client-lib}.
+ *
+ *
+ * <dependency>
+ * <groupId>com.amazonaws</groupId>
+ * <artifactId>amazon-sqs-java-extended-client-lib</artifactId>
+ * <version>1.1.0</version>
+ * </dependency>
+ *
+ *
+ * {@code LargeMessageHandler} should be used with the handleRequest method of a class
+ * which implements {@code com.amazonaws.services.lambda.runtime.RequestHandler} with
+ * {@code com.amazonaws.services.lambda.runtime.events.SQSEvent} as the first parameter.
+ *
+ *
+ * public class SqsMessageHandler implements RequestHandler {
+ *
+ * {@literal @}Override
+ * {@literal @}LargeMessageHandler
+ * public String handleRequest(SQSEvent sqsEvent, Context context) {
+ *
+ * // process messages
+ *
+ * return "ok";
+ * }
+ *
+ * ...
+ *
+ *
+ * Using the default S3 Client {@code AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();}
+ * each record received in the SQSEvent {@code LargeMessageHandler} will checked
+ * to see if it's body contains a payload which has been offloaded to S3. If it
+ * does then {@code getObject(bucket, key)} will be called and the payload
+ * retrieved.
+ *
+ * Note : Retreiving payloads from S3 will increase the duration of the
+ * Lambda function.
+ *
+ * If the request handler method returns then each payload will be deleted
+ * from S3 using {@code deleteObject(bucket, key)}
+ *
+ * To disable deletion of payloads setting the following annotation parameter
+ * {@code @LargeMessageHandler(deletePayloads=false)}
+ *
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface LargeMessageHandler {
+
+ boolean deletePayloads() default true;
+}
From d694c57f115cb950006f416a680b21949dd92ca0 Mon Sep 17 00:00:00 2001
From: msailes
Date: Sun, 30 Aug 2020 14:32:08 +0100
Subject: [PATCH 15/21] intial docs page
---
docs/content/dummy.md | 8 ---
.../utilities/large_message_handling.mdx | 67 +++++++++++++++++++
docs/gatsby-config.js | 5 +-
3 files changed, 71 insertions(+), 9 deletions(-)
delete mode 100644 docs/content/dummy.md
create mode 100644 docs/content/utilities/large_message_handling.mdx
diff --git a/docs/content/dummy.md b/docs/content/dummy.md
deleted file mode 100644
index d53b2cc3b..000000000
--- a/docs/content/dummy.md
+++ /dev/null
@@ -1,8 +0,0 @@
----
-title: Dummy title
-description: Dummy description
----
-
-## Test
-
-dummy content
\ No newline at end of file
diff --git a/docs/content/utilities/large_message_handling.mdx b/docs/content/utilities/large_message_handling.mdx
new file mode 100644
index 000000000..9ecc8b9a1
--- /dev/null
+++ b/docs/content/utilities/large_message_handling.mdx
@@ -0,0 +1,67 @@
+---
+title: Large Message Handling
+description: Utility
+---
+
+
+
+The large message handling utility is used to handle SQS messages which have had their payloads
+offloaded to S3 due to them being larger than the SQS maximum.
+
+The utility automatically retrieves and deletes messages which have been offloaded to S3 using the
+[amazon-sqs-java-extended-client-lib](https://github.com/awslabs/amazon-sqs-java-extended-client-lib)
+client library.
+
+This utility is compatible with versions 1.1.0+ of amazon-sqs-java-extended-client-lib.
+
+```xml
+
+ com.amazonaws
+ amazon-sqs-java-extended-client-lib
+ 1.1.0
+
+```
+
+## Install
+
+Add the following dependency to your project.
+
+```xml
+
+ software.amazon.lambda
+ powertools-sqs
+
+```
+
+The annotation `@LargeMessageHandler` should be used with the handleRequest method of a class
+which implements `com.amazonaws.services.lambda.runtime.RequestHandler` with
+`com.amazonaws.services.lambda.runtime.events.SQSEvent` as the first parameter.
+
+```java
+public class SqsMessageHandler implements RequestHandler {
+
+ @Override
+ @LargeMessageHandler
+ public String handleRequest(SQSEvent sqsEvent, Context context) {
+ // process messages
+
+ return "ok";
+}
+```
+
+`@LargeMessageHandler` creats a default S3 Client `AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient()`.
+When the Lambda function is invoked with an event from SQS, each record received
+in the SQSEvent will be checked to see if it's body contains a payload which has
+been offloaded to S3. If it does then `getObject(bucket, key)` will be called
+and the payload retrieved.
+
+*Note*: Retreiving payloads from S3 will increase the duration of the Lambda function.
+
+If the request handler method returns then each payload will be deleted from S3
+using `deleteObject(bucket, key)}`
+
+To disable deletion of payloads setting the following annotation parameter:
+
+```java
+@LargeMessageHandler(deletePayloads=false)
+```
\ No newline at end of file
diff --git a/docs/gatsby-config.js b/docs/gatsby-config.js
index c9aa9459a..0dadcc5b5 100644
--- a/docs/gatsby-config.js
+++ b/docs/gatsby-config.js
@@ -25,7 +25,10 @@ module.exports = {
'Core utilities': [
'core/logging',
'core/tracing'
- ]
+ ],
+ 'Utilities': [
+ 'utilities/large_message_handling'
+ ],
},
navConfig: {
'Serverless Best Practices video': {
From 3c2f694f34341722929032428015106ef024771a Mon Sep 17 00:00:00 2001
From: msailes
Date: Mon, 31 Aug 2020 08:37:14 +0100
Subject: [PATCH 16/21] updated docs page
---
.../utilities/large_message_handling.mdx | 67 -----------
.../utilities/sqs_large_message_handling.mdx | 106 ++++++++++++++++++
docs/gatsby-config.js | 2 +-
3 files changed, 107 insertions(+), 68 deletions(-)
delete mode 100644 docs/content/utilities/large_message_handling.mdx
create mode 100644 docs/content/utilities/sqs_large_message_handling.mdx
diff --git a/docs/content/utilities/large_message_handling.mdx b/docs/content/utilities/large_message_handling.mdx
deleted file mode 100644
index 9ecc8b9a1..000000000
--- a/docs/content/utilities/large_message_handling.mdx
+++ /dev/null
@@ -1,67 +0,0 @@
----
-title: Large Message Handling
-description: Utility
----
-
-
-
-The large message handling utility is used to handle SQS messages which have had their payloads
-offloaded to S3 due to them being larger than the SQS maximum.
-
-The utility automatically retrieves and deletes messages which have been offloaded to S3 using the
-[amazon-sqs-java-extended-client-lib](https://github.com/awslabs/amazon-sqs-java-extended-client-lib)
-client library.
-
-This utility is compatible with versions 1.1.0+ of amazon-sqs-java-extended-client-lib.
-
-```xml
-
- com.amazonaws
- amazon-sqs-java-extended-client-lib
- 1.1.0
-
-```
-
-## Install
-
-Add the following dependency to your project.
-
-```xml
-
- software.amazon.lambda
- powertools-sqs
-
-```
-
-The annotation `@LargeMessageHandler` should be used with the handleRequest method of a class
-which implements `com.amazonaws.services.lambda.runtime.RequestHandler` with
-`com.amazonaws.services.lambda.runtime.events.SQSEvent` as the first parameter.
-
-```java
-public class SqsMessageHandler implements RequestHandler {
-
- @Override
- @LargeMessageHandler
- public String handleRequest(SQSEvent sqsEvent, Context context) {
- // process messages
-
- return "ok";
-}
-```
-
-`@LargeMessageHandler` creats a default S3 Client `AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient()`.
-When the Lambda function is invoked with an event from SQS, each record received
-in the SQSEvent will be checked to see if it's body contains a payload which has
-been offloaded to S3. If it does then `getObject(bucket, key)` will be called
-and the payload retrieved.
-
-*Note*: Retreiving payloads from S3 will increase the duration of the Lambda function.
-
-If the request handler method returns then each payload will be deleted from S3
-using `deleteObject(bucket, key)}`
-
-To disable deletion of payloads setting the following annotation parameter:
-
-```java
-@LargeMessageHandler(deletePayloads=false)
-```
\ No newline at end of file
diff --git a/docs/content/utilities/sqs_large_message_handling.mdx b/docs/content/utilities/sqs_large_message_handling.mdx
new file mode 100644
index 000000000..d2aac1308
--- /dev/null
+++ b/docs/content/utilities/sqs_large_message_handling.mdx
@@ -0,0 +1,106 @@
+---
+title: SQS Large Message Handling
+description: Utility
+---
+
+The large message handling utility handles SQS messages which have had their payloads
+offloaded to S3 due to them being larger than the SQS maximum.
+
+The utility automatically retrieves messages which have been offloaded to S3 using the
+[amazon-sqs-java-extended-client-lib](https://github.com/awslabs/amazon-sqs-java-extended-client-lib)
+client library. Once the message payloads have been processed successful the
+utility can delete the message payloads from S3.
+
+This utility is compatible with versions *1.1.0+* of amazon-sqs-java-extended-client-lib.
+
+```xml
+
+ com.amazonaws
+ amazon-sqs-java-extended-client-lib
+ 1.1.0
+
+```
+
+## Install
+
+To install this utility, add the following dependency to your project.
+
+```xml
+
+ software.amazon.lambda
+ powertools-sqs
+ x.x.x
+
+```
+
+And configure the aspectj-maven-plugin to compile-time weave (CTW) the
+aws-lambda-powertools-java aspects into your project. You may already have this
+plugin in your pom. In that case add the depenedency to the `aspectLibraries`
+section.
+
+```xml
+
+
+ ...
+
+ org.codehaus.mojo
+ aspectj-maven-plugin
+ 1.11
+
+ 1.8
+ 1.8
+ 1.8
+
+ ...
+
+ software.amazon.lambda
+ powertools-sqs
+
+ ...
+
+
+
+
+
+ compile
+
+
+
+
+ ...
+
+
+```
+
+The annotation `@LargeMessageHandler` should be used with the handleRequest method of a class
+which implements `com.amazonaws.services.lambda.runtime.RequestHandler` with
+`com.amazonaws.services.lambda.runtime.events.SQSEvent` as the first parameter.
+
+```java
+public class SqsMessageHandler implements RequestHandler {
+
+ @Override
+ @LargeMessageHandler
+ public String handleRequest(SQSEvent sqsEvent, Context context) {
+ // process messages
+
+ return "ok";
+}
+```
+
+`@LargeMessageHandler` creates a default S3 Client `AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient()`.
+When the Lambda function is invoked with an event from SQS, each record received
+in the SQSEvent will be checked to see if it's body contains a payload which has
+been offloaded to S3. If it does then `getObject(bucket, key)` will be called,
+and the payload retrieved.
+
+*Note*: Retrieving payloads from S3 will increase the duration of the Lambda function.
+
+If the request handler method returns without error then each payload will be
+deleted from S3 using `deleteObject(bucket, key)}`
+
+To disable deletion of payloads setting the following annotation parameter:
+
+```java
+@LargeMessageHandler(deletePayloads=false)
+```
\ No newline at end of file
diff --git a/docs/gatsby-config.js b/docs/gatsby-config.js
index 0dadcc5b5..e588bb5eb 100644
--- a/docs/gatsby-config.js
+++ b/docs/gatsby-config.js
@@ -27,7 +27,7 @@ module.exports = {
'core/tracing'
],
'Utilities': [
- 'utilities/large_message_handling'
+ 'utilities/sqs_large_message_handling'
],
},
navConfig: {
From 5d54bf7628ff5a3ce5948bcca053f22bf4157424 Mon Sep 17 00:00:00 2001
From: msailes
Date: Mon, 31 Aug 2020 08:40:48 +0100
Subject: [PATCH 17/21] typo
---
docs/content/utilities/sqs_large_message_handling.mdx | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/content/utilities/sqs_large_message_handling.mdx b/docs/content/utilities/sqs_large_message_handling.mdx
index d2aac1308..606b078f6 100644
--- a/docs/content/utilities/sqs_large_message_handling.mdx
+++ b/docs/content/utilities/sqs_large_message_handling.mdx
@@ -97,7 +97,7 @@ and the payload retrieved.
*Note*: Retrieving payloads from S3 will increase the duration of the Lambda function.
If the request handler method returns without error then each payload will be
-deleted from S3 using `deleteObject(bucket, key)}`
+deleted from S3 using `deleteObject(bucket, key)`
To disable deletion of payloads setting the following annotation parameter:
From 8c18a75226298fec140719e1ed692fa6ba854a6e Mon Sep 17 00:00:00 2001
From: Pankaj Agrawal
Date: Mon, 31 Aug 2020 12:13:00 +0200
Subject: [PATCH 18/21] Fix example app version for powertools
---
example/HelloWorldFunction/pom.xml | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/example/HelloWorldFunction/pom.xml b/example/HelloWorldFunction/pom.xml
index efe50ace6..06735af31 100644
--- a/example/HelloWorldFunction/pom.xml
+++ b/example/HelloWorldFunction/pom.xml
@@ -16,12 +16,12 @@
software.amazon.lambda
powertools-tracing
- 0.1.0-SNAPSHOT
+ 0.1.0-beta
software.amazon.lambda
powertools-logging
- 0.1.0-SNAPSHOT
+ 0.1.0-beta
com.amazonaws
From 563f43a99e888ec650cad688f6fd5288fe3861ae Mon Sep 17 00:00:00 2001
From: Pankaj Agrawal
Date: Mon, 31 Aug 2020 12:19:07 +0200
Subject: [PATCH 19/21] Set env var for sdk to use during remote builds
---
.github/workflows/build.yml | 1 +
1 file changed, 1 insertion(+)
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 35bbce70b..adb4f9b14 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -20,6 +20,7 @@ jobs:
env:
OS: ${{ matrix.os }}
JAVA: ${{ matrix.java-version }}
+ AWS_REGION: eu-west-1
steps:
- uses: actions/checkout@v2
- name: Setup java
From 9d15a2314dfb5744f224349475dffc45338f1c87 Mon Sep 17 00:00:00 2001
From: Pankaj Agrawal
Date: Mon, 31 Aug 2020 12:22:53 +0200
Subject: [PATCH 20/21] Fix parent module version for sqs
---
powertools-sqs/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/powertools-sqs/pom.xml b/powertools-sqs/pom.xml
index daa2eb840..d87d78765 100644
--- a/powertools-sqs/pom.xml
+++ b/powertools-sqs/pom.xml
@@ -10,7 +10,7 @@
powertools-parent
software.amazon.lambda
- 0.1.0-SNAPSHOT
+ 0.1.0-beta
AWS Lambda Powertools Java library SQS
From 9eab98b2156bd65807e934800283eace1744b790 Mon Sep 17 00:00:00 2001
From: msailes
Date: Mon, 31 Aug 2020 15:58:46 +0100
Subject: [PATCH 21/21] documentation improvements.
---
docs/content/utilities/sqs_large_message_handling.mdx | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/docs/content/utilities/sqs_large_message_handling.mdx b/docs/content/utilities/sqs_large_message_handling.mdx
index 606b078f6..61740a3a7 100644
--- a/docs/content/utilities/sqs_large_message_handling.mdx
+++ b/docs/content/utilities/sqs_large_message_handling.mdx
@@ -29,7 +29,7 @@ To install this utility, add the following dependency to your project.
software.amazon.lambda
powertools-sqs
- x.x.x
+ 0.1.0-beta
```
@@ -92,9 +92,8 @@ public class SqsMessageHandler implements RequestHandler {
When the Lambda function is invoked with an event from SQS, each record received
in the SQSEvent will be checked to see if it's body contains a payload which has
been offloaded to S3. If it does then `getObject(bucket, key)` will be called,
-and the payload retrieved.
-
-*Note*: Retrieving payloads from S3 will increase the duration of the Lambda function.
+and the payload retrieved. If there is an error during this process then the
+function will fail with a `FailedProcessingLargePayloadException` exception.
If the request handler method returns without error then each payload will be
deleted from S3 using `deleteObject(bucket, key)`