From 6892f45ec4b688ce39869306c15ce370c6da85f7 Mon Sep 17 00:00:00 2001 From: msailes Date: Fri, 28 Aug 2020 12:03:17 +0100 Subject: [PATCH 01/21] first working version --- pom.xml | 14 +++ powertools-sqs/pom.xml | 107 +++++++++++++++++ .../powertools/sqs/LargeMessageHandler.aj | 11 ++ .../sqs/internal/SqsMessageAspect.java | 109 ++++++++++++++++++ .../sqs/handlers/SqsMessageHandler.java | 15 +++ .../sqs/internal/SqsMessageAspectTest.java | 47 ++++++++ .../src/test/resources/large-message.json | 0 .../src/test/resources/small-message.json | 1 + 8 files changed, 304 insertions(+) create mode 100644 powertools-sqs/pom.xml create mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj create mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java create mode 100644 powertools-sqs/src/test/resources/large-message.json create mode 100644 powertools-sqs/src/test/resources/small-message.json diff --git a/pom.xml b/pom.xml index 1ae0030c8..b58289c69 100644 --- a/pom.xml +++ b/pom.xml @@ -30,6 +30,7 @@ powertools-core powertools-logging powertools-tracing + powertools-sqs @@ -50,9 +51,12 @@ 2.13.3 2.11.0 1.9.6 + 2.14.4 2.4.0 + 1.0.0 UTF-8 1.2.1 + 3.1.0 3.8.1 1.12.1 2.22.2 @@ -83,6 +87,16 @@ aws-lambda-java-core ${lambda.core.version} + + com.amazonaws + aws-lambda-java-events + ${lambda.events.version} + + + software.amazon.payloadoffloading + payloadoffloading-common + ${payloadoffloading-common.version} + org.aspectj aspectjrt diff --git a/powertools-sqs/pom.xml b/powertools-sqs/pom.xml new file mode 100644 index 000000000..e2c2d6270 --- /dev/null +++ b/powertools-sqs/pom.xml @@ -0,0 +1,107 @@ + + + 4.0.0 + + powertools-sqs + jar + + + powertools-parent + software.amazon.lambda + 0.1.0-SNAPSHOT + + + AWS Lambda Powertools Java library SQS + + A suite of utilities for AWS Lambda Functions that makes tracing with AWS X-Ray, structured logging and creating custom metrics asynchronously easier. + + https://aws.amazon.com/lambda/ + + GitHub Issues + https://github.com/awslabs/aws-lambda-powertools-java/issues + + + https://github.com/awslabs/aws-lambda-powertools-java.git + + + + AWS Lambda Powertools team + Amazon Web Services + https://aws.amazon.com/ + + + + + + ossrh + https://aws.oss.sonatype.org/content/repositories/snapshots + + + + + + software.amazon.lambda + powertools-core + + + com.amazonaws + aws-lambda-java-core + + + com.amazonaws + aws-lambda-java-events + + + software.amazon.payloadoffloading + payloadoffloading-common + + + + org.apache.logging.log4j + log4j-core + + + org.apache.logging.log4j + log4j-api + + + org.aspectj + aspectjrt + + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.apache.commons + commons-lang3 + test + + + org.mockito + mockito-core + test + + + org.aspectj + aspectjweaver + test + + + org.assertj + assertj-core + test + + + + \ No newline at end of file diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj new file mode 100644 index 000000000..39e32c93c --- /dev/null +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj @@ -0,0 +1,11 @@ +package software.amazon.lambda.powertools.sqs; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface LargeMessageHandler { +} diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java new file mode 100644 index 000000000..9ef701240 --- /dev/null +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java @@ -0,0 +1,109 @@ +package software.amazon.lambda.powertools.sqs.internal; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.SdkClientException; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.util.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Pointcut; +import software.amazon.lambda.powertools.sqs.LargeMessageHandler; +import software.amazon.payloadoffloading.PayloadS3Pointer; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +@Aspect +public class SqsMessageAspect { + + private static final Log LOG = LogFactory.getLog(SqsMessageAspect.class); + private AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient(); + + @SuppressWarnings({"EmptyMethod"}) + @Pointcut("@annotation(largeMessageHandler)") + public void callAt(LargeMessageHandler largeMessageHandler) { + } + + @Around(value = "callAt(largeMessageHandler) && execution(@LargeMessageHandler * *.*(..))", argNames = "pjp,largeMessageHandler") + public Object around(ProceedingJoinPoint pjp, + LargeMessageHandler largeMessageHandler) throws Throwable { + Object[] proceedArgs = pjp.getArgs(); + Object[] rewrittenArgs = rewriteMessages(proceedArgs); + + Object proceed = pjp.proceed(rewrittenArgs); + + List payloadS3Pointers = collectPayloadPointers(proceedArgs); + payloadS3Pointers.forEach(this::deleteMessageFromS3); + + return proceed; + } + + private List collectPayloadPointers(Object[] args) { + SQSEvent sqsEvent = (SQSEvent) args[0]; + + return sqsEvent.getRecords().stream() + .filter(record -> isBodyLargeMessagePointer(record.getBody())) + .map(record -> PayloadS3Pointer.fromJson(record.getBody())) + .collect(Collectors.toList()); + } + + private Object[] rewriteMessages(Object[] args) { + SQSEvent sqsEvent = (SQSEvent) args[0]; + + for (SQSEvent.SQSMessage sqsMessage : sqsEvent.getRecords()) { + if (isBodyLargeMessagePointer(sqsMessage.getBody())) { + PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody()); + + try { + S3Object object = amazonS3.getObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key()); + sqsMessage.setBody(readStringFromS3Object(object)); + LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key()); + } catch (AmazonServiceException e) { + LOG.error("A service exception", e); + } catch (SdkClientException e) { + LOG.error("Some sort of client exception", e); + } + } + } + + args[0] = sqsEvent; + + return args; + } + + private boolean isBodyLargeMessagePointer(String record) { + return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\""); + } + + private String readStringFromS3Object(S3Object object) { + S3ObjectInputStream is = object.getObjectContent(); + String s3Body = null; + try { + s3Body = IOUtils.toString(is); + } catch (IOException e) { + LOG.error("Error converting S3 object to String", e); + } finally { + IOUtils.closeQuietly(is, LOG); + } + return s3Body; + } + + private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) { + try { + amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key()); + LOG.info("Message deleted from S3: " + s3Pointer.toJson()); + } catch (AmazonServiceException e) { + LOG.error("A service exception", e); + } catch (SdkClientException e) { + LOG.error("Some sort of client exception", e); + } + } +} diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java new file mode 100644 index 000000000..b3d7d4af7 --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java @@ -0,0 +1,15 @@ +package software.amazon.lambda.powertools.sqs.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import software.amazon.lambda.powertools.sqs.LargeMessageHandler; + +public class SqsMessageHandler implements RequestHandler { + + @Override + @LargeMessageHandler + public String handleRequest(SQSEvent sqsEvent, Context context) { + return sqsEvent.getRecords().get(0).getBody(); + } +} diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java new file mode 100644 index 000000000..d4d0c0ed2 --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java @@ -0,0 +1,47 @@ +package software.amazon.lambda.powertools.sqs.internal; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandler; + +import java.util.Arrays; + +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class SqsMessageAspectTest { + + private RequestHandler requestHandler; + + @Mock + private Context context; + + @BeforeEach + void setUp() { + initMocks(this); + setupContext(); + requestHandler = new SqsMessageHandler(); + } + + @Test + public void testLargeMessage() { + SQSEvent.SQSMessage sqsMessage = new SQSEvent.SQSMessage(); + sqsMessage.setBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"ms-extended-sqs-client\",\"s3Key\":\"c71eb2ae-37e0-4265-8909-32f4153faddf\"}]"); + SQSEvent sqsEvent = new SQSEvent(); + sqsEvent.setRecords(Arrays.asList(sqsMessage)); + String response = requestHandler.handleRequest(sqsEvent, context); + +// assertThat(response).hasToString("newValueFromS3"); + } + + private void setupContext() { + when(context.getFunctionName()).thenReturn("testFunction"); + when(context.getInvokedFunctionArn()).thenReturn("testArn"); + when(context.getFunctionVersion()).thenReturn("1"); + when(context.getMemoryLimitInMB()).thenReturn(10); + } +} \ No newline at end of file diff --git a/powertools-sqs/src/test/resources/large-message.json b/powertools-sqs/src/test/resources/large-message.json new file mode 100644 index 000000000..e69de29bb diff --git a/powertools-sqs/src/test/resources/small-message.json b/powertools-sqs/src/test/resources/small-message.json new file mode 100644 index 000000000..bc0356cef --- /dev/null +++ b/powertools-sqs/src/test/resources/small-message.json @@ -0,0 +1 @@ +{Records: [{messageId: bf73ff26-91ef-4da7-95d2-a78459803354,receiptHandle: AQEBTHZU0y5EmTwswCwnxSNYUOx0RC6S4XtPQndT6CI4kVzIcAi4pTcZyyPGCSLvxIlcn1kqIMdlqPx/Y8pW253CtPH3k5kQcoi9jrx5gbciiNJt9RNxOOxgSnWXGmLOumVsxq3mIk4udDeqKQmEzfJ4NmAno1eTIGxYa2OT9I0gCEf63fQPJyAFGfdbTPRW8MBzehkuHEwT5IWnSIPCi6IiiJSP2lpMDTtvVTMkUUGEeWr6xae7kQdklXi4Qfc4HuqCVDGobyG0/a/GR4rEoZIsf1GVvhWt7Kt5+XP1fIt08q6VlINO1gWfzoZrEGE1SfC+76gyjFMiWz0kzPDxSWymY80HFEE2cVaFQAjLeTBWlzYjKrELdRSX+VSJEuonKyi6jWz/7Jq4SvDca4S6N5Fo7Pedb5/ycDSzMSzCx31Juwru96yoTfYAZc+tOe/T0+Pr,eventSourceARN: arn:aws:sqs:eu-west-1:719169216310:large-payload-handler-LargeMessagesQueue-1NHWQ9486PN1T,eventSource: aws:sqs,awsRegion: eu-west-1,body: a small message,md5OfBody: aa608debbb18ac56aa5309265665c5f2,attributes: {ApproximateReceiveCount=1, SentTimestamp=1598611408346, SenderId=AIDA2O4OTL43OERPSY2OR, ApproximateFirstReceiveTimestamp=1598611408427},messageAttributes: {}}]} \ No newline at end of file From d714d734ac0acb7b4831a5cd6249e128ff7a5c03 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Fri, 28 Aug 2020 15:55:19 +0200 Subject: [PATCH 02/21] Minor refactoring for java8 specific constructs --- .../sqs/internal/SqsMessageAspect.java | 82 ++++++++++++------- 1 file changed, 51 insertions(+), 31 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java index 9ef701240..0eb4869ef 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java @@ -1,7 +1,12 @@ package software.amazon.lambda.powertools.sqs.internal; +import java.io.IOException; +import java.util.Optional; +import java.util.function.Function; + import com.amazonaws.AmazonServiceException; import com.amazonaws.SdkClientException; +import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; @@ -17,15 +22,16 @@ import software.amazon.lambda.powertools.sqs.LargeMessageHandler; import software.amazon.payloadoffloading.PayloadS3Pointer; -import java.io.IOException; -import java.util.List; -import java.util.stream.Collectors; +import static java.util.Optional.empty; +import static java.util.Optional.of; +import static java.util.Optional.ofNullable; +import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod; @Aspect public class SqsMessageAspect { private static final Log LOG = LogFactory.getLog(SqsMessageAspect.class); - private AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient(); + private static AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient(); @SuppressWarnings({"EmptyMethod"}) @Pointcut("@annotation(largeMessageHandler)") @@ -36,23 +42,25 @@ public void callAt(LargeMessageHandler largeMessageHandler) { public Object around(ProceedingJoinPoint pjp, LargeMessageHandler largeMessageHandler) throws Throwable { Object[] proceedArgs = pjp.getArgs(); - Object[] rewrittenArgs = rewriteMessages(proceedArgs); - - Object proceed = pjp.proceed(rewrittenArgs); - List payloadS3Pointers = collectPayloadPointers(proceedArgs); - payloadS3Pointers.forEach(this::deleteMessageFromS3); + if (isHandlerMethod(pjp) + && placedOnSqsEventRequestHandler(pjp)) { + Object[] rewrittenArgs = rewriteMessages(proceedArgs); + Object proceed = pjp.proceed(rewrittenArgs); + deletePayloadPointers(proceedArgs); + return proceed; + } - return proceed; + return pjp.proceed(proceedArgs); } - private List collectPayloadPointers(Object[] args) { + private void deletePayloadPointers(Object[] args) { SQSEvent sqsEvent = (SQSEvent) args[0]; - return sqsEvent.getRecords().stream() + sqsEvent.getRecords().stream() .filter(record -> isBodyLargeMessagePointer(record.getBody())) .map(record -> PayloadS3Pointer.fromJson(record.getBody())) - .collect(Collectors.toList()); + .forEach(this::deleteMessageFromS3); } private Object[] rewriteMessages(Object[] args) { @@ -62,15 +70,14 @@ private Object[] rewriteMessages(Object[] args) { if (isBodyLargeMessagePointer(sqsMessage.getBody())) { PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody()); - try { - S3Object object = amazonS3.getObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key()); - sqsMessage.setBody(readStringFromS3Object(object)); + Optional s3Object = callS3Gracefully(s3Pointer, pointer -> { + S3Object object = amazonS3.getObject(pointer.getS3BucketName(), pointer.getS3Key()); LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key()); - } catch (AmazonServiceException e) { - LOG.error("A service exception", e); - } catch (SdkClientException e) { - LOG.error("Some sort of client exception", e); - } + return object; + }); + + s3Object.flatMap(this::readStringFromS3Object) + .ifPresent(sqsMessage::setBody); } } @@ -83,27 +90,40 @@ private boolean isBodyLargeMessagePointer(String record) { return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\""); } - private String readStringFromS3Object(S3Object object) { - S3ObjectInputStream is = object.getObjectContent(); - String s3Body = null; - try { - s3Body = IOUtils.toString(is); + private Optional readStringFromS3Object(S3Object object) { + try (S3ObjectInputStream is = object.getObjectContent()) { + return of(IOUtils.toString(is)); } catch (IOException e) { LOG.error("Error converting S3 object to String", e); - } finally { - IOUtils.closeQuietly(is, LOG); } - return s3Body; + + return empty(); } private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) { - try { + callS3Gracefully(s3Pointer, pointer -> { amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key()); - LOG.info("Message deleted from S3: " + s3Pointer.toJson()); + LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key()); + return null; + }); + } + + private Optional callS3Gracefully(final PayloadS3Pointer pointer, + final Function function) { + try { + return ofNullable(function.apply(pointer)); } catch (AmazonServiceException e) { LOG.error("A service exception", e); } catch (SdkClientException e) { LOG.error("Some sort of client exception", e); } + + return empty(); + } + + public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) { + return pjp.getArgs().length == 2 + && pjp.getArgs()[0] instanceof SQSEvent + && pjp.getArgs()[1] instanceof Context; } } From 1e71bdca2409d6e036f10ac2bbde60cfc918962b Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Fri, 28 Aug 2020 16:01:28 +0200 Subject: [PATCH 03/21] Fix logs for deletion --- .../amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java index 0eb4869ef..83e30f18a 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java @@ -103,7 +103,7 @@ private Optional readStringFromS3Object(S3Object object) { private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) { callS3Gracefully(s3Pointer, pointer -> { amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key()); - LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key()); + LOG.info("Message deleted from S3: " + s3Pointer.toJson()); return null; }); } From e3edebe05745cf3ed0fe7cb46b300cc8c58f3c42 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Fri, 28 Aug 2020 16:15:15 +0200 Subject: [PATCH 04/21] Initial test cases to remove remote interaction --- pom.xml | 6 ++ powertools-sqs/pom.xml | 5 ++ .../sqs/internal/SqsMessageAspect.java | 16 ++-- .../sqs/internal/SqsMessageAspectTest.java | 76 +++++++++++++++++-- 4 files changed, 90 insertions(+), 13 deletions(-) diff --git a/pom.xml b/pom.xml index b58289c69..68ce03c16 100644 --- a/pom.xml +++ b/pom.xml @@ -156,6 +156,12 @@ 5.6.2 test + + org.junit.jupiter + junit-jupiter-params + 5.6.2 + test + org.apache.commons commons-lang3 diff --git a/powertools-sqs/pom.xml b/powertools-sqs/pom.xml index e2c2d6270..23cb66835 100644 --- a/powertools-sqs/pom.xml +++ b/powertools-sqs/pom.xml @@ -82,6 +82,11 @@ junit-jupiter-engine test + + org.junit.jupiter + junit-jupiter-params + test + org.apache.commons commons-lang3 diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java index 83e30f18a..e9953371d 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java @@ -1,8 +1,10 @@ package software.amazon.lambda.powertools.sqs.internal; import java.io.IOException; +import java.util.List; import java.util.Optional; import java.util.function.Function; +import java.util.stream.Collectors; import com.amazonaws.AmazonServiceException; import com.amazonaws.SdkClientException; @@ -22,6 +24,7 @@ import software.amazon.lambda.powertools.sqs.LargeMessageHandler; import software.amazon.payloadoffloading.PayloadS3Pointer; +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; import static java.util.Optional.empty; import static java.util.Optional.of; import static java.util.Optional.ofNullable; @@ -45,28 +48,31 @@ public Object around(ProceedingJoinPoint pjp, if (isHandlerMethod(pjp) && placedOnSqsEventRequestHandler(pjp)) { + List payloadS3Pointers = collectPayloadPointers(proceedArgs); + Object[] rewrittenArgs = rewriteMessages(proceedArgs); Object proceed = pjp.proceed(rewrittenArgs); - deletePayloadPointers(proceedArgs); + + payloadS3Pointers.forEach(this::deleteMessageFromS3); return proceed; } return pjp.proceed(proceedArgs); } - private void deletePayloadPointers(Object[] args) { + private List collectPayloadPointers(Object[] args) { SQSEvent sqsEvent = (SQSEvent) args[0]; - sqsEvent.getRecords().stream() + return sqsEvent.getRecords().stream() .filter(record -> isBodyLargeMessagePointer(record.getBody())) .map(record -> PayloadS3Pointer.fromJson(record.getBody())) - .forEach(this::deleteMessageFromS3); + .collect(Collectors.toList()); } private Object[] rewriteMessages(Object[] args) { SQSEvent sqsEvent = (SQSEvent) args[0]; - for (SQSEvent.SQSMessage sqsMessage : sqsEvent.getRecords()) { + for (SQSMessage sqsMessage : sqsEvent.getRecords()) { if (isBodyLargeMessagePointer(sqsMessage.getBody())) { PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody()); diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java index d4d0c0ed2..8acaace45 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java @@ -1,15 +1,29 @@ package software.amazon.lambda.powertools.sqs.internal; +import java.io.ByteArrayInputStream; +import java.util.stream.Stream; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.SdkClientException; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.S3Object; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandler; -import java.util.Arrays; - +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import static java.util.Collections.singletonList; +import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -20,22 +34,68 @@ public class SqsMessageAspectTest { @Mock private Context context; + @Mock + private AmazonS3 amazonS3; + @BeforeEach - void setUp() { + void setUp() throws IllegalAccessException { initMocks(this); setupContext(); + writeStaticField(SqsMessageAspect.class, "amazonS3", amazonS3, true); requestHandler = new SqsMessageHandler(); } @Test public void testLargeMessage() { - SQSEvent.SQSMessage sqsMessage = new SQSEvent.SQSMessage(); - sqsMessage.setBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"ms-extended-sqs-client\",\"s3Key\":\"c71eb2ae-37e0-4265-8909-32f4153faddf\"}]"); - SQSEvent sqsEvent = new SQSEvent(); - sqsEvent.setRecords(Arrays.asList(sqsMessage)); + String bucketName = "ms-extended-sqs-client"; + String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf"; + S3Object s3Response = new S3Object(); + s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes())); + + when(amazonS3.getObject(bucketName, bucketKey)).thenReturn(s3Response); + SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]"); + String response = requestHandler.handleRequest(sqsEvent, context); -// assertThat(response).hasToString("newValueFromS3"); + assertThat(response) + .isEqualTo("A big message"); + + verify(amazonS3).deleteObject(bucketName, bucketKey); + } + + @ParameterizedTest + @MethodSource("exception") + public void shouldKeepOriginalBodyIfFailedDownloadingFromS3(RuntimeException exception) { + String bucketName = "ms-extended-sqs-client"; + String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf"; + S3Object s3Response = new S3Object(); + s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes())); + + when(amazonS3.getObject(bucketName, bucketKey)).thenThrow(exception); + + String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]"; + SQSEvent sqsEvent = messageWithBody(messageBody); + + String response = requestHandler.handleRequest(sqsEvent, context); + + assertThat(response) + .isEqualTo(messageBody); + + //TODO we need to fix the logic since object should not be deleted if we never managed to download it from S3 for any reason + //verify(amazonS3, never()).deleteObject(bucketName, bucketKey); + } + + private static Stream exception() { + return Stream.of(Arguments.of(new AmazonServiceException("Service Exception")), + Arguments.of(new SdkClientException("Client Exception"))); + } + + private SQSEvent messageWithBody(String messageBody) { + SQSMessage sqsMessage = new SQSMessage(); + sqsMessage.setBody(messageBody); + SQSEvent sqsEvent = new SQSEvent(); + sqsEvent.setRecords(singletonList(sqsMessage)); + return sqsEvent; } private void setupContext() { From 3ce91667f93fb09d96ebbbf946fe7067f0fa0d80 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Fri, 28 Aug 2020 19:15:31 +0200 Subject: [PATCH 05/21] Fail hard when failed downloading any message from batch in event --- .../sqs/internal/SqsMessageAspect.java | 59 ++++++++----------- .../sqs/internal/SqsMessageAspectTest.java | 16 +++-- 2 files changed, 32 insertions(+), 43 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java index e9953371d..7e16c4c8e 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java @@ -1,10 +1,9 @@ package software.amazon.lambda.powertools.sqs.internal; import java.io.IOException; +import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.function.Function; -import java.util.stream.Collectors; import com.amazonaws.AmazonServiceException; import com.amazonaws.SdkClientException; @@ -25,9 +24,7 @@ import software.amazon.payloadoffloading.PayloadS3Pointer; import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; -import static java.util.Optional.empty; -import static java.util.Optional.of; -import static java.util.Optional.ofNullable; +import static java.lang.String.format; import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod; @Aspect @@ -48,62 +45,50 @@ public Object around(ProceedingJoinPoint pjp, if (isHandlerMethod(pjp) && placedOnSqsEventRequestHandler(pjp)) { - List payloadS3Pointers = collectPayloadPointers(proceedArgs); + List pointersToDelete = rewriteMessages(proceedArgs); - Object[] rewrittenArgs = rewriteMessages(proceedArgs); - Object proceed = pjp.proceed(rewrittenArgs); + Object proceed = pjp.proceed(proceedArgs); - payloadS3Pointers.forEach(this::deleteMessageFromS3); + pointersToDelete.forEach(this::deleteMessageFromS3); return proceed; } return pjp.proceed(proceedArgs); } - private List collectPayloadPointers(Object[] args) { - SQSEvent sqsEvent = (SQSEvent) args[0]; - - return sqsEvent.getRecords().stream() - .filter(record -> isBodyLargeMessagePointer(record.getBody())) - .map(record -> PayloadS3Pointer.fromJson(record.getBody())) - .collect(Collectors.toList()); - } - - private Object[] rewriteMessages(Object[] args) { + private List rewriteMessages(Object[] args) { SQSEvent sqsEvent = (SQSEvent) args[0]; + List s3Pointers = new ArrayList<>(); for (SQSMessage sqsMessage : sqsEvent.getRecords()) { if (isBodyLargeMessagePointer(sqsMessage.getBody())) { PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody()); - Optional s3Object = callS3Gracefully(s3Pointer, pointer -> { + S3Object s3Object = callS3Gracefully(s3Pointer, pointer -> { S3Object object = amazonS3.getObject(pointer.getS3BucketName(), pointer.getS3Key()); LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key()); return object; }); - s3Object.flatMap(this::readStringFromS3Object) - .ifPresent(sqsMessage::setBody); + sqsMessage.setBody(readStringFromS3Object(s3Object)); + s3Pointers.add(s3Pointer); } } - args[0] = sqsEvent; - - return args; + return s3Pointers; } private boolean isBodyLargeMessagePointer(String record) { return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\""); } - private Optional readStringFromS3Object(S3Object object) { + private String readStringFromS3Object(S3Object object) { try (S3ObjectInputStream is = object.getObjectContent()) { - return of(IOUtils.toString(is)); + return IOUtils.toString(is); } catch (IOException e) { LOG.error("Error converting S3 object to String", e); + throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", object.getBucketName(), object.getKey()), e); } - - return empty(); } private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) { @@ -114,17 +99,17 @@ private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) { }); } - private Optional callS3Gracefully(final PayloadS3Pointer pointer, - final Function function) { + private R callS3Gracefully(final PayloadS3Pointer pointer, + final Function function) { try { - return ofNullable(function.apply(pointer)); + return function.apply(pointer); } catch (AmazonServiceException e) { LOG.error("A service exception", e); + throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e); } catch (SdkClientException e) { LOG.error("Some sort of client exception", e); + throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e); } - - return empty(); } public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) { @@ -132,4 +117,10 @@ public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) { && pjp.getArgs()[0] instanceof SQSEvent && pjp.getArgs()[1] instanceof Context; } + + static class FailedProcessingLargePayloadException extends RuntimeException { + public FailedProcessingLargePayloadException(String message, Throwable cause) { + super(message, cause); + } + } } diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java index 8acaace45..d509e4074 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java @@ -22,10 +22,12 @@ import static java.util.Collections.singletonList; import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; +import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.FailedProcessingLargePayloadException; public class SqsMessageAspectTest { @@ -65,24 +67,20 @@ public void testLargeMessage() { @ParameterizedTest @MethodSource("exception") - public void shouldKeepOriginalBodyIfFailedDownloadingFromS3(RuntimeException exception) { + public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exception) { String bucketName = "ms-extended-sqs-client"; String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf"; - S3Object s3Response = new S3Object(); - s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes())); when(amazonS3.getObject(bucketName, bucketKey)).thenThrow(exception); String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]"; SQSEvent sqsEvent = messageWithBody(messageBody); - String response = requestHandler.handleRequest(sqsEvent, context); - - assertThat(response) - .isEqualTo(messageBody); + assertThatExceptionOfType(FailedProcessingLargePayloadException.class) + .isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context)) + .withCause(exception); - //TODO we need to fix the logic since object should not be deleted if we never managed to download it from S3 for any reason - //verify(amazonS3, never()).deleteObject(bucketName, bucketKey); + verify(amazonS3, never()).deleteObject(bucketName, bucketKey); } private static Stream exception() { From 8405b8131b8ede482dc7ee46640db561a266f753 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Fri, 28 Aug 2020 19:15:31 +0200 Subject: [PATCH 06/21] Fail hard when failed downloading any message from batch in event --- .../sqs/internal/SqsMessageAspect.java | 59 ++++++-------- .../sqs/handlers/LambdaHandlerApiGateway.java | 16 ++++ .../sqs/internal/SqsMessageAspectTest.java | 79 +++++++++++++++++-- 3 files changed, 114 insertions(+), 40 deletions(-) create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java index e9953371d..7e16c4c8e 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java @@ -1,10 +1,9 @@ package software.amazon.lambda.powertools.sqs.internal; import java.io.IOException; +import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.function.Function; -import java.util.stream.Collectors; import com.amazonaws.AmazonServiceException; import com.amazonaws.SdkClientException; @@ -25,9 +24,7 @@ import software.amazon.payloadoffloading.PayloadS3Pointer; import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; -import static java.util.Optional.empty; -import static java.util.Optional.of; -import static java.util.Optional.ofNullable; +import static java.lang.String.format; import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod; @Aspect @@ -48,62 +45,50 @@ public Object around(ProceedingJoinPoint pjp, if (isHandlerMethod(pjp) && placedOnSqsEventRequestHandler(pjp)) { - List payloadS3Pointers = collectPayloadPointers(proceedArgs); + List pointersToDelete = rewriteMessages(proceedArgs); - Object[] rewrittenArgs = rewriteMessages(proceedArgs); - Object proceed = pjp.proceed(rewrittenArgs); + Object proceed = pjp.proceed(proceedArgs); - payloadS3Pointers.forEach(this::deleteMessageFromS3); + pointersToDelete.forEach(this::deleteMessageFromS3); return proceed; } return pjp.proceed(proceedArgs); } - private List collectPayloadPointers(Object[] args) { - SQSEvent sqsEvent = (SQSEvent) args[0]; - - return sqsEvent.getRecords().stream() - .filter(record -> isBodyLargeMessagePointer(record.getBody())) - .map(record -> PayloadS3Pointer.fromJson(record.getBody())) - .collect(Collectors.toList()); - } - - private Object[] rewriteMessages(Object[] args) { + private List rewriteMessages(Object[] args) { SQSEvent sqsEvent = (SQSEvent) args[0]; + List s3Pointers = new ArrayList<>(); for (SQSMessage sqsMessage : sqsEvent.getRecords()) { if (isBodyLargeMessagePointer(sqsMessage.getBody())) { PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody()); - Optional s3Object = callS3Gracefully(s3Pointer, pointer -> { + S3Object s3Object = callS3Gracefully(s3Pointer, pointer -> { S3Object object = amazonS3.getObject(pointer.getS3BucketName(), pointer.getS3Key()); LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key()); return object; }); - s3Object.flatMap(this::readStringFromS3Object) - .ifPresent(sqsMessage::setBody); + sqsMessage.setBody(readStringFromS3Object(s3Object)); + s3Pointers.add(s3Pointer); } } - args[0] = sqsEvent; - - return args; + return s3Pointers; } private boolean isBodyLargeMessagePointer(String record) { return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\""); } - private Optional readStringFromS3Object(S3Object object) { + private String readStringFromS3Object(S3Object object) { try (S3ObjectInputStream is = object.getObjectContent()) { - return of(IOUtils.toString(is)); + return IOUtils.toString(is); } catch (IOException e) { LOG.error("Error converting S3 object to String", e); + throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", object.getBucketName(), object.getKey()), e); } - - return empty(); } private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) { @@ -114,17 +99,17 @@ private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) { }); } - private Optional callS3Gracefully(final PayloadS3Pointer pointer, - final Function function) { + private R callS3Gracefully(final PayloadS3Pointer pointer, + final Function function) { try { - return ofNullable(function.apply(pointer)); + return function.apply(pointer); } catch (AmazonServiceException e) { LOG.error("A service exception", e); + throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e); } catch (SdkClientException e) { LOG.error("Some sort of client exception", e); + throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e); } - - return empty(); } public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) { @@ -132,4 +117,10 @@ public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) { && pjp.getArgs()[0] instanceof SQSEvent && pjp.getArgs()[1] instanceof Context; } + + static class FailedProcessingLargePayloadException extends RuntimeException { + public FailedProcessingLargePayloadException(String message, Throwable cause) { + super(message, cause); + } + } } diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java new file mode 100644 index 000000000..0bf8260a4 --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java @@ -0,0 +1,16 @@ +package software.amazon.lambda.powertools.sqs.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; +import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent; +import software.amazon.lambda.powertools.sqs.LargeMessageHandler; + +public class LambdaHandlerApiGateway implements RequestHandler { + + @Override + @LargeMessageHandler + public String handleRequest(APIGatewayProxyRequestEvent sqsEvent, Context context) { + return sqsEvent.getBody(); + } +} diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java index 8acaace45..63fc73695 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java @@ -1,31 +1,41 @@ package software.amazon.lambda.powertools.sqs.internal; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.util.stream.Stream; import com.amazonaws.AmazonServiceException; import com.amazonaws.SdkClientException; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.util.StringInputStream; +import org.apache.http.client.methods.HttpRequestBase; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; +import software.amazon.lambda.powertools.sqs.handlers.LambdaHandlerApiGateway; import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandler; import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; import static java.util.Collections.singletonList; import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; +import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.FailedProcessingLargePayloadException; public class SqsMessageAspectTest { @@ -63,26 +73,83 @@ public void testLargeMessage() { verify(amazonS3).deleteObject(bucketName, bucketKey); } - @ParameterizedTest - @MethodSource("exception") - public void shouldKeepOriginalBodyIfFailedDownloadingFromS3(RuntimeException exception) { + @Test + public void shouldNotProcessSmallMessageBody() { String bucketName = "ms-extended-sqs-client"; String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf"; S3Object s3Response = new S3Object(); s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes())); + when(amazonS3.getObject(bucketName, bucketKey)).thenReturn(s3Response); + SQSEvent sqsEvent = messageWithBody("This is small message"); + + String response = requestHandler.handleRequest(sqsEvent, context); + + assertThat(response) + .isEqualTo("This is small message"); + + verifyNoInteractions(amazonS3); + } + + @ParameterizedTest + @MethodSource("exception") + public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exception) { + String bucketName = "ms-extended-sqs-client"; + String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf"; + when(amazonS3.getObject(bucketName, bucketKey)).thenThrow(exception); String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]"; SQSEvent sqsEvent = messageWithBody(messageBody); - String response = requestHandler.handleRequest(sqsEvent, context); + assertThatExceptionOfType(FailedProcessingLargePayloadException.class) + .isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context)) + .withCause(exception); + + verify(amazonS3, never()).deleteObject(bucketName, bucketKey); + } + + @Test + public void shouldFailEntireBatchIfFailedProcessingDownloadMessageFromS3() throws IOException { + String bucketName = "ms-extended-sqs-client"; + String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf"; + S3Object s3Response = new S3Object(); + + s3Response.setObjectContent(new S3ObjectInputStream(new StringInputStream("test") { + @Override + public void close() throws IOException { + throw new IOException("Failed"); + } + }, mock(HttpRequestBase.class))); + + when(amazonS3.getObject(bucketName, bucketKey)).thenReturn(s3Response); + + String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]"; + SQSEvent sqsEvent = messageWithBody(messageBody); + + assertThatExceptionOfType(FailedProcessingLargePayloadException.class) + .isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context)) + .withCauseInstanceOf(IOException.class); + + verify(amazonS3, never()).deleteObject(bucketName, bucketKey); + } + + @Test + public void shouldNotDoAnyProcessingWhenNotSqsEvent() { + String bucketName = "ms-extended-sqs-client"; + String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf"; + LambdaHandlerApiGateway handler = new LambdaHandlerApiGateway(); + + String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]"; + + APIGatewayProxyRequestEvent event = new APIGatewayProxyRequestEvent(); + event.setBody(messageBody); + String response = handler.handleRequest(event, context); assertThat(response) .isEqualTo(messageBody); - //TODO we need to fix the logic since object should not be deleted if we never managed to download it from S3 for any reason - //verify(amazonS3, never()).deleteObject(bucketName, bucketKey); + verifyNoInteractions(amazonS3); } private static Stream exception() { From d1b23696243f7a5eeb23464ad510e5b37ad5b14b Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Fri, 28 Aug 2020 21:50:12 +0200 Subject: [PATCH 07/21] Extract junit version as prop --- pom.xml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 68ce03c16..1b4d2808f 100644 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,7 @@ 2.10.3 2.4 1.6 + 5.6.2 @@ -147,19 +148,19 @@ org.junit.jupiter junit-jupiter-api - 5.6.2 + ${junit-jupiter.version} test org.junit.jupiter junit-jupiter-engine - 5.6.2 + ${junit-jupiter.version} test org.junit.jupiter junit-jupiter-params - 5.6.2 + ${junit-jupiter.version} test From 4a049087ccb89bd888c73d29f208a50ed8d437c6 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Fri, 28 Aug 2020 21:52:24 +0200 Subject: [PATCH 08/21] Remove unused log4j deps --- powertools-sqs/pom.xml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/powertools-sqs/pom.xml b/powertools-sqs/pom.xml index 23cb66835..daa2eb840 100644 --- a/powertools-sqs/pom.xml +++ b/powertools-sqs/pom.xml @@ -58,14 +58,6 @@ payloadoffloading-common - - org.apache.logging.log4j - log4j-core - - - org.apache.logging.log4j - log4j-api - org.aspectj aspectjrt From dd176b3e12f389643bc07113ed6d8dfb64b47d5e Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Sat, 29 Aug 2020 08:10:29 +0200 Subject: [PATCH 09/21] Remove dependency of args on methods for reuse --- .../lambda/powertools/sqs/internal/SqsMessageAspect.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java index 7e16c4c8e..ed8b6cd4d 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java @@ -45,7 +45,7 @@ public Object around(ProceedingJoinPoint pjp, if (isHandlerMethod(pjp) && placedOnSqsEventRequestHandler(pjp)) { - List pointersToDelete = rewriteMessages(proceedArgs); + List pointersToDelete = rewriteMessages((SQSEvent) proceedArgs[0]); Object proceed = pjp.proceed(proceedArgs); @@ -56,8 +56,7 @@ && placedOnSqsEventRequestHandler(pjp)) { return pjp.proceed(proceedArgs); } - private List rewriteMessages(Object[] args) { - SQSEvent sqsEvent = (SQSEvent) args[0]; + private List rewriteMessages(SQSEvent sqsEvent) { List s3Pointers = new ArrayList<>(); for (SQSMessage sqsMessage : sqsEvent.getRecords()) { From 987bf86c6c101c03e58938a7466e80ef6ec97ff3 Mon Sep 17 00:00:00 2001 From: msailes Date: Sat, 29 Aug 2020 12:33:25 +0100 Subject: [PATCH 10/21] added java docs and a new feature to disable deletion of message payloads. --- .../powertools/sqs/LargeMessageHandler.aj | 57 +++++++++++++++++++ .../sqs/internal/SqsMessageAspect.java | 14 +++-- .../handlers/SqsNoDeleteMessageHandler.java | 15 +++++ .../sqs/internal/SqsMessageAspectTest.java | 50 ++++++++++------ 4 files changed, 113 insertions(+), 23 deletions(-) create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj index 39e32c93c..694230df2 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj @@ -5,7 +5,64 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +/** + * {@code LargeMessageHandler} is used to signal that the annotated method + * should be extended to handle large SQS messages which have been offloaded + * to S3 + * + *

