Skip to content

Commit a9d9b79

Browse files
author
Pankaj Agrawal
committed
Helper method to work without annotation
1 parent 7b17f61 commit a9d9b79

File tree

3 files changed

+230
-6
lines changed

3 files changed

+230
-6
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2020 Amazon.com, Inc. or its affiliates.
3+
* Licensed under the Apache License, Version 2.0 (the
4+
* "License"); you may not use this file except in compliance
5+
* with the License. You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
* limitations under the License.
12+
*
13+
*/
14+
package software.amazon.lambda.powertools.sqs;
15+
16+
import java.util.List;
17+
import java.util.function.Consumer;
18+
import java.util.function.Function;
19+
import java.util.stream.Collectors;
20+
21+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
22+
import com.fasterxml.jackson.core.JsonProcessingException;
23+
import com.fasterxml.jackson.databind.ObjectMapper;
24+
import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect;
25+
import software.amazon.payloadoffloading.PayloadS3Pointer;
26+
27+
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
28+
import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.processMessages;
29+
30+
/**
31+
* A class of helper functions to add additional functionality to LargeMessageHandler.
32+
* <p>
33+
* {@see PowertoolsLogging}
34+
*/
35+
public class PowertoolsSqs {
36+
37+
private static final ObjectMapper objectMapper = new ObjectMapper();
38+
39+
/**
40+
* This is a utility method when you want to avoid using {@code LargeMessageHandler} annotation.
41+
* Gives you access to enriched messages from S3 in the SQS event produced via extended client lib.
42+
*
43+
* @param sqsEvent Event received from SQS Extended client library
44+
* @param messageFunction Function to execute you business logic which provides access to enriched messages from S3 when needed.
45+
* @return Return value from the function.
46+
*/
47+
public static <R> R enrichedMessageFromS3(final SQSEvent sqsEvent,
48+
final Function<List<SQSMessage>, R> messageFunction) {
49+
50+
List<SQSMessage> sqsMessages = sqsEvent.getRecords().stream()
51+
.map(PowertoolsSqs::clonedMessage)
52+
.collect(Collectors.toList());
53+
54+
List<PayloadS3Pointer> s3Pointers = processMessages(sqsMessages);
55+
56+
R returnValue = messageFunction.apply(sqsMessages);
57+
58+
s3Pointers.forEach(SqsMessageAspect::deleteMessage);
59+
60+
return returnValue;
61+
}
62+
63+
private static SQSMessage clonedMessage(SQSMessage sqsMessage) {
64+
try {
65+
return objectMapper
66+
.readValue(objectMapper.writeValueAsString(sqsMessage), SQSMessage.class);
67+
} catch (JsonProcessingException e) {
68+
throw new RuntimeException(e);
69+
}
70+
}
71+
}

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,13 @@ && placedOnSqsEventRequestHandler(pjp)) {
5959
}
6060

6161
private List<PayloadS3Pointer> rewriteMessages(SQSEvent sqsEvent) {
62-
List<PayloadS3Pointer> s3Pointers = new ArrayList<>();
62+
List<SQSMessage> records = sqsEvent.getRecords();
63+
return processMessages(records);
64+
}
6365

