Skip to content

Commit 1e317ae

Browse files
authored
Add IgnorepayloadNotFound flag support for SQSExtendedAsync Client (#167)
* Fix unit tests and build * Add IgnorePayload flag support for SQSExtendedAsync client
1 parent b458ea0 commit 1e317ae

File tree

4 files changed

+101
-4
lines changed

4 files changed

+101
-4
lines changed

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@
7979
<version>5.12.0</version>
8080
<scope>test</scope>
8181
</dependency>
82+
<dependency>
83+
<groupId>net.bytebuddy</groupId>
84+
<artifactId>byte-buddy</artifactId>
85+
<version>LATEST</version>
86+
</dependency>
8287
</dependencies>
8388
<build>
8489
<pluginManagement>

src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClient.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
import java.util.HashMap;
1414
import java.util.List;
1515
import java.util.Map;
16+
import java.util.Objects;
1617
import java.util.Optional;
1718
import java.util.UUID;
1819
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.CompletionException;
1921
import java.util.stream.Collectors;
2022
import org.apache.commons.logging.Log;
2123
import org.apache.commons.logging.LogFactory;
@@ -193,6 +195,7 @@ public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRe
193195
messageAttributeNames.removeAll(AmazonSQSExtendedClientUtil.RESERVED_ATTRIBUTE_NAMES);
194196
messageAttributeNames.addAll(AmazonSQSExtendedClientUtil.RESERVED_ATTRIBUTE_NAMES);
195197
receiveMessageRequestBuilder.messageAttributeNames(messageAttributeNames);
198+
String queueUrl = receiveMessageRequest.queueUrl();
196199
receiveMessageRequest = receiveMessageRequestBuilder.build();
197200

198201
return super.receiveMessage(receiveMessageRequest)
@@ -222,7 +225,28 @@ public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRe
222225

223226
// Retrieve original payload
224227
modifiedMessageFutures.add(payloadStore.getOriginalPayload(largeMessagePointer)
225-
.thenApply(originalPayload -> {
228+
.handle((originalPayload,throwable) -> {
229+
230+
if(throwable != null)
231+
{
232+
if(clientConfiguration.ignoresPayloadNotFound())
233+
{
234+
DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest
235+
.builder()
236+
.queueUrl(queueUrl)
237+
.receiptHandle(message.receiptHandle())
238+
.build();
239+
240+
deleteMessage(deleteMessageRequest).join();
241+
LOG.warn("Message deleted from SQS since payload with pointer could not be found in S3.");
242+
return null;
243+
}
244+
else
245+
{
246+
throw new CompletionException(throwable);
247+
}
248+
}
249+
226250
// Set original payload
227251
messageBuilder.body(originalPayload);
228252

@@ -249,6 +273,7 @@ public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRe
249273
modifiedMessageFutures.toArray(new CompletableFuture[modifiedMessageFutures.size()]))
250274
.thenApply(v -> modifiedMessageFutures.stream()
251275
.map(CompletableFuture::join)
276+
.filter(Objects::nonNull)
252277
.collect(Collectors.toList()));
253278
})
254279
.thenApply(modifiedMessages -> {

src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
import static org.junit.jupiter.api.Assertions.assertNotEquals;
88
import static org.junit.jupiter.api.Assertions.assertNull;
99
import static org.junit.jupiter.api.Assertions.assertTrue;
10+
import static org.junit.jupiter.api.Assertions.fail;
1011
import static org.mockito.ArgumentMatchers.any;
1112
import static org.mockito.ArgumentMatchers.eq;
1213
import static org.mockito.ArgumentMatchers.isA;
14+
import static org.mockito.Mockito.doThrow;
1315
import static org.mockito.Mockito.mock;
1416
import static org.mockito.Mockito.never;
1517
import static org.mockito.Mockito.spy;
@@ -25,6 +27,7 @@
2527
import java.util.Map;
2628
import java.util.UUID;
2729
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.CompletionException;
2831
import java.util.stream.Collectors;
2932
import java.util.stream.IntStream;
3033
import org.junit.jupiter.api.BeforeEach;
@@ -35,14 +38,17 @@
3538
import software.amazon.awssdk.core.ResponseBytes;
3639
import software.amazon.awssdk.core.async.AsyncRequestBody;
3740
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
41+
import software.amazon.awssdk.core.exception.SdkException;
3842
import software.amazon.awssdk.services.s3.S3AsyncClient;
3943
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
4044
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
4145
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
4246
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
47+
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
4348
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
4449
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
4550
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
51+
import software.amazon.awssdk.services.sqs.SqsClient;
4652
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
4753
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
4854
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
@@ -640,6 +646,63 @@ private void testReceiveMessage_when_MessageIsLarge(String reservedAttributeName
640646
verify(mockS3, times(1)).getObject(isA(GetObjectRequest.class), isA(AsyncResponseTransformer.class));
641647
}
642648

649+
@Test
650+
public void testReceiveMessage_when_ignorePayloadNotFound_then_messageWithPayloadNotFoundIsDeletedFromSQS() {
651+
ExtendedAsyncClientConfiguration extendedAsyncClientConfiguration = new ExtendedAsyncClientConfiguration()
652+
.withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME)
653+
.withIgnorePayloadNotFound(true);
654+
SqsAsyncClient sqsAsyncExtended = spy(new AmazonSQSExtendedAsyncClient(mockSqsBackend, extendedAsyncClientConfiguration));
655+
656+
String receiptHandle = "receipt-handle";
657+
Message message = Message.builder()
658+
.messageAttributes(ImmutableMap.of(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, MessageAttributeValue.builder().build()))
659+
.body(new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson())
660+
.receiptHandle(receiptHandle)
661+
.build();
662+
663+
when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(
664+
CompletableFuture.completedFuture(ReceiveMessageResponse.builder().messages(message).build()));
665+
doThrow(NoSuchKeyException.class).when(mockS3).getObject((GetObjectRequest) any(), any(AsyncResponseTransformer.class));
666+
667+
ReceiveMessageRequest messageRequest = ReceiveMessageRequest.builder().queueUrl(SQS_QUEUE_URL).build();
668+
ReceiveMessageResponse receiveMessageResponse = sqsAsyncExtended.receiveMessage(messageRequest).join();
669+
670+
assertTrue(receiveMessageResponse.messages().isEmpty());
671+
672+
ArgumentCaptor<DeleteMessageRequest> deleteMessageRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteMessageRequest.class);
673+
verify(mockSqsBackend).deleteMessage(deleteMessageRequestArgumentCaptor.capture());
674+
assertEquals(SQS_QUEUE_URL, deleteMessageRequestArgumentCaptor.getValue().queueUrl());
675+
assertEquals(receiptHandle, deleteMessageRequestArgumentCaptor.getValue().receiptHandle());
676+
}
677+
678+
@Test
679+
public void testReceiveMessage_when_ignorePayloadNotFoundIsFalse_then_messageWithPayloadNotFoundThrowsException() {
680+
ExtendedAsyncClientConfiguration extendedAsyncClientConfiguration = new ExtendedAsyncClientConfiguration()
681+
.withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME)
682+
.withIgnorePayloadNotFound(false);
683+
SqsAsyncClient sqsAsyncExtended = spy(new AmazonSQSExtendedAsyncClient(mockSqsBackend, extendedAsyncClientConfiguration));
684+
685+
String receiptHandle = "receipt-handle";
686+
Message message = Message.builder()
687+
.messageAttributes(ImmutableMap.of(SQSExtendedClientConstants.RESERVED_ATTRIBUTE_NAME, MessageAttributeValue.builder().build()))
688+
.body(new PayloadS3Pointer(S3_BUCKET_NAME, "S3Key").toJson())
689+
.receiptHandle(receiptHandle)
690+
.build();
691+
692+
when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(
693+
CompletableFuture.completedFuture(ReceiveMessageResponse.builder().messages(message).build()));
694+
doThrow(NoSuchKeyException.class).when(mockS3).getObject((GetObjectRequest) any(), any(AsyncResponseTransformer.class));
695+
696+
ReceiveMessageRequest messageRequest = ReceiveMessageRequest.builder().build();
697+
try {
698+
sqsAsyncExtended.receiveMessage(messageRequest).join();
699+
fail("Expected exception after receiving NoSuchKeyException from S3 was not thrown.");
700+
} catch (CompletionException e) {
701+
assertEquals(NoSuchKeyException.class.getName(), e.getCause().getClass().getName());
702+
verify(mockSqsBackend, never()).deleteMessage(any(DeleteMessageRequest.class));
703+
}
704+
}
705+
643706
private DeleteMessageBatchRequest generateLargeDeleteBatchRequest(List<String> originalReceiptHandles) {
644707
List<DeleteMessageBatchRequestEntry> deleteEntries = IntStream.range(0, originalReceiptHandles.size())
645708
.mapToObj(i -> DeleteMessageBatchRequestEntry.builder()

src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,11 @@
6767
import static org.junit.jupiter.api.Assertions.assertNotEquals;
6868
import static org.junit.jupiter.api.Assertions.assertNull;
6969
import static org.junit.jupiter.api.Assertions.assertTrue;
70+
import static org.junit.jupiter.api.Assertions.fail;
7071
import static org.mockito.ArgumentMatchers.any;
7172
import static org.mockito.ArgumentMatchers.argThat;
7273
import static org.mockito.ArgumentMatchers.eq;
74+
import static org.mockito.Mockito.doThrow;
7375
import static org.mockito.Mockito.isA;
7476
import static org.mockito.Mockito.mock;
7577
import static org.mockito.Mockito.mockStatic;
@@ -704,6 +706,7 @@ public void testReceiveMessage_when_ignorePayloadNotFound_then_messageWithPayloa
704706
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
705707
.withPayloadSupportEnabled(mockS3, S3_BUCKET_NAME)
706708
.withIgnorePayloadNotFound(true);
709+
707710
SqsClient sqsExtended = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration));
708711

709712
String receiptHandle = "receipt-handle";
@@ -714,17 +717,18 @@ public void testReceiveMessage_when_ignorePayloadNotFound_then_messageWithPayloa
714717
.build();
715718

716719
when(mockSqsBackend.receiveMessage(isA(ReceiveMessageRequest.class))).thenReturn(ReceiveMessageResponse.builder().messages(message).build());
720+
717721
doThrow(NoSuchKeyException.class).when(mockS3).getObject(any(GetObjectRequest.class));
718722

719723
ReceiveMessageRequest messageRequest = ReceiveMessageRequest.builder().queueUrl(SQS_QUEUE_URL).build();
720724
ReceiveMessageResponse receiveMessageResponse = sqsExtended.receiveMessage(messageRequest);
721725

722-
Assert.assertTrue(receiveMessageResponse.messages().isEmpty());
726+
assertTrue(receiveMessageResponse.messages().isEmpty());
723727

724728
ArgumentCaptor<DeleteMessageRequest> deleteMessageRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteMessageRequest.class);
725729
verify(mockSqsBackend).deleteMessage(deleteMessageRequestArgumentCaptor.capture());
726-
Assert.assertEquals(SQS_QUEUE_URL, deleteMessageRequestArgumentCaptor.getValue().queueUrl());
727-
Assert.assertEquals(receiptHandle, deleteMessageRequestArgumentCaptor.getValue().receiptHandle());
730+
assertEquals(SQS_QUEUE_URL, deleteMessageRequestArgumentCaptor.getValue().queueUrl());
731+
assertEquals(receiptHandle, deleteMessageRequestArgumentCaptor.getValue().receiptHandle());
728732
}
729733

730734
@Test

0 commit comments

Comments
 (0)