diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 35bbce70b..adb4f9b14 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -20,6 +20,7 @@ jobs: env: OS: ${{ matrix.os }} JAVA: ${{ matrix.java-version }} + AWS_REGION: eu-west-1 steps: - uses: actions/checkout@v2 - name: Setup java diff --git a/docs/content/dummy.md b/docs/content/dummy.md deleted file mode 100644 index d53b2cc3b..000000000 --- a/docs/content/dummy.md +++ /dev/null @@ -1,8 +0,0 @@ ---- -title: Dummy title -description: Dummy description ---- - -## Test - -dummy content \ No newline at end of file diff --git a/docs/content/utilities/sqs_large_message_handling.mdx b/docs/content/utilities/sqs_large_message_handling.mdx new file mode 100644 index 000000000..61740a3a7 --- /dev/null +++ b/docs/content/utilities/sqs_large_message_handling.mdx @@ -0,0 +1,105 @@ +--- +title: SQS Large Message Handling +description: Utility +--- + +The large message handling utility handles SQS messages which have had their payloads +offloaded to S3 due to them being larger than the SQS maximum. + +The utility automatically retrieves messages which have been offloaded to S3 using the +[amazon-sqs-java-extended-client-lib](https://github.com/awslabs/amazon-sqs-java-extended-client-lib) +client library. Once the message payloads have been processed successful the +utility can delete the message payloads from S3. + +This utility is compatible with versions *1.1.0+* of amazon-sqs-java-extended-client-lib.

+ +```xml + + com.amazonaws + amazon-sqs-java-extended-client-lib + 1.1.0 + +``` + +## Install + +To install this utility, add the following dependency to your project. + +```xml + + software.amazon.lambda + powertools-sqs + 0.1.0-beta + +``` + +And configure the aspectj-maven-plugin to compile-time weave (CTW) the +aws-lambda-powertools-java aspects into your project. You may already have this +plugin in your pom. In that case add the depenedency to the `aspectLibraries` +section. + +```xml + + + ... + + org.codehaus.mojo + aspectj-maven-plugin + 1.11 + + 1.8 + 1.8 + 1.8 + + ... + + software.amazon.lambda + powertools-sqs + + ... + + + + + + compile + + + + + ... + + +``` + +The annotation `@LargeMessageHandler` should be used with the handleRequest method of a class +which implements `com.amazonaws.services.lambda.runtime.RequestHandler` with +`com.amazonaws.services.lambda.runtime.events.SQSEvent` as the first parameter. + +```java +public class SqsMessageHandler implements RequestHandler { + + @Override + @LargeMessageHandler + public String handleRequest(SQSEvent sqsEvent, Context context) { + // process messages + + return "ok"; +} +``` + +`@LargeMessageHandler` creates a default S3 Client `AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient()`. +When the Lambda function is invoked with an event from SQS, each record received +in the SQSEvent will be checked to see if it's body contains a payload which has +been offloaded to S3. If it does then `getObject(bucket, key)` will be called, +and the payload retrieved. If there is an error during this process then the +function will fail with a `FailedProcessingLargePayloadException` exception. + +If the request handler method returns without error then each payload will be +deleted from S3 using `deleteObject(bucket, key)` + +To disable deletion of payloads setting the following annotation parameter: + +```java +@LargeMessageHandler(deletePayloads=false) +``` \ No newline at end of file diff --git a/docs/gatsby-config.js b/docs/gatsby-config.js index c9aa9459a..e588bb5eb 100644 --- a/docs/gatsby-config.js +++ b/docs/gatsby-config.js @@ -25,7 +25,10 @@ module.exports = { 'Core utilities': [ 'core/logging', 'core/tracing' - ] + ], + 'Utilities': [ + 'utilities/sqs_large_message_handling' + ], }, navConfig: { 'Serverless Best Practices video': { diff --git a/example/HelloWorldFunction/pom.xml b/example/HelloWorldFunction/pom.xml index efe50ace6..06735af31 100644 --- a/example/HelloWorldFunction/pom.xml +++ b/example/HelloWorldFunction/pom.xml @@ -16,12 +16,12 @@ software.amazon.lambda powertools-tracing - 0.1.0-SNAPSHOT + 0.1.0-beta software.amazon.lambda powertools-logging - 0.1.0-SNAPSHOT + 0.1.0-beta com.amazonaws diff --git a/pom.xml b/pom.xml index 77adc6e28..1ecb165ca 100644 --- a/pom.xml +++ b/pom.xml @@ -30,6 +30,7 @@ powertools-core powertools-logging powertools-tracing + powertools-sqs @@ -50,9 +51,12 @@ 2.13.3 2.11.0 1.9.6 + 2.14.4 2.4.0 + 1.0.0 UTF-8 1.2.1 + 3.1.0 3.8.1 1.12.1 2.22.2 @@ -62,6 +66,7 @@ 2.10.3 2.4 1.6 + 5.6.2 @@ -83,6 +88,16 @@ aws-lambda-java-core ${lambda.core.version} + + com.amazonaws + aws-lambda-java-events + ${lambda.events.version} + + + software.amazon.payloadoffloading + payloadoffloading-common + ${payloadoffloading-common.version} + org.aspectj aspectjrt @@ -133,13 +148,19 @@ org.junit.jupiter junit-jupiter-api - 5.6.2 + ${junit-jupiter.version} test org.junit.jupiter junit-jupiter-engine - 5.6.2 + ${junit-jupiter.version} + test + + + org.junit.jupiter + junit-jupiter-params + ${junit-jupiter.version} test diff --git a/powertools-sqs/pom.xml b/powertools-sqs/pom.xml new file mode 100644 index 000000000..d87d78765 --- /dev/null +++ b/powertools-sqs/pom.xml @@ -0,0 +1,104 @@ + + + 4.0.0 + + powertools-sqs + jar + + + powertools-parent + software.amazon.lambda + 0.1.0-beta + + + AWS Lambda Powertools Java library SQS + + A suite of utilities for AWS Lambda Functions that makes tracing with AWS X-Ray, structured logging and creating custom metrics asynchronously easier. + + https://aws.amazon.com/lambda/ + + GitHub Issues + https://github.com/awslabs/aws-lambda-powertools-java/issues + + + https://github.com/awslabs/aws-lambda-powertools-java.git + + + + AWS Lambda Powertools team + Amazon Web Services + https://aws.amazon.com/ + + + + + + ossrh + https://aws.oss.sonatype.org/content/repositories/snapshots + + + + + + software.amazon.lambda + powertools-core + + + com.amazonaws + aws-lambda-java-core + + + com.amazonaws + aws-lambda-java-events + + + software.amazon.payloadoffloading + payloadoffloading-common + + + + org.aspectj + aspectjrt + + + + + org.junit.jupiter + junit-jupiter-api + test + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.junit.jupiter + junit-jupiter-params + test + + + org.apache.commons + commons-lang3 + test + + + org.mockito + mockito-core + test + + + org.aspectj + aspectjweaver + test + + + org.assertj + assertj-core + test + + + + \ No newline at end of file diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java new file mode 100644 index 000000000..9f67d2906 --- /dev/null +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/LargeMessageHandler.java @@ -0,0 +1,68 @@ +package software.amazon.lambda.powertools.sqs; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * {@code LargeMessageHandler} is used to signal that the annotated method + * should be extended to handle large SQS messages which have been offloaded + * to S3 + * + *

