Skip to content

Commit dd3a715

Browse files
author
Pankaj Agrawal
committed
Ability to control payload deletion from S3
1 parent a9d9b79 commit dd3a715

File tree

2 files changed

+46
-3
lines changed

2 files changed

+46
-3
lines changed

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package software.amazon.lambda.powertools.sqs;
1515

1616
import java.util.List;
17-
import java.util.function.Consumer;
1817
import java.util.function.Function;
1918
import java.util.stream.Collectors;
2019

@@ -39,13 +38,29 @@ public class PowertoolsSqs {
3938
/**
4039
* This is a utility method when you want to avoid using {@code LargeMessageHandler} annotation.
4140
* Gives you access to enriched messages from S3 in the SQS event produced via extended client lib.
41+
* If all the large S3 payload are successfully retrieved, it will delete them from S3 post success.
4242
*
4343
* @param sqsEvent Event received from SQS Extended client library
4444
* @param messageFunction Function to execute you business logic which provides access to enriched messages from S3 when needed.
45-
* @return Return value from the function.
45+
* @return Return value from the function.
4646
*/
4747
public static <R> R enrichedMessageFromS3(final SQSEvent sqsEvent,
4848
final Function<List<SQSMessage>, R> messageFunction) {
49+
return enrichedMessageFromS3(sqsEvent, true, messageFunction);
50+
}
51+
52+
/**
53+
* This is a utility method when you want to avoid using {@code LargeMessageHandler} annotation.
54+
* Gives you access to enriched messages from S3 in the SQS event produced via extended client lib.
55+
* if all the large S3 payload are successfully retrieved, Control if it will delete payload from S3 post success.
56+
*
57+
* @param sqsEvent Event received from SQS Extended client library
58+
* @param messageFunction Function to execute you business logic which provides access to enriched messages from S3 when needed.
59+
* @return Return value from the function.
60+
*/
61+
public static <R> R enrichedMessageFromS3(final SQSEvent sqsEvent,
62+
final boolean deleteS3Payload,
63+
final Function<List<SQSMessage>, R> messageFunction) {
4964

5065
List<SQSMessage> sqsMessages = sqsEvent.getRecords().stream()
5166
.map(PowertoolsSqs::clonedMessage)
@@ -55,7 +70,9 @@ public static <R> R enrichedMessageFromS3(final SQSEvent sqsEvent,
5570

5671
R returnValue = messageFunction.apply(sqsMessages);
5772

58-
s3Pointers.forEach(SqsMessageAspect::deleteMessage);
73+
if (deleteS3Payload) {
74+
s3Pointers.forEach(SqsMessageAspect::deleteMessage);
75+
}
5976

6077
return returnValue;
6178
}

powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.junit.jupiter.params.ParameterizedTest;
2020
import org.junit.jupiter.params.provider.Arguments;
2121
import org.junit.jupiter.params.provider.MethodSource;
22+
import org.junit.jupiter.params.provider.ValueSource;
2223
import org.mockito.Mock;
2324
import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect;
2425

@@ -68,6 +69,31 @@ public void testLargeMessage() {
6869
verify(amazonS3).deleteObject(BUCKET_NAME, BUCKET_KEY);
6970
}
7071

72+
@ParameterizedTest
73+
@ValueSource(booleans = {true, false})
74+
public void testLargeMessageDeleteFromS3Toggle(boolean deleteS3Payload) {
75+
S3Object s3Response = new S3Object();
76+
s3Response.setObjectContent(new ByteArrayInputStream("A big message".getBytes()));
77+
78+
when(amazonS3.getObject(BUCKET_NAME, BUCKET_KEY)).thenReturn(s3Response);
79+
SQSEvent sqsEvent = messageWithBody("[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]");
80+
81+
Map<String, String> sqsMessage = PowertoolsSqs.enrichedMessageFromS3(sqsEvent, deleteS3Payload, sqsMessages -> {
82+
Map<String, String> someBusinessLogic = new HashMap<>();
83+
someBusinessLogic.put("Message", sqsMessages.get(0).getBody());
84+
return someBusinessLogic;
85+
});
86+
87+
assertThat(sqsMessage)
88+
.hasSize(1)
89+
.containsEntry("Message", "A big message");
90+
if (deleteS3Payload) {
91+
verify(amazonS3).deleteObject(BUCKET_NAME, BUCKET_KEY);
92+
} else {
93+
verify(amazonS3, never()).deleteObject(BUCKET_NAME, BUCKET_KEY);
94+
}
95+
}
96+
7197
@Test
7298
public void shouldNotProcessSmallMessageBody() {
7399
S3Object s3Response = new S3Object();

0 commit comments

Comments
 (0)