diff --git a/docs/content/utilities/sqs_large_message_handling.mdx b/docs/content/utilities/sqs_large_message_handling.mdx
index cb6cee41f..d20ffdcfc 100644
--- a/docs/content/utilities/sqs_large_message_handling.mdx
+++ b/docs/content/utilities/sqs_large_message_handling.mdx
@@ -104,5 +104,50 @@ To disable deletion of payloads setting the following annotation parameter:
 ```java
 @LargeMessageHandler(deletePayloads=false)
 public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {
+
+}
+```
+
+## Utility
+
+Use the utility if you want to avoid using the annotation and need control over errors that can happen during payload enrichment.
+
+`PowertoolsSqs.enrichedMessageFromS3()` provides you access to a list of `SQSMessage` objects enriched from the S3 payload.
+The original `SQSEvent` object is never mutated. You can also control if the S3 payload should be deleted after successful
+processing. You can enrich messages from S3 with the code below:
+
+```java
+public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {
+
+    @Override
+    public String handleRequest(SQSEvent sqsEvent, Context context) {
+
+        Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> {
+            // Some business logic
+            Map<String, String> someBusinessLogic = new HashMap<>();
+            someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
+            return someBusinessLogic;
+        });
+
+        // Do not delete payload after processing.
+        Map<String, String> sqsMessageWithoutDeletion = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, false, sqsMessages -> {
+            // Some business logic
+            Map<String, String> someBusinessLogic = new HashMap<>();
+            someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
+            return someBusinessLogic;
+        });
+
+        // Better control over exceptions during enrichment
+        try {
+            // Do not delete payload after processing.
+            PowertoolsSqs.enrichedMessageFromS3(sqsEvent, false, sqsMessages -> {
+                // Some business logic
+            });
+        } catch (FailedProcessingLargePayloadException e) {
+            // handle any exception. 
+ } + + return "ok"; + } } ``` \ No newline at end of file diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java new file mode 100644 index 000000000..b50ddb538 --- /dev/null +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java @@ -0,0 +1,88 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package software.amazon.lambda.powertools.sqs; + +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect; +import software.amazon.payloadoffloading.PayloadS3Pointer; + +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.processMessages; + +/** + * A class of helper functions to add additional functionality to LargeMessageHandler. + *

+ * @see PowertoolsLogging
+ */
+public class PowertoolsSqs {
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * This is a utility method when you want to avoid using the {@code LargeMessageHandler} annotation.
+     * Gives you access to enriched messages from S3 in the SQS event produced via the extended client library.
+     * If all the large S3 payloads are successfully retrieved, they will be deleted from S3 after successful processing.
+     *
+     * @param sqsEvent        Event received from SQS Extended client library
+     * @param messageFunction Function to execute your business logic which provides access to enriched messages from S3 when needed.
+     * @return Return value from the function.
+     */
+    public static <R> R enrichedMessageFromS3(final SQSEvent sqsEvent,
+                                              final Function<List<SQSMessage>, R> messageFunction) {
+        return enrichedMessageFromS3(sqsEvent, true, messageFunction);
+    }
+
+    /**
+     * This is a utility method when you want to avoid using the {@code LargeMessageHandler} annotation.
+     * Gives you access to enriched messages from S3 in the SQS event produced via the extended client library.
+     * If all the large S3 payloads are successfully retrieved, {@code deleteS3Payload} controls whether they are then deleted from S3.
+     * @param sqsEvent        Event received from SQS Extended client library
+     * @param deleteS3Payload Controls if the large S3 payload should be deleted from S3 after successful processing
+     * @param messageFunction Function to execute your business logic which provides access to enriched messages from S3 when needed.
+     * @return Return value from the function. 
+ */ + public static R enrichedMessageFromS3(final SQSEvent sqsEvent, + final boolean deleteS3Payload, + final Function, R> messageFunction) { + + List sqsMessages = sqsEvent.getRecords().stream() + .map(PowertoolsSqs::clonedMessage) + .collect(Collectors.toList()); + + List s3Pointers = processMessages(sqsMessages); + + R returnValue = messageFunction.apply(sqsMessages); + + if (deleteS3Payload) { + s3Pointers.forEach(SqsMessageAspect::deleteMessage); + } + + return returnValue; + } + + private static SQSMessage clonedMessage(SQSMessage sqsMessage) { + try { + return objectMapper + .readValue(objectMapper.writeValueAsString(sqsMessage), SQSMessage.class); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java index 508a3ca77..68be09adb 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java @@ -59,9 +59,13 @@ && placedOnSqsEventRequestHandler(pjp)) { } private List rewriteMessages(SQSEvent sqsEvent) { - List s3Pointers = new ArrayList<>(); + List records = sqsEvent.getRecords(); + return processMessages(records); + } - for (SQSMessage sqsMessage : sqsEvent.getRecords()) { + public static List processMessages(final List records) { + List s3Pointers = new ArrayList<>(); + for (SQSMessage sqsMessage : records) { if (isBodyLargeMessagePointer(sqsMessage.getBody())) { PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody()); @@ -79,11 +83,11 @@ private List rewriteMessages(SQSEvent sqsEvent) { return s3Pointers; } - private boolean isBodyLargeMessagePointer(String record) { + private static boolean isBodyLargeMessagePointer(String record) { return 
record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\""); } - private String readStringFromS3Object(S3Object object) { + private static String readStringFromS3Object(S3Object object) { try (S3ObjectInputStream is = object.getObjectContent()) { return IOUtils.toString(is); } catch (IOException e) { @@ -100,7 +104,15 @@ private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) { }); } - private R callS3Gracefully(final PayloadS3Pointer pointer, + public static void deleteMessage(PayloadS3Pointer s3Pointer) { + callS3Gracefully(s3Pointer, pointer -> { + amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key()); + LOG.info("Message deleted from S3: " + s3Pointer.toJson()); + return null; + }); + } + + private static R callS3Gracefully(final PayloadS3Pointer pointer, final Function function) { try { return function.apply(pointer); @@ -119,7 +131,7 @@ public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) { && pjp.getArgs()[1] instanceof Context; } - static class FailedProcessingLargePayloadException extends RuntimeException { + public static class FailedProcessingLargePayloadException extends RuntimeException { public FailedProcessingLargePayloadException(String message, Throwable cause) { super(message, cause); } diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsTest.java new file mode 100644 index 000000000..7528188a9 --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsTest.java @@ -0,0 +1,167 @@ +package software.amazon.lambda.powertools.sqs; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.SdkClientException; +import 
com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.util.StringInputStream; +import org.apache.http.client.methods.HttpRequestBase; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mock; +import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect; + +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import static java.util.Collections.singletonList; +import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +class PowertoolsSqsTest { + + @Mock + private AmazonS3 amazonS3; + private static final String BUCKET_NAME = "ms-extended-sqs-client"; + private static final String BUCKET_KEY = "c71eb2ae-37e0-4265-8909-32f4153faddf"; + + @BeforeEach + void setUp() throws IllegalAccessException { + initMocks(this); + writeStaticField(SqsMessageAspect.class, "amazonS3", amazonS3, true); + } + + @Test + public void testLargeMessage() { + S3Object s3Response = new S3Object(); + s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes())); + + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response); + SQSEvent sqsEvent = 
messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"); + + Map sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> { + Map someBusinessLogic = new HashMap<>(); + someBusinessLogic.put("Message", sqsMessages.get(0).getBody()); + return someBusinessLogic; + }); + + assertThat(sqsMessage) + .hasSize(1) + .containsEntry("Message", "A big message"); + + verify(amazonS3).deleteObject(BUCKET_NAME, BUCKET_KEY); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testLargeMessageDeleteFromS3Toggle(boolean deleteS3Payload) { + S3Object s3Response = new S3Object(); + s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes())); + + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response); + SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"); + + Map sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, deleteS3Payload, sqsMessages -> { + Map someBusinessLogic = new HashMap<>(); + someBusinessLogic.put("Message", sqsMessages.get(0).getBody()); + return someBusinessLogic; + }); + + assertThat(sqsMessage) + .hasSize(1) + .containsEntry("Message", "A big message"); + if (deleteS3Payload) { + verify(amazonS3).deleteObject(BUCKET_NAME, BUCKET_KEY); + } else { + verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY); + } + } + + @Test + public void shouldNotProcessSmallMessageBody() { + S3Object s3Response = new S3Object(); + s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes())); + + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response); + SQSEvent sqsEvent = messageWithBody("This is small message"); + + Map sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> { + Map someBusinessLogic = new 
HashMap<>(); + someBusinessLogic.put("Message", sqsMessages.get(0).getBody()); + return someBusinessLogic; + }); + + assertThat(sqsMessage) + .containsEntry("Message", "This is small message"); + + verifyNoInteractions(amazonS3); + } + + @ParameterizedTest + @MethodSource("exception") + public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exception) { + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenThrow(exception); + + String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"; + SQSEvent sqsEvent = messageWithBody(messageBody); + + assertThatExceptionOfType(SqsMessageAspect.FailedProcessingLargePayloadException.class) + .isThrownBy(() -> PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> sqsMessages.get(0).getBody())) + .withCause(exception); + + verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY); + } + + @Test + public void shouldFailEntireBatchIfFailedProcessingDownloadMessageFromS3() throws IOException { + S3Object s3Response = new S3Object(); + + s3Response.setObjectContent(new S3ObjectInputStream(new StringInputStream("test") { + @Override + public void close() throws IOException { + throw new IOException("Failed"); + } + }, mock(HttpRequestBase.class))); + + when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response); + + String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"; + SQSEvent sqsEvent = messageWithBody(messageBody); + + assertThatExceptionOfType(SqsMessageAspect.FailedProcessingLargePayloadException.class) + .isThrownBy(() -> PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> sqsMessages.get(0).getBody())) + .withCauseInstanceOf(IOException.class); + + verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY); + } + + private static Stream exception() { + return 
Stream.of(Arguments.of(new AmazonServiceException("Service Exception")), + Arguments.of(new SdkClientException("Client Exception"))); + } + + private SQSEvent messageWithBody(String messageBody) { + SQSMessage sqsMessage = new SQSMessage(); + sqsMessage.setBody(messageBody); + SQSEvent sqsEvent = new SQSEvent(); + sqsEvent.setRecords(singletonList(sqsMessage)); + return sqsEvent; + } +} \ No newline at end of file