{@code LargeMessageHandler} automatically retrieves and deletes messages + * which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} + * client library.

+ * + *

This version of the {@code LargeMessageHandler} is compatible with version + * 1.1.0+ of {@code amazon-sqs-java-extended-client-lib}.

+ * + *
+ * <dependency>
+ *   <groupId>com.amazonaws</groupId>
+ *   <artifactId>amazon-sqs-java-extended-client-lib</artifactId>
+ *   <version>1.1.0</version>
+ * </dependency>
+ * 
+ * + *

{@code LargeMessageHandler} should be used with the handleRequest method of a class + * which implements {@code com.amazonaws.services.lambda.runtime.RequestHandler} with + * {@code com.amazonaws.services.lambda.runtime.events.SQSEvent} as the first parameter.

+ * + *
+ * public class SqsMessageHandler implements RequestHandler {
+ *
+ *    {@literal @}Override
+ *    {@literal @}LargeMessageHandler
+ *     public String handleRequest(SQSEvent sqsEvent, Context context) {
+ *
+ *         // process messages
+ *
+ *         return "ok";
+ *     }
+ *
+ *     ...
+ * 
+ * + *

Using the default S3 Client {@code AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();} + * each record received in the SQSEvent {@code LargeMessageHandler} will checked + * to see if it's body contains a payload which has been offloaded to S3. If it + * does then {@code getObject(bucket, key)} will be called and the payload + * retrieved.

+ * + *

Note: Retreiving payloads from S3 will increase the duration of the + * Lambda function.

+ * + *

If the request handler method returns then each payload will be deleted + * from S3 using {@code deleteObject(bucket, key)}

+ * + *

To disable deletion of payloads setting the following annotation parameter + * {@code @LargeMessageHandler(deletePayloads=false)}

+ * + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface LargeMessageHandler { + + boolean deletePayloads() default true; +} diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java new file mode 100644 index 000000000..508a3ca77 --- /dev/null +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java @@ -0,0 +1,127 @@ +package software.amazon.lambda.powertools.sqs.internal; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.SdkClientException; +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.util.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Pointcut; +import software.amazon.lambda.powertools.sqs.LargeMessageHandler; +import software.amazon.payloadoffloading.PayloadS3Pointer; + +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import static java.lang.String.format; +import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod; + +@Aspect +public class SqsMessageAspect { + + private static final Log LOG = LogFactory.getLog(SqsMessageAspect.class); + private static AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient(); + + @SuppressWarnings({"EmptyMethod"}) + @Pointcut("@annotation(largeMessageHandler)") + public void callAt(LargeMessageHandler largeMessageHandler) { + } + + @Around(value = "callAt(largeMessageHandler) && execution(@LargeMessageHandler * *.*(..))", argNames = "pjp,largeMessageHandler") + public Object around(ProceedingJoinPoint pjp, + LargeMessageHandler largeMessageHandler) throws Throwable { + Object[] proceedArgs = pjp.getArgs(); + + if (isHandlerMethod(pjp) + && placedOnSqsEventRequestHandler(pjp)) { + List pointersToDelete = rewriteMessages((SQSEvent) proceedArgs[0]); + + Object proceed = pjp.proceed(proceedArgs); + + if (largeMessageHandler.deletePayloads()) { + pointersToDelete.forEach(this::deleteMessageFromS3); + } + return proceed; + } + + return pjp.proceed(proceedArgs); + } + + private List rewriteMessages(SQSEvent sqsEvent) { + List s3Pointers = new ArrayList<>(); + + for (SQSMessage sqsMessage : sqsEvent.getRecords()) { + if (isBodyLargeMessagePointer(sqsMessage.getBody())) { + PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody()); + + S3Object s3Object = callS3Gracefully(s3Pointer, pointer -> { + S3Object object = amazonS3.getObject(pointer.getS3BucketName(), pointer.getS3Key()); + LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key()); + return object; + }); + + sqsMessage.setBody(readStringFromS3Object(s3Object)); + s3Pointers.add(s3Pointer); + } + } + + return s3Pointers; + } + + private boolean isBodyLargeMessagePointer(String record) { + return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\""); + } + + private String readStringFromS3Object(S3Object object) { + try (S3ObjectInputStream is = object.getObjectContent()) { + return IOUtils.toString(is); + } catch (IOException e) { + LOG.error("Error converting S3 object to String", e); + throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", object.getBucketName(), object.getKey()), e); + } + } + + private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) { + callS3Gracefully(s3Pointer, pointer -> { + amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key()); + LOG.info("Message deleted from S3: " + s3Pointer.toJson()); + return null; + }); + } + + private R callS3Gracefully(final PayloadS3Pointer pointer, + final Function function) { + try { + return function.apply(pointer); + } catch (AmazonServiceException e) { + LOG.error("A service exception", e); + throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e); + } catch (SdkClientException e) { + LOG.error("Some sort of client exception", e); + throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e); + } + } + + public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) { + return pjp.getArgs().length == 2 + && pjp.getArgs()[0] instanceof SQSEvent + && pjp.getArgs()[1] instanceof Context; + } + + static class FailedProcessingLargePayloadException extends RuntimeException { + public FailedProcessingLargePayloadException(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java new file mode 100644 index 000000000..0bf8260a4 --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java @@ -0,0 +1,16 @@ +package software.amazon.lambda.powertools.sqs.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; +import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent; +import software.amazon.lambda.powertools.sqs.LargeMessageHandler; + +public class LambdaHandlerApiGateway implements RequestHandler { + + @Override + @LargeMessageHandler + public String handleRequest(APIGatewayProxyRequestEvent sqsEvent, Context context) { + return sqsEvent.getBody(); + } +} diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java new file mode 100644 index 000000000..b3d7d4af7 --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandler.java @@ -0,0 +1,15 @@ +package software.amazon.lambda.powertools.sqs.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import software.amazon.lambda.powertools.sqs.LargeMessageHandler; + +public class SqsMessageHandler implements RequestHandler { + + @Override + @LargeMessageHandler + public String handleRequest(SQSEvent sqsEvent, Context context) { + return sqsEvent.getRecords().get(0).getBody(); + } +} diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java new file mode 100644 index 000000000..a301a2a65 --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsNoDeleteMessageHandler.java @@ -0,0 +1,15 @@ +package software.amazon.lambda.powertools.sqs.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import software.amazon.lambda.powertools.sqs.LargeMessageHandler; + +public class SqsNoDeleteMessageHandler implements RequestHandler { + + @Override + @LargeMessageHandler(deletePayloads = false) + public String handleRequest(SQSEvent sqsEvent, Context context) { + return sqsEvent.getRecords().get(0).getBody(); + } +} \ No newline at end of file diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java new file mode 100644 index 000000000..cafec7eef --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java @@ -0,0 +1,184 @@ +package software.amazon.lambda.powertools.sqs.internal; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.SdkClientException; +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.util.StringInputStream; +import org.apache.http.client.methods.HttpRequestBase; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; +import software.amazon.lambda.powertools.sqs.handlers.LambdaHandlerApiGateway; +import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandler; +import software.amazon.lambda.powertools.sqs.handlers.SqsNoDeleteMessageHandler; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.stream.Stream; + +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import static java.util.Collections.singletonList; +import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; +import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.FailedProcessingLargePayloadException; + +public class SqsMessageAspectTest { + + private RequestHandler requestHandler; + + @Mock + private Context context; + + @Mock + private AmazonS3 amazonS3; + + private static final String BUCKET_NAME = "bucketname"; + private static final String BUCKET_KEY = "c71eb2ae-37e0-4265-8909-32f4153faddf"; + + @BeforeEach + void setUp() throws IllegalAccessException { + initMocks(this); + setupContext(); + writeStaticField(SqsMessageAspect.class, "amazonS3", amazonS3, true); + requestHandler = new SqsMessageHandler(); + } + + @Test + public void testLargeMessage() { + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3ObjectWithLargeMessage()); + SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"); + + String response = requestHandler.handleRequest(sqsEvent, context); + + assertThat(response) + .isEqualTo("A big message"); + + verify(amazonS3).deleteObject(BUCKET_NAME, BUCKET_KEY); + } + + @Test + public void shouldNotProcessSmallMessageBody() { + S3Object s3Response = new S3Object(); + s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes())); + + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response); + SQSEvent sqsEvent = messageWithBody("This is small message"); + + String response = requestHandler.handleRequest(sqsEvent, context); + + assertThat(response) + .isEqualTo("This is small message"); + + verifyNoInteractions(amazonS3); + } + + @ParameterizedTest + @MethodSource("exception") + public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exception) { + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenThrow(exception); + + String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"; + SQSEvent sqsEvent = messageWithBody(messageBody); + + assertThatExceptionOfType(FailedProcessingLargePayloadException.class) + .isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context)) + .withCause(exception); + + verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY); + } + + @Test + public void testLargeMessageWithDeletionOff() { + requestHandler = new SqsNoDeleteMessageHandler(); + + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3ObjectWithLargeMessage()); + SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"); + + String response = requestHandler.handleRequest(sqsEvent, context); + + assertThat(response).isEqualTo("A big message"); + + verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY); + } + + + @Test + public void shouldFailEntireBatchIfFailedProcessingDownloadMessageFromS3() throws IOException { + S3Object s3Response = new S3Object(); + s3Response.setObjectContent(new S3ObjectInputStream(new StringInputStream("test") { + @Override + public void close() throws IOException { + throw new IOException("Failed"); + } + }, mock(HttpRequestBase.class))); + + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response); + + String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"; + SQSEvent sqsEvent = messageWithBody(messageBody); + + assertThatExceptionOfType(FailedProcessingLargePayloadException.class) + .isThrownBy(() -> requestHandler.handleRequest(sqsEvent, context)) + .withCauseInstanceOf(IOException.class); + + verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY); + } + + @Test + public void shouldNotDoAnyProcessingWhenNotSqsEvent() { + LambdaHandlerApiGateway handler = new LambdaHandlerApiGateway(); + + String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"; + + APIGatewayProxyRequestEvent event = new APIGatewayProxyRequestEvent(); + event.setBody(messageBody); + String response = handler.handleRequest(event, context); + + assertThat(response) + .isEqualTo(messageBody); + + verifyNoInteractions(amazonS3); + } + + private S3Object s3ObjectWithLargeMessage() { + S3Object s3Response = new S3Object(); + s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes())); + return s3Response; + } + + private static Stream exception() { + return Stream.of(Arguments.of(new AmazonServiceException("Service Exception")), + Arguments.of(new SdkClientException("Client Exception"))); + } + + private SQSEvent messageWithBody(String messageBody) { + SQSMessage sqsMessage = new SQSMessage(); + sqsMessage.setBody(messageBody); + SQSEvent sqsEvent = new SQSEvent(); + sqsEvent.setRecords(singletonList(sqsMessage)); + return sqsEvent; + } + + private void setupContext() { + when(context.getFunctionName()).thenReturn("testFunction"); + when(context.getInvokedFunctionArn()).thenReturn("testArn"); + when(context.getFunctionVersion()).thenReturn("1"); + when(context.getMemoryLimitInMB()).thenReturn(10); + } +} \ No newline at end of file