64-
for (SQSMessage sqsMessage : sqsEvent.getRecords()) {
66+
public static List<PayloadS3Pointer> processMessages(final List<SQSMessage> records) {
67+
List<PayloadS3Pointer> s3Pointers = new ArrayList<>();
68+
for (SQSMessage sqsMessage : records) {
6569
if (isBodyLargeMessagePointer(sqsMessage.getBody())) {
6670
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody());
6771

@@ -79,11 +83,11 @@ private List<PayloadS3Pointer> rewriteMessages(SQSEvent sqsEvent) {
7983
return s3Pointers;
8084
}
8185

82-
private boolean isBodyLargeMessagePointer(String record) {
86+
private static boolean isBodyLargeMessagePointer(String record) {
8387
return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\"");
8488
}
8589

86-
private String readStringFromS3Object(S3Object object) {
90+
private static String readStringFromS3Object(S3Object object) {
8791
try (S3ObjectInputStream is = object.getObjectContent()) {
8892
return IOUtils.toString(is);
8993
} catch (IOException e) {
@@ -100,7 +104,15 @@ private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
100104
});
101105
}
102106

103-
private <R> R callS3Gracefully(final PayloadS3Pointer pointer,
107+
public static void deleteMessage(PayloadS3Pointer s3Pointer) {
108+
callS3Gracefully(s3Pointer, pointer -> {
109+
amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
110+
LOG.info("Message deleted from S3: " + s3Pointer.toJson());
111+
return null;
112+
});
113+
}
114+
115+
private static <R> R callS3Gracefully(final PayloadS3Pointer pointer,
104116
final Function<PayloadS3Pointer, R> function) {
105117
try {
106118
return function.apply(pointer);
@@ -119,7 +131,7 @@ public static boolean placedOnSqsEventRequestHandler(ProceedingJoinPoint pjp) {
119131
&& pjp.getArgs()[1] instanceof Context;
120132
}
121133

122-
static class FailedProcessingLargePayloadException extends RuntimeException {
134+
public static class FailedProcessingLargePayloadException extends RuntimeException {
123135
public FailedProcessingLargePayloadException(String message, Throwable cause) {
124136
super(message, cause);
125137
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package software.amazon.lambda.powertools.sqs;
2+
3+
import java.io.ByteArrayInputStream;
4+
import java.io.IOException;
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
import java.util.stream.Stream;
8+
9+
import com.amazonaws.AmazonServiceException;
10+
import com.amazonaws.SdkClientException;
11+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
12+
import com.amazonaws.services.s3.AmazonS3;
13+
import com.amazonaws.services.s3.model.S3Object;
14+
import com.amazonaws.services.s3.model.S3ObjectInputStream;
15+
import com.amazonaws.util.StringInputStream;
16+
import org.apache.http.client.methods.HttpRequestBase;
17+
import org.junit.jupiter.api.BeforeEach;
18+
import org.junit.jupiter.api.Test;
19+
import org.junit.jupiter.params.ParameterizedTest;
20+
import org.junit.jupiter.params.provider.Arguments;
21+
import org.junit.jupiter.params.provider.MethodSource;
22+
import org.mockito.Mock;
23+
import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect;
24+
25+
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
26+
import static java.util.Collections.singletonList;
27+
import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField;
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
30+
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.never;
32+
import static org.mockito.Mockito.verify;
33+
import static org.mockito.Mockito.verifyNoInteractions;
34+
import static org.mockito.Mockito.when;
35+
import static org.mockito.MockitoAnnotations.initMocks;
36+
37+
class PowertoolsSqsTest {
38+
39+
@Mock
40+
private AmazonS3 amazonS3;
41+
private static final String BUCKET_NAME = "ms-extended-sqs-client";
42+
private static final String BUCKET_KEY = "c71eb2ae-37e0-4265-8909-32f4153faddf";
43+
44+
@BeforeEach
45+
void setUp() throws IllegalAccessException {
46+
initMocks(this);
47+
writeStaticField(SqsMessageAspect.class, "amazonS3", amazonS3, true);
48+
}
49+
50+
@Test
51+
public void testLargeMessage() {
52+
S3Object s3Response = new S3Object();
53+
s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
54+
55+
when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
56+
SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]");
57+
58+
Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> {
59+
Map<String, String> someBusinessLogic = new HashMap<>();
60+
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
61+
return someBusinessLogic;
62+
});
63+
64+
assertThat(sqsMessage)
65+
.hasSize(1)
66+
.containsEntry("Message", "A big message");
67+
68+
verify(amazonS3).deleteObject(BUCKET_NAME, BUCKET_KEY);
69+
}
70+
71+
@Test
72+
public void shouldNotProcessSmallMessageBody() {
73+
S3Object s3Response = new S3Object();
74+
s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
75+
76+
when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
77+
SQSEvent sqsEvent = messageWithBody("This is small message");
78+
79+
Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> {
80+
Map<String, String> someBusinessLogic = new HashMap<>();
81+
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
82+
return someBusinessLogic;
83+
});
84+
85+
assertThat(sqsMessage)
86+
.containsEntry("Message", "This is small message");
87+
88+
verifyNoInteractions(amazonS3);
89+
}
90+
91+
@ParameterizedTest
92+
@MethodSource("exception")
93+
public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exception) {
94+
when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenThrow(exception);
95+
96+
String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
97+
SQSEvent sqsEvent = messageWithBody(messageBody);
98+
99+
assertThatExceptionOfType(SqsMessageAspect.FailedProcessingLargePayloadException.class)
100+
.isThrownBy(() -> PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> sqsMessages.get(0).getBody()))
101+
.withCause(exception);
102+
103+
verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
104+
}
105+
106+
@Test
107+
public void shouldFailEntireBatchIfFailedProcessingDownloadMessageFromS3() throws IOException {
108+
S3Object s3Response = new S3Object();
109+
110+
s3Response.setObjectContent(new S3ObjectInputStream(new StringInputStream("test") {
111+
@Override
112+
public void close() throws IOException {
113+
throw new IOException("Failed");
114+
}
115+
}, mock(HttpRequestBase.class)));
116+
117+
when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
118+
119+
String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
120+
SQSEvent sqsEvent = messageWithBody(messageBody);
121+
122+
assertThatExceptionOfType(SqsMessageAspect.FailedProcessingLargePayloadException.class)
123+
.isThrownBy(() -> PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> sqsMessages.get(0).getBody()))
124+
.withCauseInstanceOf(IOException.class);
125+
126+
verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
127+
}
128+
129+
private static Stream<Arguments> exception() {
130+
return Stream.of(Arguments.of(new AmazonServiceException("Service Exception")),
131+
Arguments.of(new SdkClientException("Client Exception")));
132+
}
133+
134+
private SQSEvent messageWithBody(String messageBody) {
135+
SQSMessage sqsMessage = new SQSMessage();
136+
sqsMessage.setBody(messageBody);
137+
SQSEvent sqsEvent = new SQSEvent();
138+
sqsEvent.setRecords(singletonList(sqsMessage));
139+
return sqsEvent;
140+
}
141+
}

0 commit comments

Comments
 (0)