diff --git a/.gitignore b/.gitignore
index 2f7896d..58d1ff7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1 +1,2 @@
target/
+.idea
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 2ec00ac..a8af002 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.amazonaws
amazon-sqs-java-extended-client-lib
- 1.0.2
+ 1.0.3
jar
Amazon SQS Extended Client Library for Java
An extension to the Amazon SQS client that enables sending and receiving messages up to 2GB via Amazon S3.
diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java
index 8200d7b..5293c68 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClient.java
@@ -1335,7 +1335,8 @@ private void storeTextInS3(String s3Key, String messageContentStr, Long messageC
PutObjectRequest putObjectRequest = new PutObjectRequest(clientConfiguration.getS3BucketName(), s3Key,
messageContentStream, messageContentStreamMetadata);
try {
- clientConfiguration.getAmazonS3Client().putObject(putObjectRequest);
+ PutObjectRequest clientModifiedPutRequest = clientConfiguration.getPutObjectModifier().apply(putObjectRequest);
+ clientConfiguration.getAmazonS3Client().putObject(clientModifiedPutRequest);
} catch (AmazonServiceException e) {
String errorMessage = "Failed to store the message content in an S3 object. SQS message was not sent.";
LOG.error(errorMessage, e);
diff --git a/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java b/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java
index 4852178..cc80db1 100644
--- a/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java
+++ b/src/main/java/com/amazon/sqs/javamessaging/ExtendedClientConfiguration.java
@@ -16,13 +16,13 @@
package com.amazon.sqs.javamessaging;
import com.amazonaws.AmazonClientException;
+import com.amazonaws.internal.SdkFunction;
import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.PutObjectRequest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.annotation.NotThreadSafe;
-import java.util.List;
-
/**
* Amazon SQS extended client configuration options such as Amazon S3 client,
* bucket name, and message size threshold for large-payload messages.
@@ -36,6 +36,7 @@ public class ExtendedClientConfiguration {
private boolean largePayloadSupport = false;
private boolean alwaysThroughS3 = false;
private int messageSizeThreshold = SQSExtendedClientConstants.DEFAULT_MESSAGE_SIZE_THRESHOLD;
+ private SdkFunction putObjectModifier = new PutObjectRequestIdentityFunction();
public ExtendedClientConfiguration() {
s3 = null;
@@ -48,6 +49,7 @@ public ExtendedClientConfiguration(ExtendedClientConfiguration other) {
this.largePayloadSupport = other.largePayloadSupport;
this.alwaysThroughS3 = other.alwaysThroughS3;
this.messageSizeThreshold = other.messageSizeThreshold;
+ this.putObjectModifier = other.putObjectModifier;
}
/**
@@ -214,4 +216,45 @@ public ExtendedClientConfiguration withAlwaysThroughS3(boolean alwaysThroughS3)
public boolean isAlwaysThroughS3() {
return alwaysThroughS3;
}
+
+ /**
+ * Sets the function which you can use to modify the PutObjectRequest to enable support for encryption headers,
+ * meta-data, etc.
+ *
+ * @param putObjectModifier
+ * An implementation of the internal SdkFunction which takes a PutObjectRequest, modifies it, and returns it.
+ */
+ public void setPutObjectModifier(SdkFunction putObjectModifier)
+ {
+ this.putObjectModifier = putObjectModifier;
+ }
+
+ /**
+ * Sets the function which you can use to modify the PutObjectRequest to enable support for encryption headers,
+ * meta-data, etc.
+ *
+ * @param putObjectModifier
+ * An implementation of the internal SdkFunction which takes a PutObjectRequest, modifies it, and returns it.
+ */
+ public ExtendedClientConfiguration withPutObjectModifier(SdkFunction putObjectModifier) {
+ this.setPutObjectModifier(putObjectModifier);
+ return this;
+ }
+
+ /**
+ * Used to return the function which modifies the PutObjectRequests
+ *
+ * @return The custom configured SdkFunction or an identity function by default.
+ * stored in Amazon S3. Default: false
+ */
+ public SdkFunction getPutObjectModifier() {
+ return putObjectModifier;
+ }
+
+ static class PutObjectRequestIdentityFunction implements SdkFunction {
+ @Override
+ public PutObjectRequest apply(PutObjectRequest putObjectRequest) {
+ return putObjectRequest;
+ }
+ }
}
diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java
index 3dd70c3..e7d4753 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
+import com.amazonaws.internal.SdkFunction;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.sqs.AmazonSQS;
@@ -34,8 +35,14 @@
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
@@ -48,11 +55,13 @@
/**
* Tests the AmazonSQSExtendedClient class.
*/
+@RunWith(MockitoJUnitRunner.class)
public class AmazonSQSExtendedClientTest {
private AmazonSQS extendedSqsWithDefaultConfig;
- private AmazonSQS mockSqsBackend;
- private AmazonS3 mockS3;
+ @Mock private AmazonSQS mockSqsBackend;
+ @Mock private AmazonS3 mockS3;
+ @Mock private SdkFunction mockModificationFunction;
private static final String S3_BUCKET_NAME = "test-bucket-name";
private static final String SQS_QUEUE_URL = "test-queue-url";
@@ -65,12 +74,19 @@ public class AmazonSQSExtendedClientTest {
@Before
public void setupClient() {
- mockS3 = mock(AmazonS3.class);
- mockSqsBackend = mock(AmazonSQS.class);
when(mockS3.putObject(isA(PutObjectRequest.class))).thenReturn(null);
+ when(mockModificationFunction.apply(any(PutObjectRequest.class))).thenAnswer(new Answer()
+ {
+ @Override
+ public PutObjectRequest answer(InvocationOnMock invocationOnMock) {
+ return (PutObjectRequest) invocationOnMock.getArguments()[0];
+ }
+ });
+
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
- .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME);
+ .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME)
+ .withPutObjectModifier(mockModificationFunction);
extendedSqsWithDefaultConfig = spy(new AmazonSQSExtendedClient(mockSqsBackend, extendedClientConfiguration));
@@ -84,6 +100,7 @@ public void testWhenSendLargeMessageThenPayloadIsStoredInS3() {
extendedSqsWithDefaultConfig.sendMessage(messageRequest);
verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class));
+ verify(mockModificationFunction, times(1)).apply(any(PutObjectRequest.class));
}
@Test
@@ -95,6 +112,7 @@ public void testWhenSendSmallMessageThenS3IsNotUsed() {
extendedSqsWithDefaultConfig.sendMessage(messageRequest);
verify(mockS3, never()).putObject(isA(PutObjectRequest.class));
+ verify(mockModificationFunction, never()).apply(any(PutObjectRequest.class));
}
@Test
@@ -117,13 +135,16 @@ public void testWhenSendMessageWithAlwaysThroughS3AndMessageIsSmallThenItIsStill
int messageLength = LESS_THAN_SQS_SIZE_LIMIT;
String messageBody = generateStringWithLength(messageLength);
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
- .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withAlwaysThroughS3(true);
+ .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME)
+ .withAlwaysThroughS3(true)
+ .withPutObjectModifier(mockModificationFunction);
AmazonSQS sqsExtended = spy(new AmazonSQSExtendedClient(mock(AmazonSQSClient.class), extendedClientConfiguration));
SendMessageRequest messageRequest = new SendMessageRequest(SQS_QUEUE_URL, messageBody);
sqsExtended.sendMessage(messageRequest);
verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class));
+ verify(mockModificationFunction, times(1)).apply(any(PutObjectRequest.class));
}
@Test
@@ -131,13 +152,16 @@ public void testWhenSendMessageWithSetMessageSizeThresholdThenThresholdIsHonored
int messageLength = ARBITRATY_SMALLER_THRESSHOLD * 2;
String messageBody = generateStringWithLength(messageLength);
ExtendedClientConfiguration extendedClientConfiguration = new ExtendedClientConfiguration()
- .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME).withMessageSizeThreshold(ARBITRATY_SMALLER_THRESSHOLD);
+ .withLargePayloadSupportEnabled(mockS3, S3_BUCKET_NAME)
+ .withMessageSizeThreshold(ARBITRATY_SMALLER_THRESSHOLD)
+ .withPutObjectModifier(mockModificationFunction);
AmazonSQS sqsExtended = spy(new AmazonSQSExtendedClient(mock(AmazonSQSClient.class), extendedClientConfiguration));
SendMessageRequest messageRequest = new SendMessageRequest(SQS_QUEUE_URL, messageBody);
sqsExtended.sendMessage(messageRequest);
verify(mockS3, times(1)).putObject(isA(PutObjectRequest.class));
+ verify(mockModificationFunction, times(1)).apply(any(PutObjectRequest.class));
}
@Test
@@ -191,6 +215,7 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor
// There should be 8 puts for the 8 messages above the threshhold
verify(mockS3, times(8)).putObject(isA(PutObjectRequest.class));
+ verify(mockModificationFunction, times(8)).apply(any(PutObjectRequest.class));
}
@Test
diff --git a/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java b/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java
index 9949853..68624e0 100644
--- a/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java
+++ b/src/test/java/com/amazon/sqs/javamessaging/ExtendedClientConfigurationTest.java
@@ -15,6 +15,7 @@
package com.amazon.sqs.javamessaging;
+import com.amazonaws.internal.SdkFunction;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectRequest;
import junit.framework.Assert;
@@ -32,7 +33,6 @@ public class ExtendedClientConfigurationTest {
@Before
public void setup() {
-
}
@Test
@@ -116,5 +116,49 @@ public void testMessageSizeThreshold() {
}
+ @Test
+ public void identityFunction_doesNotModifyInput()
+ {
+ ExtendedClientConfiguration.PutObjectRequestIdentityFunction identityFunction = new ExtendedClientConfiguration.PutObjectRequestIdentityFunction();
+ PutObjectRequest inputRequest = mock(PutObjectRequest.class);
+ PutObjectRequest outputRequest = identityFunction.apply(inputRequest);
+ Assert.assertSame(inputRequest, outputRequest);
+ verifyZeroInteractions(inputRequest);
+ }
+
+ @Test
+ public void noPutObjectFunctionDefined_useIdentity()
+ {
+ ExtendedClientConfiguration configuration = new ExtendedClientConfiguration();
+ Assert.assertEquals(configuration.getPutObjectModifier().getClass(), ExtendedClientConfiguration.PutObjectRequestIdentityFunction.class);
+ }
+ @Test
+ public void testPutObjectFunctionSetter()
+ {
+ ExtendedClientConfiguration configuration = new ExtendedClientConfiguration();
+ SdkFunction modifierFunction = new SdkFunction() {
+ @Override
+ public PutObjectRequest apply(PutObjectRequest putObjectRequest) {
+ return null;
+ }
+ };
+
+ configuration.setPutObjectModifier(modifierFunction);
+ Assert.assertEquals(configuration.getPutObjectModifier(), modifierFunction);
+ }
+
+ @Test
+ public void testPutObjectFunctioFluentApi()
+ {
+ SdkFunction modifierFunction = new SdkFunction() {
+ @Override
+ public PutObjectRequest apply(PutObjectRequest putObjectRequest) {
+ return null;
+ }
+ };
+
+ ExtendedClientConfiguration configuration = new ExtendedClientConfiguration().withPutObjectModifier(modifierFunction);
+ Assert.assertEquals(configuration.getPutObjectModifier(), modifierFunction);
+ }
}