{@code LargeMessageHandler} automatically retrieves and deletes messages + * which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} + * client library.

+ * + *

This version of the {@code LargeMessageHandler} is compatible with version + * 1.1.0+ of {@code amazon-sqs-java-extended-client-lib}.

+ * + *
+ * <dependency>
+ *   <groupId>com.amazonaws</groupId>
+ *   <artifactId>amazon-sqs-java-extended-client-lib</artifactId>
+ *   <version>1.1.0</version>
+ * </dependency>
+ * 
+ * + *

{@code LargeMessageHandler} should be used with the handleRequest method of a class + * which implements {@code com.amazonaws.services.lambda.runtime.RequestHandler} with + * {@code com.amazonaws.services.lambda.runtime.events.SQSEvent} as the first parameter.

+ * + *
+ * public class SqsMessageHandler implements RequestHandler {
+ *
+ *    {@literal @}Override
+ *    {@literal @}LargeMessageHandler
+ *     public String handleRequest(SQSEvent sqsEvent, Context context) {
+ *
+ *         // process messages
+ *
+ *         return "ok";
+ *     }
+ *
+ *     ...
+ * 
+ * + *

Using the default S3 Client {@code AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();} + * each record received in the SQSEvent {@code LargeMessageHandler} will checked + * to see if it's body contains a payload which has been offloaded to S3. If it + * does then {@code getObject(bucket, key)} will be called and the payload + * retrieved.

+ * + *

Note: Retreiving payloads from S3 will increase the duration of the + * Lambda function.

+ * + *

If the request handler method returns then each payload will be deleted + * from S3 using {@code deleteObject(bucket, key)}

+ * + *

To disable deletion of payloads setting the following annotation parameter + * {@code LargeMessageHandler(deletePayloads=false)}

+ * + */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface LargeMessageHandler { + + boolean deletePayloads() default true; } diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java index 7e16c4c8e..43ccc874b 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java @@ -1,10 +1,5 @@ package software.amazon.lambda.powertools.sqs.internal; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Function; - import com.amazonaws.AmazonServiceException; import com.amazonaws.SdkClientException; import com.amazonaws.services.lambda.runtime.Context; @@ -23,6 +18,11 @@ import software.amazon.lambda.powertools.sqs.LargeMessageHandler; import software.amazon.payloadoffloading.PayloadS3Pointer; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; import static java.lang.String.format; import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod; @@ -49,7 +49,9 @@ && placedOnSqsEventRequestHandler(pjp)) { Object proceed = pjp.proceed(proceedArgs); - pointersToDelete.forEach(this::deleteMessageFromS3); + if (largeMessageHandler.deletePayloads()) { + pointersToDelete.forEach(this::deleteMessageFromS3); + } return proceed; } diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java new file mode 100644 index 000000000..a301a2a65 --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java @@ -0,0 +1,15 @@ +package software.amazon.lambda.powertools.sqs.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import software.amazon.lambda.powertools.sqs.LargeMessageHandler; + +public class SqsNoDeleteMessageHandler implements RequestHandler { + + @Override + @LargeMessageHandler(deletePayloads = false) + public String handleRequest(SQSEvent sqsEvent, Context context) { + return sqsEvent.getRecords().get(0).getBody(); + } +} \ No newline at end of file diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java index d509e4074..a3aac55af 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java @@ -1,8 +1,5 @@ package software.amazon.lambda.powertools.sqs.internal; -import java.io.ByteArrayInputStream; -import java.util.stream.Stream; - import com.amazonaws.AmazonServiceException; import com.amazonaws.SdkClientException; import com.amazonaws.services.lambda.runtime.Context; @@ -17,6 +14,10 @@ import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandler; +import software.amazon.lambda.powertools.sqs.handlers.SqsNoDeleteMessageHandler; + +import java.io.ByteArrayInputStream; +import java.util.stream.Stream; import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; import static java.util.Collections.singletonList; @@ -39,6 +40,9 @@ public class SqsMessageAspectTest { @Mock private AmazonS3 amazonS3; + private static final String BUCKET_NAME = "bucketname"; + private static final String BUCKET_KEY = "c71eb2ae-37e0-4265-8909-32f4153faddf"; + @BeforeEach void setUp() throws IllegalAccessException { initMocks(this); @@ -49,38 +53,50 @@ void setUp() throws IllegalAccessException { @Test public void testLargeMessage() { - String bucketName = "ms-extended-sqs-client"; - String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf"; - S3Object s3Response = new S3Object(); - s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes())); - - when(amazonS3.getObject(bucketName, bucketKey)).thenReturn(s3Response); - SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]"); + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3ObjectWithLargeMessage()); + SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"); String response = requestHandler.handleRequest(sqsEvent, context); assertThat(response) .isEqualTo("A big message"); - verify(amazonS3).deleteObject(bucketName, bucketKey); + verify(amazonS3).deleteObject(BUCKET_NAME, BUCKET_KEY); } @ParameterizedTest @MethodSource("exception") public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exception) { - String bucketName = "ms-extended-sqs-client"; - String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf"; - - when(amazonS3.getObject(bucketName, bucketKey)).thenThrow(exception); + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenThrow(exception); - String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]"; + String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"; SQSEvent sqsEvent = messageWithBody(messageBody); assertThatExceptionOfType(FailedProcessingLargePayloadException.class) .isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context)) .withCause(exception); - verify(amazonS3, never()).deleteObject(bucketName, bucketKey); + verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY); + } + + @Test + public void testLargeMessageWithDeletionOff() { + requestHandler = new SqsNoDeleteMessageHandler(); + + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3ObjectWithLargeMessage()); + SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"); + + String response = requestHandler.handleRequest(sqsEvent, context); + + assertThat(response).isEqualTo("A big message"); + + verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY); + } + + private S3Object s3ObjectWithLargeMessage() { + S3Object s3Response = new S3Object(); + s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes())); + return s3Response; } private static Stream exception() { From 2ca2a4218369100831163ed6a502780c26f43a99 Mon Sep 17 00:00:00 2001 From: msailes Date: Sat, 29 Aug 2020 14:29:18 +0100 Subject: [PATCH 11/21] refactored tests. --- .../sqs/internal/SqsMessageAspectTest.java | 23 +++++-------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java index 2a216c546..cafec7eef 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java @@ -1,9 +1,5 @@ package software.amazon.lambda.powertools.sqs.internal; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.util.stream.Stream; - import com.amazonaws.AmazonServiceException; import com.amazonaws.SdkClientException; import com.amazonaws.services.lambda.runtime.Context; @@ -26,6 +22,7 @@ import software.amazon.lambda.powertools.sqs.handlers.SqsNoDeleteMessageHandler; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.util.stream.Stream; import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; @@ -34,7 +31,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.Mockito.mock; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -78,12 +74,10 @@ public void testLargeMessage() { @Test public void shouldNotProcessSmallMessageBody() { - String bucketName = "ms-extended-sqs-client"; - String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf"; S3Object s3Response = new S3Object(); s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes())); - when(amazonS3.getObject(bucketName, bucketKey)).thenReturn(s3Response); + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response); SQSEvent sqsEvent = messageWithBody("This is small message"); String response = requestHandler.handleRequest(sqsEvent, context); @@ -126,10 +120,7 @@ public void testLargeMessageWithDeletionOff() { @Test public void shouldFailEntireBatchIfFailedProcessingDownloadMessageFromS3() throws IOException { - String bucketName = "ms-extended-sqs-client"; - String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf"; S3Object s3Response = new S3Object(); - s3Response.setObjectContent(new S3ObjectInputStream(new StringInputStream("test") { @Override public void close() throws IOException { @@ -137,25 +128,23 @@ public void close() throws IOException { } }, mock(HttpRequestBase.class))); - when(amazonS3.getObject(bucketName, bucketKey)).thenReturn(s3Response); + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response); - String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]"; + String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"; SQSEvent sqsEvent = messageWithBody(messageBody); assertThatExceptionOfType(FailedProcessingLargePayloadException.class) .isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context)) .withCauseInstanceOf(IOException.class); - verify(amazonS3, never()).deleteObject(bucketName, bucketKey); + verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY); } @Test public void shouldNotDoAnyProcessingWhenNotSqsEvent() { - String bucketName = "ms-extended-sqs-client"; - String bucketKey = "c71eb2ae-37e0-4265-8909-32f4153faddf"; LambdaHandlerApiGateway handler = new LambdaHandlerApiGateway(); - String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + bucketName + "\",\"s3Key\":\"" + bucketKey + "\"}]"; + String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"; APIGatewayProxyRequestEvent event = new APIGatewayProxyRequestEvent(); event.setBody(messageBody); From f02356c43d51bac13cf70929e862e39530335835 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Sat, 29 Aug 2020 19:18:05 +0200 Subject: [PATCH 12/21] Remove unused test resources --- powertools-sqs/src/test/resources/large-message.json | 0 powertools-sqs/src/test/resources/small-message.json | 1 - 2 files changed, 1 deletion(-) delete mode 100644 powertools-sqs/src/test/resources/large-message.json delete mode 100644 powertools-sqs/src/test/resources/small-message.json diff --git a/powertools-sqs/src/test/resources/large-message.json b/powertools-sqs/src/test/resources/large-message.json deleted file mode 100644 index e69de29bb..000000000 diff --git a/powertools-sqs/src/test/resources/small-message.json b/powertools-sqs/src/test/resources/small-message.json deleted file mode 100644 index bc0356cef..000000000 --- a/powertools-sqs/src/test/resources/small-message.json +++ /dev/null @@ -1 +0,0 @@ -{Records: [{messageId: bf73ff26-91ef-4da7-95d2-a78459803354,receiptHandle: AQEBTHZU0y5EmTwswCwnxSNYUOx0RC6S4XtPQndT6CI4kVzIcAi4pTcZyyPGCSLvxIlcn1kqIMdlqPx/Y8pW253CtPH3k5kQcoi9jrx5gbciiNJt9RNxOOxgSnWXGmLOumVsxq3mIk4udDeqKQmEzfJ4NmAno1eTIGxYa2OT9I0gCEf63fQPJyAFGfdbTPRW8MBzehkuHEwT5IWnSIPCi6IiiJSP2lpMDTtvVTMkUUGEeWr6xae7kQdklXi4Qfc4HuqCVDGobyG0/a/GR4rEoZIsf1GVvhWt7Kt5+XP1fIt08q6VlINO1gWfzoZrEGE1SfC+76gyjFMiWz0kzPDxSWymY80HFEE2cVaFQAjLeTBWlzYjKrELdRSX+VSJEuonKyi6jWz/7Jq4SvDca4S6N5Fo7Pedb5/ycDSzMSzCx31Juwru96yoTfYAZc+tOe/T0+Pr,eventSourceARN: arn:aws:sqs:eu-west-1:719169216310:large-payload-handler-LargeMessagesQueue-1NHWQ9486PN1T,eventSource: aws:sqs,awsRegion: eu-west-1,body: a small message,md5OfBody: aa608debbb18ac56aa5309265665c5f2,attributes: {ApproximateReceiveCount=1, SentTimestamp=1598611408346, SenderId=AIDA2O4OTL43OERPSY2OR, ApproximateFirstReceiveTimestamp=1598611408427},messageAttributes: {}}]} \ No newline at end of file From a5f56872438deddb2135159bbecc1ffc412e5955 Mon Sep 17 00:00:00 2001 From: msailes Date: Sat, 29 Aug 2020 20:35:48 +0100 Subject: [PATCH 13/21] rename --- .../powertools/sqs/LargeMessageHandler.aj | 68 ------------------- 1 file changed, 68 deletions(-) delete mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj deleted file mode 100644 index 694230df2..000000000 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.aj +++ /dev/null @@ -1,68 +0,0 @@ -package software.amazon.lambda.powertools.sqs; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * {@code LargeMessageHandler} is used to signal that the annotated method - * should be extended to handle large SQS messages which have been offloaded - * to S3 - * - *

{@code LargeMessageHandler} automatically retrieves and deletes messages - * which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} - * client library.

- * - *

This version of the {@code LargeMessageHandler} is compatible with version - * 1.1.0+ of {@code amazon-sqs-java-extended-client-lib}.

- * - *
- * <dependency>
- *   <groupId>com.amazonaws</groupId>
- *   <artifactId>amazon-sqs-java-extended-client-lib</artifactId>
- *   <version>1.1.0</version>
- * </dependency>
- * 
- * - *

{@code LargeMessageHandler} should be used with the handleRequest method of a class - * which implements {@code com.amazonaws.services.lambda.runtime.RequestHandler} with - * {@code com.amazonaws.services.lambda.runtime.events.SQSEvent} as the first parameter.

- * - *
- * public class SqsMessageHandler implements RequestHandler {
- *
- *    {@literal @}Override
- *    {@literal @}LargeMessageHandler
- *     public String handleRequest(SQSEvent sqsEvent, Context context) {
- *
- *         // process messages
- *
- *         return "ok";
- *     }
- *
- *     ...
- * 
- * - *

Using the default S3 Client {@code AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();} - * each record received in the SQSEvent {@code LargeMessageHandler} will checked - * to see if it's body contains a payload which has been offloaded to S3. If it - * does then {@code getObject(bucket, key)} will be called and the payload - * retrieved.

- * - *

Note: Retreiving payloads from S3 will increase the duration of the - * Lambda function.

- * - *

If the request handler method returns then each payload will be deleted - * from S3 using {@code deleteObject(bucket, key)}

- * - *

To disable deletion of payloads setting the following annotation parameter - * {@code LargeMessageHandler(deletePayloads=false)}

- * - */ -@Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.METHOD) -public @interface LargeMessageHandler { - - boolean deletePayloads() default true; -} From 4b699f07ba1478c5d7af4413dc76dd321e0ef23f Mon Sep 17 00:00:00 2001 From: msailes Date: Sat, 29 Aug 2020 20:36:02 +0100 Subject: [PATCH 14/21] rename --- .../powertools/sqs/LargeMessageHandler.java | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java new file mode 100644 index 000000000..9f67d2906 --- /dev/null +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java @@ -0,0 +1,68 @@ +package software.amazon.lambda.powertools.sqs; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * {@code LargeMessageHandler} is used to signal that the annotated method + * should be extended to handle large SQS messages which have been offloaded + * to S3 + * + *

{@code LargeMessageHandler} automatically retrieves and deletes messages + * which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} + * client library.

