Skip to content

feat: Utility without annotation #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions docs/content/utilities/sqs_large_message_handling.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,50 @@ To disable deletion of payloads setting the following annotation parameter:
```java
@LargeMessageHandler(deletePayloads=false)
public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {

}
```

## Utility

If you want to avoid using annotation and have control over error that can happen during payload enrichment.

`PowertoolsSqs.enrichedMessageFromS3()` provides you access with list of `SQSMessage` object enriched from S3 payload.
Original `SQSEvent` object is never mutated. You can also control if the S3 payload should be deleted after successful
processing. You can enrich messages from S3 with below code:

```java
public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {

@Override
public String handleRequest(SQSEvent sqsEvent, Context context) {

Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> {
// Some business logic
Map<String, String> someBusinessLogic = new HashMap<>();
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
return someBusinessLogic;
});

// Do not delete payload after processing.
Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, false, sqsMessages -> {
// Some business logic
Map<String, String> someBusinessLogic = new HashMap<>();
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
return someBusinessLogic;
});

// Better control over exception during enrichment
try {
// Do not delete payload after processing.
PowertoolsSqs.enrichedMessageFromS3(sqsEvent, false, sqsMessages -> {
// Some business logic
});
} catch (FailedProcessingLargePayloadException e) {
// handle any exception.
}

return "ok";
}
}
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package software.amazon.lambda.powertools.sqs;

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect;
import software.amazon.payloadoffloading.PayloadS3Pointer;

import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.processMessages;

/**
* A class of helper functions to add additional functionality to LargeMessageHandler.
* <p>
* {@see PowertoolsLogging}
*/
public class PowertoolsSqs {

private static final ObjectMapper objectMapper = new ObjectMapper();

/**
* This is a utility method when you want to avoid using {@code LargeMessageHandler} annotation.
* Gives you access to enriched messages from S3 in the SQS event produced via extended client lib.
* If all the large S3 payload are successfully retrieved, it will delete them from S3 post success.
*
* @param sqsEvent Event received from SQS Extended client library
* @param messageFunction Function to execute you business logic which provides access to enriched messages from S3 when needed.
* @return Return value from the function.
*/
public static <R> R enrichedMessageFromS3(final SQSEvent sqsEvent,
final Function<List<SQSMessage>, R> messageFunction) {
return enrichedMessageFromS3(sqsEvent, true, messageFunction);
}

/**
* This is a utility method when you want to avoid using {@code LargeMessageHandler} annotation.
* Gives you access to enriched messages from S3 in the SQS event produced via extended client lib.
* if all the large S3 payload are successfully retrieved, Control if it will delete payload from S3 post success.
*
* @param sqsEvent Event received from SQS Extended client library
* @param messageFunction Function to execute you business logic which provides access to enriched messages from S3 when needed.
* @return Return value from the function.
*/
public static <R> R enrichedMessageFromS3(final SQSEvent sqsEvent,
final boolean deleteS3Payload,
final Function<List<SQSMessage>, R> messageFunction) {

List<SQSMessage> sqsMessages = sqsEvent.getRecords().stream()
.map(PowertoolsSqs::clonedMessage)
.collect(Collectors.toList());

List<PayloadS3Pointer> s3Pointers = processMessages(sqsMessages);

R returnValue = messageFunction.apply(sqsMessages);

if (deleteS3Payload) {
s3Pointers.forEach(SqsMessageAspect::deleteMessage);
}

return returnValue;
}

private static SQSMessage clonedMessage(SQSMessage sqsMessage) {
try {
return objectMapper
.readValue(objectMapper.writeValueAsString(sqsMessage), SQSMessage.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@ && placedOnSqsEventRequestHandler(pjp)) {
}

private List<PayloadS3Pointer> rewriteMessages(SQSEvent sqsEvent) {
List<PayloadS3Pointer> s3Pointers = new ArrayList<>();
List<SQSMessage> records = sqsEvent.getRecords();
return processMessages(records);
}

for (SQSMessage sqsMessage : sqsEvent.getRecords()) {
public static List<PayloadS3Pointer> processMessages(final List<SQSMessage> records) {
List<PayloadS3Pointer> s3Pointers = new ArrayList<>();
for (SQSMessage sqsMessage : records) {
if (isBodyLargeMessagePointer(sqsMessage.getBody())) {
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody());

Expand All @@ -79,11 +83,11 @@ private List<PayloadS3Pointer> rewriteMessages(SQSEvent sqsEvent) {
return s3Pointers;
}

private boolean isBodyLargeMessagePointer(String record) {
private static boolean isBodyLargeMessagePointer(String record) {
return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\"");
}

private String readStringFromS3Object(S3Object object) {
private static String readStringFromS3Object(S3Object object) {
try (S3ObjectInputStream is = object.getObjectContent()) {
return IOUtils.toString(is);
} catch (IOException e) {
Expand All @@ -100,7 +104,15 @@ private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
});
}

private <R> R callS3Gracefully(final PayloadS3Pointer pointer,
public static void deleteMessage(PayloadS3Pointer s3Pointer) {
callS3Gracefully(s3Pointer, pointer -> {
amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
LOG.info("Message deleted from S3: " + s3Pointer.toJson());
return null;
});
}

private static <R> R callS3Gracefully(final PayloadS3Pointer pointer,
final Function<PayloadS3Pointer, R> function) {
try {
return function.apply(pointer);
Expand All @@ -119,7 +131,7 @@ public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) {
&& pjp.getArgs()[1] instanceof Context;
}

static class FailedProcessingLargePayloadException extends RuntimeException {
public static class FailedProcessingLargePayloadException extends RuntimeException {
public FailedProcessingLargePayloadException(String message, Throwable cause) {
super(message, cause);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package software.amazon.lambda.powertools.sqs;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.util.StringInputStream;
import org.apache.http.client.methods.HttpRequestBase;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect;

import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

class PowertoolsSqsTest {

@Mock
private AmazonS3 amazonS3;
private static final String BUCKET_NAME = "ms-extended-sqs-client";
private static final String BUCKET_KEY = "c71eb2ae-37e0-4265-8909-32f4153faddf";

@BeforeEach
void setUp() throws IllegalAccessException {
initMocks(this);
writeStaticField(SqsMessageAspect.class, "amazonS3", amazonS3, true);
}

@Test
public void testLargeMessage() {
S3Object s3Response = new S3Object();
s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));

when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]");

Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> {
Map<String, String> someBusinessLogic = new HashMap<>();
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
return someBusinessLogic;
});

assertThat(sqsMessage)
.hasSize(1)
.containsEntry("Message", "A big message");

verify(amazonS3).deleteObject(BUCKET_NAME, BUCKET_KEY);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testLargeMessageDeleteFromS3Toggle(boolean deleteS3Payload) {
S3Object s3Response = new S3Object();
s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));

when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]");

Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, deleteS3Payload, sqsMessages -> {
Map<String, String> someBusinessLogic = new HashMap<>();
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
return someBusinessLogic;
});

assertThat(sqsMessage)
.hasSize(1)
.containsEntry("Message", "A big message");
if (deleteS3Payload) {
verify(amazonS3).deleteObject(BUCKET_NAME, BUCKET_KEY);
} else {
verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
}
}

@Test
public void shouldNotProcessSmallMessageBody() {
S3Object s3Response = new S3Object();
s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));

when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
SQSEvent sqsEvent = messageWithBody("This is small message");

Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> {
Map<String, String> someBusinessLogic = new HashMap<>();
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
return someBusinessLogic;
});

assertThat(sqsMessage)
.containsEntry("Message", "This is small message");

verifyNoInteractions(amazonS3);
}

@ParameterizedTest
@MethodSource("exception")
public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exception) {
when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenThrow(exception);

String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
SQSEvent sqsEvent = messageWithBody(messageBody);

assertThatExceptionOfType(SqsMessageAspect.FailedProcessingLargePayloadException.class)
.isThrownBy(() -> PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> sqsMessages.get(0).getBody()))
.withCause(exception);

verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
}

@Test
public void shouldFailEntireBatchIfFailedProcessingDownloadMessageFromS3() throws IOException {
S3Object s3Response = new S3Object();

s3Response.setObjectContent(new S3ObjectInputStream(new StringInputStream("test") {
@Override
public void close() throws IOException {
throw new IOException("Failed");
}
}, mock(HttpRequestBase.class)));

when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);

String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
SQSEvent sqsEvent = messageWithBody(messageBody);

assertThatExceptionOfType(SqsMessageAspect.FailedProcessingLargePayloadException.class)
.isThrownBy(() -> PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> sqsMessages.get(0).getBody()))
.withCauseInstanceOf(IOException.class);

verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
}

private static Stream<Arguments> exception() {
return Stream.of(Arguments.of(new AmazonServiceException("Service Exception")),
Arguments.of(new SdkClientException("Client Exception")));
}

private SQSEvent messageWithBody(String messageBody) {
SQSMessage sqsMessage = new SQSMessage();
sqsMessage.setBody(messageBody);
SQSEvent sqsEvent = new SQSEvent();
sqsEvent.setRecords(singletonList(sqsMessage));
return sqsEvent;
}
}