diff --git a/.gitignore b/.gitignore index 2f7896d..58d1ff7 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ target/ +.idea \ No newline at end of file diff --git a/pom.xml b/pom.xml index 2ec00ac..a8af002 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.amazonaws amazon-sqs-java-extended-client-lib - 1.0.2 + 1.0.3 jar Amazon SQS Extended Client Library for Java An extension to the Amazon SQS client that enables sending and receiving messages up to 2GB via Amazon S3. diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java index 8200d7b..5293c68 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java +++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java @@ -1335,7 +1335,8 @@ private void storeTextInS3(String s3Key, String messageContentStr, Long messageC PutObjectRequest putObjectRequest = new PutObjectRequest(clientConfiguration.getS3BucketName(), s3Key, messageContentStream, messageContentStreamMetadata); try { - clientConfiguration.getAmazonS3Client().putObject(putObjectRequest); + PutObjectRequest clientModifiedPutRequest = clientConfiguration.getPutObjectModifier().apply(putObjectRequest); + clientConfiguration.getAmazonS3Client().putObject(clientModifiedPutRequest); } catch (AmazonServiceException e) { String errorMessage = "Failed to store the message content in an S3 object. SQS message was not sent."; LOG.error(errorMessage, e); diff --git a/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java b/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java index 4852178..cc80db1 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java +++ b/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java @@ -16,13 +16,13 @@ package com.amazon.sqs.javamessaging; import com.amazonaws.AmazonClientException; +import com.amazonaws.internal.SdkFunction; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.PutObjectRequest; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.amazonaws.annotation.NotThreadSafe; -import java.util.List; - /** * Amazon SQS extended client configuration options such as Amazon S3 client, * bucket name, and message size threshold for large-payload messages. @@ -36,6 +36,7 @@ public class ExtendedClientConfiguration { private boolean largePayloadSupport = false; private boolean alwaysThroughS3 = false; private int messageSizeThreshold = SQSExtendedClientConstants.DEFAULT_MESSAGE_SIZE_THRESHOLD; + private SdkFunction putObjectModifier = new PutObjectRequestIdentityFunction(); public ExtendedClientConfiguration() { s3 = null; @@ -48,6 +49,7 @@ public ExtendedClientConfiguration(ExtendedClientConfiguration other) { this.largePayloadSupport = other.largePayloadSupport; this.alwaysThroughS3 = other.alwaysThroughS3; this.messageSizeThreshold = other.messageSizeThreshold; + this.putObjectModifier = other.putObjectModifier; } /** @@ -214,4 +216,45 @@ public ExtendedClientConfiguration withAlwaysThroughS3(boolean alwaysThroughS3) public boolean isAlwaysThroughS3() { return alwaysThroughS3; } + + /** + * Sets the function which you can use to modify the PutObjectRequest to enable support for encryption headers, + * meta-data, etc. + * + * @param putObjectModifier + * An implementation of the internal SdkFunction which takes a PutObjectRequest, modifies it, and returns it. + */ + public void setPutObjectModifier(SdkFunction putObjectModifier) + { + this.putObjectModifier = putObjectModifier; + } + + /** + * Sets the function which you can use to modify the PutObjectRequest to enable support for encryption headers, + * meta-data, etc. + * + * @param putObjectModifier + * An implementation of the internal SdkFunction which takes a PutObjectRequest, modifies it, and returns it. + */ + public ExtendedClientConfiguration withPutObjectModifier(SdkFunction putObjectModifier) { + this.setPutObjectModifier(putObjectModifier); + return this; + } + + /** + * Used to return the function which modifies the PutObjectRequests + * + * @return The custom configured SdkFunction or an identity function by default. + * stored in Amazon S3. Default: false + */ + public SdkFunction getPutObjectModifier() { + return putObjectModifier; + } + + static class PutObjectRequestIdentityFunction implements SdkFunction { + @Override + public PutObjectRequest apply(PutObjectRequest putObjectRequest) { + return putObjectRequest; + } + } } diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java index 3dd70c3..e7d4753 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import com.amazonaws.internal.SdkFunction; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.sqs.AmazonSQS; @@ -34,8 +35,14 @@ import junit.framework.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.isA; import static org.mockito.Mockito.mock; @@ -48,11 +55,13 @@ /** * Tests the AmazonSQSExtendedClient class. */ +@RunWith(MockitoJUnitRunner.class) public class AmazonSQSExtendedClientTest { private AmazonSQS extendedSqsWithDefaultConfig; - private AmazonSQS mockSqsBackend; - private AmazonS3 mockS3; + @Mock private AmazonSQS mockSqsBackend; + @Mock private AmazonS3 mockS3; + @Mock private SdkFunction mockModificationFunction; private static final String S3_BUCKET_NAME = "test-bucket-name"; private static final String SQS_QUEUE_URL = "test-queue-url"; @@ -65,12 +74,19 @@ public class AmazonSQSExtendedClientTest { @Before public void setupClient() { - mockS3 = mock(AmazonS3.class); - mockSqsBackend = mock(AmazonSQS.class); when(mockS3.putObject(isA(PutObjectRequest.class))).thenReturn(null); + when(mockModificationFunction.apply(any(PutObjectRequest.class))).thenAnswer(new Answer() + { + @Override + public PutObjectRequest answer(InvocationOnMock invocationOnMock) { + return (PutObjectRequest) invocationOnMock.getArguments()[0]; + } + }); + ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() - .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME); + .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME) + .withPutObjectModifier(mockModificationFunction); extendedSqsWithDefaultConfig = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration)); @@ -84,6 +100,7 @@ public void testWhenSendLargeMessageThenPayloadIsStoredInS3() { extendedSqsWithDefaultConfig.sendMessage(messageRequest); verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class)); + verify(mockModificationFunction, times(1)).apply(any(PutObjectRequest.class)); } @Test @@ -95,6 +112,7 @@ public void testWhenSendSmallMessageThenS3IsNotUsed() { extendedSqsWithDefaultConfig.sendMessage(messageRequest); verify(mockS3, never()).putObject(isA(PutObjectRequest.class)); + verify(mockModificationFunction, never()).apply(any(PutObjectRequest.class)); } @Test @@ -117,13 +135,16 @@ public void testWhenSendMessageWithAlwaysThroughS3AndMessageIsSmallThenItIsStill int messageLength = LESS_THAN_SQS_SIZE_LIMIT; String messageBody = generateStringWithLength(messageLength); ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() - .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withAlwaysThroughS3(true); + .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME) + .withAlwaysThroughS3(true) + .withPutObjectModifier(mockModificationFunction); AmazonSQS sqsExtended = spy(new AmazonSQSExtendedClient(mock(AmazonSQSClient.class), extendedClientConfiguration)); SendMessageRequest messageRequest = new SendMessageRequest(SQS_QUEUE_URL, messageBody); sqsExtended.sendMessage(messageRequest); verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class)); + verify(mockModificationFunction, times(1)).apply(any(PutObjectRequest.class)); } @Test @@ -131,13 +152,16 @@ public void testWhenSendMessageWithSetMessageSizeThresholdThenThresholdIsHonored int messageLength = ARBITRATY_SMALLER_THRESSHOLD * 2; String messageBody = generateStringWithLength(messageLength); ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration() - .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withMessageSizeThreshold(ARBITRATY_SMALLER_THRESSHOLD); + .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME) + .withMessageSizeThreshold(ARBITRATY_SMALLER_THRESSHOLD) + .withPutObjectModifier(mockModificationFunction); AmazonSQS sqsExtended = spy(new AmazonSQSExtendedClient(mock(AmazonSQSClient.class), extendedClientConfiguration)); SendMessageRequest messageRequest = new SendMessageRequest(SQS_QUEUE_URL, messageBody); sqsExtended.sendMessage(messageRequest); verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class)); + verify(mockModificationFunction, times(1)).apply(any(PutObjectRequest.class)); } @Test @@ -191,6 +215,7 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor // There should be 8 puts for the 8 messages above the threshhold verify(mockS3, times(8)).putObject(isA(PutObjectRequest.class)); + verify(mockModificationFunction, times(8)).apply(any(PutObjectRequest.class)); } @Test diff --git a/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java b/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java index 9949853..68624e0 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java @@ -15,6 +15,7 @@ package com.amazon.sqs.javamessaging; +import com.amazonaws.internal.SdkFunction; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.PutObjectRequest; import junit.framework.Assert; @@ -32,7 +33,6 @@ public class ExtendedClientConfigurationTest { @Before public void setup() { - } @Test @@ -116,5 +116,49 @@ public void testMessageSizeThreshold() { } + @Test + public void identityFunction_doesNotModifyInput() + { + ExtendedClientConfiguration.PutObjectRequestIdentityFunction identityFunction = new ExtendedClientConfiguration.PutObjectRequestIdentityFunction(); + PutObjectRequest inputRequest = mock(PutObjectRequest.class); + PutObjectRequest outputRequest = identityFunction.apply(inputRequest); + Assert.assertSame(inputRequest, outputRequest); + verifyZeroInteractions(inputRequest); + } + + @Test + public void noPutObjectFunctionDefined_useIdentity() + { + ExtendedClientConfiguration configuration = new ExtendedClientConfiguration(); + Assert.assertEquals(configuration.getPutObjectModifier().getClass(), ExtendedClientConfiguration.PutObjectRequestIdentityFunction.class); + } + @Test + public void testPutObjectFunctionSetter() + { + ExtendedClientConfiguration configuration = new ExtendedClientConfiguration(); + SdkFunction modifierFunction = new SdkFunction() { + @Override + public PutObjectRequest apply(PutObjectRequest putObjectRequest) { + return null; + } + }; + + configuration.setPutObjectModifier(modifierFunction); + Assert.assertEquals(configuration.getPutObjectModifier(), modifierFunction); + } + + @Test + public void testPutObjectFunctioFluentApi() + { + SdkFunction modifierFunction = new SdkFunction() { + @Override + public PutObjectRequest apply(PutObjectRequest putObjectRequest) { + return null; + } + }; + + ExtendedClientConfiguration configuration = new ExtendedClientConfiguration().withPutObjectModifier(modifierFunction); + Assert.assertEquals(configuration.getPutObjectModifier(), modifierFunction); + } }