+ * + *

This version of the {@code LargeMessageHandler} is compatible with version + * 1.1.0+ of {@code amazon-sqs-java-extended-client-lib}.

+ * + *
+ * <dependency>
+ *   <groupId>com.amazonaws</groupId>
+ *   <artifactId>amazon-sqs-java-extended-client-lib</artifactId>
+ *   <version>1.1.0</version>
+ * </dependency>
+ * 
+ * + *

{@code LargeMessageHandler} should be used with the handleRequest method of a class + * which implements {@code com.amazonaws.services.lambda.runtime.RequestHandler} with + * {@code com.amazonaws.services.lambda.runtime.events.SQSEvent} as the first parameter.

+ * + *
+ * public class SqsMessageHandler implements RequestHandler {
+ *
+ *    {@literal @}Override
+ *    {@literal @}LargeMessageHandler
+ *     public String handleRequest(SQSEvent sqsEvent, Context context) {
+ *
+ *         // process messages
+ *
+ *         return "ok";
+ *     }
+ *
+ *     ...
+ * 
+ * + *

Using the default S3 Client {@code AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();} + * each record received in the SQSEvent {@code LargeMessageHandler} will checked + * to see if it's body contains a payload which has been offloaded to S3. If it + * does then {@code getObject(bucket, key)} will be called and the payload + * retrieved.

+ * + *

Note: Retreiving payloads from S3 will increase the duration of the + * Lambda function.

+ * + *

If the request handler method returns then each payload will be deleted + * from S3 using {@code deleteObject(bucket, key)}

+ * + *

To disable deletion of payloads setting the following annotation parameter + * {@code @LargeMessageHandler(deletePayloads=false)}

+ * + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface LargeMessageHandler { + + boolean deletePayloads() default true; +} From d694c57f115cb950006f416a680b21949dd92ca0 Mon Sep 17 00:00:00 2001 From: msailes Date: Sun, 30 Aug 2020 14:32:08 +0100 Subject: [PATCH 15/21] intial docs page --- docs/content/dummy.md | 8 --- .../utilities/large_message_handling.mdx | 67 +++++++++++++++++++ docs/gatsby-config.js | 5 +- 3 files changed, 71 insertions(+), 9 deletions(-) delete mode 100644 docs/content/dummy.md create mode 100644 docs/content/utilities/large_message_handling.mdx diff --git a/docs/content/dummy.md b/docs/content/dummy.md deleted file mode 100644 index d53b2cc3b..000000000 --- a/docs/content/dummy.md +++ /dev/null @@ -1,8 +0,0 @@ ---- -title: Dummy title -description: Dummy description ---- - -## Test - -dummy content \ No newline at end of file diff --git a/docs/content/utilities/large_message_handling.mdx b/docs/content/utilities/large_message_handling.mdx new file mode 100644 index 000000000..9ecc8b9a1 --- /dev/null +++ b/docs/content/utilities/large_message_handling.mdx @@ -0,0 +1,67 @@ +--- +title: Large Message Handling +description: Utility +--- + + + +The large message handling utility is used to handle SQS messages which have had their payloads +offloaded to S3 due to them being larger than the SQS maximum. + +The utility automatically retrieves and deletes messages which have been offloaded to S3 using the +[amazon-sqs-java-extended-client-lib](https://github.com/awslabs/amazon-sqs-java-extended-client-lib) +client library. + +This utility is compatible with versions 1.1.0+ of amazon-sqs-java-extended-client-lib.

+ +```xml + + com.amazonaws + amazon-sqs-java-extended-client-lib + 1.1.0 + +``` + +## Install + +Add the following dependency to your project. + +```xml + + software.amazon.lambda + powertools-sqs + +``` + +The annotation `@LargeMessageHandler` should be used with the handleRequest method of a class +which implements `com.amazonaws.services.lambda.runtime.RequestHandler` with +`com.amazonaws.services.lambda.runtime.events.SQSEvent` as the first parameter. + +```java +public class SqsMessageHandler implements RequestHandler { + + @Override + @LargeMessageHandler + public String handleRequest(SQSEvent sqsEvent, Context context) { + // process messages + + return "ok"; +} +``` + +`@LargeMessageHandler` creats a default S3 Client `AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient()`. +When the Lambda function is invoked with an event from SQS, each record received +in the SQSEvent will be checked to see if it's body contains a payload which has +been offloaded to S3. If it does then `getObject(bucket, key)` will be called +and the payload retrieved. + +*Note*: Retreiving payloads from S3 will increase the duration of the Lambda function. + +If the request handler method returns then each payload will be deleted from S3 +using `deleteObject(bucket, key)}` + +To disable deletion of payloads setting the following annotation parameter: + +```java +@LargeMessageHandler(deletePayloads=false) +``` \ No newline at end of file diff --git a/docs/gatsby-config.js b/docs/gatsby-config.js index c9aa9459a..0dadcc5b5 100644 --- a/docs/gatsby-config.js +++ b/docs/gatsby-config.js @@ -25,7 +25,10 @@ module.exports = { 'Core utilities': [ 'core/logging', 'core/tracing' - ] + ], + 'Utilities': [ + 'utilities/large_message_handling' + ], }, navConfig: { 'Serverless Best Practices video': { From 3c2f694f34341722929032428015106ef024771a Mon Sep 17 00:00:00 2001 From: msailes Date: Mon, 31 Aug 2020 08:37:14 +0100 Subject: [PATCH 16/21] updated docs page --- .../utilities/large_message_handling.mdx | 67 ----------- .../utilities/sqs_large_message_handling.mdx | 106 ++++++++++++++++++ docs/gatsby-config.js | 2 +- 3 files changed, 107 insertions(+), 68 deletions(-) delete mode 100644 docs/content/utilities/large_message_handling.mdx create mode 100644 docs/content/utilities/sqs_large_message_handling.mdx diff --git a/docs/content/utilities/large_message_handling.mdx b/docs/content/utilities/large_message_handling.mdx deleted file mode 100644 index 9ecc8b9a1..000000000 --- a/docs/content/utilities/large_message_handling.mdx +++ /dev/null @@ -1,67 +0,0 @@ ---- -title: Large Message Handling -description: Utility ---- - - - -The large message handling utility is used to handle SQS messages which have had their payloads -offloaded to S3 due to them being larger than the SQS maximum. - -The utility automatically retrieves and deletes messages which have been offloaded to S3 using the -[amazon-sqs-java-extended-client-lib](https://github.com/awslabs/amazon-sqs-java-extended-client-lib) -client library. - -This utility is compatible with versions 1.1.0+ of amazon-sqs-java-extended-client-lib.

- -```xml - - com.amazonaws - amazon-sqs-java-extended-client-lib - 1.1.0 - -``` - -## Install - -Add the following dependency to your project. - -```xml - - software.amazon.lambda - powertools-sqs - -``` - -The annotation `@LargeMessageHandler` should be used with the handleRequest method of a class -which implements `com.amazonaws.services.lambda.runtime.RequestHandler` with -`com.amazonaws.services.lambda.runtime.events.SQSEvent` as the first parameter. - -```java -public class SqsMessageHandler implements RequestHandler { - - @Override - @LargeMessageHandler - public String handleRequest(SQSEvent sqsEvent, Context context) { - // process messages - - return "ok"; -} -``` - -`@LargeMessageHandler` creats a default S3 Client `AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient()`. -When the Lambda function is invoked with an event from SQS, each record received -in the SQSEvent will be checked to see if it's body contains a payload which has -been offloaded to S3. If it does then `getObject(bucket, key)` will be called -and the payload retrieved. - -*Note*: Retreiving payloads from S3 will increase the duration of the Lambda function. - -If the request handler method returns then each payload will be deleted from S3 -using `deleteObject(bucket, key)}` - -To disable deletion of payloads setting the following annotation parameter: - -```java -@LargeMessageHandler(deletePayloads=false) -``` \ No newline at end of file diff --git a/docs/content/utilities/sqs_large_message_handling.mdx b/docs/content/utilities/sqs_large_message_handling.mdx new file mode 100644 index 000000000..d2aac1308 --- /dev/null +++ b/docs/content/utilities/sqs_large_message_handling.mdx @@ -0,0 +1,106 @@ +--- +title: SQS Large Message Handling +description: Utility +--- + +The large message handling utility handles SQS messages which have had their payloads +offloaded to S3 due to them being larger than the SQS maximum. + +The utility automatically retrieves messages which have been offloaded to S3 using the +[amazon-sqs-java-extended-client-lib](https://github.com/awslabs/amazon-sqs-java-extended-client-lib) +client library. Once the message payloads have been processed successful the +utility can delete the message payloads from S3. + +This utility is compatible with versions *1.1.0+* of amazon-sqs-java-extended-client-lib.

+ +```xml + + com.amazonaws + amazon-sqs-java-extended-client-lib + 1.1.0 + +``` + +## Install + +To install this utility, add the following dependency to your project. + +```xml + + software.amazon.lambda + powertools-sqs + x.x.x + +``` + +And configure the aspectj-maven-plugin to compile-time weave (CTW) the +aws-lambda-powertools-java aspects into your project. You may already have this +plugin in your pom. In that case add the depenedency to the `aspectLibraries` +section. + +```xml + + + ... + + org.codehaus.mojo + aspectj-maven-plugin + 1.11 + + 1.8 + 1.8 + 1.8 + + ... + + software.amazon.lambda + powertools-sqs + + ... + + + + + + compile + + + + + ... + + +``` + +The annotation `@LargeMessageHandler` should be used with the handleRequest method of a class +which implements `com.amazonaws.services.lambda.runtime.RequestHandler` with +`com.amazonaws.services.lambda.runtime.events.SQSEvent` as the first parameter. + +```java +public class SqsMessageHandler implements RequestHandler { + + @Override + @LargeMessageHandler + public String handleRequest(SQSEvent sqsEvent, Context context) { + // process messages + + return "ok"; +} +``` + +`@LargeMessageHandler` creates a default S3 Client `AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient()`. +When the Lambda function is invoked with an event from SQS, each record received +in the SQSEvent will be checked to see if it's body contains a payload which has +been offloaded to S3. If it does then `getObject(bucket, key)` will be called, +and the payload retrieved. + +*Note*: Retrieving payloads from S3 will increase the duration of the Lambda function. + +If the request handler method returns without error then each payload will be +deleted from S3 using `deleteObject(bucket, key)}` + +To disable deletion of payloads setting the following annotation parameter: + +```java +@LargeMessageHandler(deletePayloads=false) +``` \ No newline at end of file diff --git a/docs/gatsby-config.js b/docs/gatsby-config.js index 0dadcc5b5..e588bb5eb 100644 --- a/docs/gatsby-config.js +++ b/docs/gatsby-config.js @@ -27,7 +27,7 @@ module.exports = { 'core/tracing' ], 'Utilities': [ - 'utilities/large_message_handling' + 'utilities/sqs_large_message_handling' ], }, navConfig: { From 5d54bf7628ff5a3ce5948bcca053f22bf4157424 Mon Sep 17 00:00:00 2001 From: msailes Date: Mon, 31 Aug 2020 08:40:48 +0100 Subject: [PATCH 17/21] typo --- docs/content/utilities/sqs_large_message_handling.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/utilities/sqs_large_message_handling.mdx b/docs/content/utilities/sqs_large_message_handling.mdx index d2aac1308..606b078f6 100644 --- a/docs/content/utilities/sqs_large_message_handling.mdx +++ b/docs/content/utilities/sqs_large_message_handling.mdx @@ -97,7 +97,7 @@ and the payload retrieved. *Note*: Retrieving payloads from S3 will increase the duration of the Lambda function. If the request handler method returns without error then each payload will be -deleted from S3 using `deleteObject(bucket, key)}` +deleted from S3 using `deleteObject(bucket, key)` To disable deletion of payloads setting the following annotation parameter: From 8c18a75226298fec140719e1ed692fa6ba854a6e Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Mon, 31 Aug 2020 12:13:00 +0200 Subject: [PATCH 18/21] Fix example app version for powertools --- example/HelloWorldFunction/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/example/HelloWorldFunction/pom.xml b/example/HelloWorldFunction/pom.xml index efe50ace6..06735af31 100644 --- a/example/HelloWorldFunction/pom.xml +++ b/example/HelloWorldFunction/pom.xml @@ -16,12 +16,12 @@ software.amazon.lambda powertools-tracing - 0.1.0-SNAPSHOT + 0.1.0-beta software.amazon.lambda powertools-logging - 0.1.0-SNAPSHOT + 0.1.0-beta com.amazonaws From 563f43a99e888ec650cad688f6fd5288fe3861ae Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Mon, 31 Aug 2020 12:19:07 +0200 Subject: [PATCH 19/21] Set env var for sdk to use during remote builds --- .github/workflows/build.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 35bbce70b..adb4f9b14 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -20,6 +20,7 @@ jobs: env: OS: ${{ matrix.os }} JAVA: ${{ matrix.java-version }} + AWS_REGION: eu-west-1 steps: - uses: actions/checkout@v2 - name: Setup java From 9d15a2314dfb5744f224349475dffc45338f1c87 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Mon, 31 Aug 2020 12:22:53 +0200 Subject: [PATCH 20/21] Fix parent module version for sqs --- powertools-sqs/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/powertools-sqs/pom.xml b/powertools-sqs/pom.xml index daa2eb840..d87d78765 100644 --- a/powertools-sqs/pom.xml +++ b/powertools-sqs/pom.xml @@ -10,7 +10,7 @@ powertools-parent software.amazon.lambda - 0.1.0-SNAPSHOT + 0.1.0-beta AWS Lambda Powertools Java library SQS From 9eab98b2156bd65807e934800283eace1744b790 Mon Sep 17 00:00:00 2001 From: msailes Date: Mon, 31 Aug 2020 15:58:46 +0100 Subject: [PATCH 21/21] documentation improvements. --- docs/content/utilities/sqs_large_message_handling.mdx | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/content/utilities/sqs_large_message_handling.mdx b/docs/content/utilities/sqs_large_message_handling.mdx index 606b078f6..61740a3a7 100644 --- a/docs/content/utilities/sqs_large_message_handling.mdx +++ b/docs/content/utilities/sqs_large_message_handling.mdx @@ -29,7 +29,7 @@ To install this utility, add the following dependency to your project. software.amazon.lambda powertools-sqs - x.x.x + 0.1.0-beta ``` @@ -92,9 +92,8 @@ public class SqsMessageHandler implements RequestHandler { When the Lambda function is invoked with an event from SQS, each record received in the SQSEvent will be checked to see if it's body contains a payload which has been offloaded to S3. If it does then `getObject(bucket, key)` will be called, -and the payload retrieved. - -*Note*: Retrieving payloads from S3 will increase the duration of the Lambda function. +and the payload retrieved. If there is an error during this process then the +function will fail with a `FailedProcessingLargePayloadException` exception. If the request handler method returns without error then each payload will be deleted from S3 using `deleteObject(bucket, key)`