Skip to content

Commit 990cb63

Browse files
author
Pankaj Agrawal
committed
Initial API skeleton for Partial SQS batch util
1 parent febb426 commit 990cb63

File tree

8 files changed

+252
-0
lines changed

8 files changed

+252
-0
lines changed

powertools-sqs/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@
5757
<groupId>software.amazon.payloadoffloading</groupId>
5858
<artifactId>payloadoffloading-common</artifactId>
5959
</dependency>
60+
<dependency>
61+
<groupId>software.amazon.awssdk</groupId>
62+
<artifactId>sqs</artifactId>
63+
</dependency>
6064

6165
<dependency>
6266
<groupId>org.aspectj</groupId>

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,17 @@
1313
*/
1414
package software.amazon.lambda.powertools.sqs;
1515

16+
import java.util.ArrayList;
17+
import java.util.Collections;
1618
import java.util.List;
1719
import java.util.function.Function;
1820
import java.util.stream.Collectors;
1921

2022
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
2123
import com.fasterxml.jackson.core.JsonProcessingException;
2224
import com.fasterxml.jackson.databind.ObjectMapper;
25+
import software.amazon.awssdk.services.sqs.SqsClient;
26+
import software.amazon.lambda.powertools.sqs.internal.BatchContext;
2327
import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect;
2428
import software.amazon.payloadoffloading.PayloadS3Pointer;
2529

@@ -34,6 +38,7 @@
3438
public final class PowertoolsSqs {
3539

3640
private static final ObjectMapper objectMapper = new ObjectMapper();
41+
private static SqsClient client = SqsClient.create();
3742

3843
private PowertoolsSqs() {
3944
}
@@ -80,6 +85,47 @@ public static <R> R enrichedMessageFromS3(final SQSEvent sqsEvent,
8085
return returnValue;
8186
}
8287

88+
public static void defaultSqsClient(SqsClient client) {
89+
PowertoolsSqs.client = client;
90+
}
91+
92+
public static SqsClient defaultSqsClient() {
93+
return client;
94+
}
95+
96+
public static <R> List<R> partialBatchProcessor(final SQSEvent event,
97+
boolean suppressException,
98+
final Class<? extends SqsMessageHandler<R>> handler) {
99+
100+
try {
101+
return partialBatchProcessor(event, suppressException, handler.newInstance());
102+
} catch (IllegalAccessException | InstantiationException e) {
103+
// LOG something
104+
return Collections.emptyList();
105+
}
106+
}
107+
108+
public static <R> List<R> partialBatchProcessor(final SQSEvent event,
109+
final boolean suppressException,
110+
final SqsMessageHandler<R> handler) {
111+
final List<R> handlerReturn = new ArrayList<>();
112+
113+
BatchContext batchContext = new BatchContext(defaultSqsClient());
114+
115+
for (SQSMessage message : event.getRecords()) {
116+
try {
117+
handlerReturn.add(handler.process(message));
118+
batchContext.addSuccess(message);
119+
} catch (Exception e) {
120+
batchContext.addFailure(message, e);
121+
}
122+
}
123+
124+
batchContext.processSuccessAndReset(suppressException);
125+
126+
return handlerReturn;
127+
}
128+
83129
private static SQSMessage clonedMessage(final SQSMessage sqsMessage) {
84130
try {
85131
return objectMapper
@@ -88,4 +134,5 @@ private static SQSMessage clonedMessage(final SQSMessage sqsMessage) {
88134
throw new RuntimeException(e);
89135
}
90136
}
137+
91138
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package software.amazon.lambda.powertools.sqs;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
6+
import static java.util.stream.Collectors.joining;
7+
8+
public class SQSBatchProcessingException extends RuntimeException {
9+
10+
private final List<Exception> exceptions;
11+
12+
public SQSBatchProcessingException(List<Exception> exceptions) {
13+
this.exceptions = new ArrayList<>(exceptions);
14+
}
15+
16+
@Override
17+
public String getMessage() {
18+
return exceptions.stream()
19+
.map(Throwable::getMessage)
20+
.collect(joining("\n"));
21+
}
22+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package software.amazon.lambda.powertools.sqs;
2+
3+
import java.lang.annotation.ElementType;
4+
import java.lang.annotation.Retention;
5+
import java.lang.annotation.RetentionPolicy;
6+
import java.lang.annotation.Target;
7+
8+
9+
@Retention(RetentionPolicy.RUNTIME)
10+
@Target(ElementType.METHOD)
11+
public @interface SqsBatchProcessor {
12+
13+
Class<? extends SqsMessageHandler<Object>> value();
14+
15+
boolean suppressException() default false;
16+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package software.amazon.lambda.powertools.sqs;
2+
3+
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
4+
5+
@FunctionalInterface
6+
public interface SqsMessageHandler<R> {
7+
8+
R process(SQSMessage message);
9+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package software.amazon.lambda.powertools.sqs.internal;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
6+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
7+
import software.amazon.awssdk.services.sqs.SqsClient;
8+
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
9+
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
10+
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
11+
import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException;
12+
13+
import static java.util.stream.Collectors.toList;
14+
15+
public final class BatchContext {
16+
private List<SQSEvent.SQSMessage> success = new ArrayList<>();
17+
private List<SQSEvent.SQSMessage> failures = new ArrayList<>();
18+
private List<Exception> exceptions = new ArrayList<>();
19+
private SqsClient client;
20+
21+
public BatchContext(SqsClient client) {
22+
this.client = client;
23+
}
24+
25+
public void addSuccess(SQSEvent.SQSMessage event) {
26+
success.add(event);
27+
}
28+
29+
public void addFailure(SQSEvent.SQSMessage event, Exception e) {
30+
failures.add(event);
31+
exceptions.add(e);
32+
}
33+
34+
private boolean hasFailures() {
35+
return !failures.isEmpty();
36+
}
37+
38+
private void cleanUpAndReset() {
39+
DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder()
40+
.queueUrl(url())
41+
.entries(success.stream().map(m -> DeleteMessageBatchRequestEntry.builder()
42+
.id(m.getMessageId())
43+
.receiptHandle(m.getReceiptHandle())
44+
.build()).collect(toList()))
45+
.build();
46+
47+
client.deleteMessageBatch(request);
48+
}
49+
50+
private String url() {
51+
String[] arnArray = success.get(0).getEventSourceArn().split(":");
52+
return client.getQueueUrl(GetQueueUrlRequest.builder()
53+
.queueOwnerAWSAccountId(arnArray[1])
54+
.queueName(arnArray[2])
55+
.build())
56+
.queueUrl();
57+
}
58+
59+
private void reset() {
60+
success = new ArrayList<>();
61+
failures = new ArrayList<>();
62+
exceptions = new ArrayList<>();
63+
}
64+
65+
public void processSuccessAndReset(final boolean suppressException) {
66+
if (hasFailures() && !suppressException) {
67+
SQSBatchProcessingException exception = new SQSBatchProcessingException(exceptions);
68+
cleanUpAndReset();
69+
throw exception;
70+
} else if (hasFailures()) {
71+
// LOG
72+
cleanUpAndReset();
73+
} else {
74+
reset();
75+
}
76+
}
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package software.amazon.lambda.powertools.sqs.internal;
2+
3+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
4+
import org.aspectj.lang.ProceedingJoinPoint;
5+
import org.aspectj.lang.annotation.Around;
6+
import org.aspectj.lang.annotation.Aspect;
7+
import org.aspectj.lang.annotation.Pointcut;
8+
import software.amazon.awssdk.services.sqs.SqsClient;
9+
import software.amazon.lambda.powertools.sqs.PowertoolsSqs;
10+
import software.amazon.lambda.powertools.sqs.SqsBatchProcessor;
11+
12+
import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod;
13+
import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.placedOnSqsEventRequestHandler;
14+
15+
@Aspect
16+
public class SqsMessageBatchProcessorAspect {
17+
private static final SqsClient client = SqsClient.create();
18+
private static BatchContext details = new BatchContext(PowertoolsSqs.defaultSqsClient());
19+
20+
@SuppressWarnings({"EmptyMethod"})
21+
@Pointcut("@annotation(sqsBatchProcessor)")
22+
public void callAt(SqsBatchProcessor sqsBatchProcessor) {
23+
}
24+
25+
@Around(value = "callAt(sqsBatchProcessor) && execution(@SqsBatchProcessor * *.*(..))", argNames = "pjp,sqsBatchProcessor")
26+
public Object around(ProceedingJoinPoint pjp,
27+
SqsBatchProcessor sqsBatchProcessor) throws Throwable {
28+
Object[] proceedArgs = pjp.getArgs();
29+
30+
if (isHandlerMethod(pjp)
31+
&& placedOnSqsEventRequestHandler(pjp)) {
32+
33+
SQSEvent sqsEvent = (SQSEvent) proceedArgs[0];
34+
35+
PowertoolsSqs.partialBatchProcessor(sqsEvent, sqsBatchProcessor.suppressException(), sqsBatchProcessor.value().newInstance());
36+
}
37+
38+
return pjp.proceed(proceedArgs);
39+
}
40+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package software.amazon.lambda.powertools.sqs.handlers;
2+
3+
import java.util.List;
4+
5+
import com.amazonaws.services.lambda.runtime.Context;
6+
import com.amazonaws.services.lambda.runtime.RequestHandler;
7+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
8+
import software.amazon.lambda.powertools.sqs.LargeMessageHandler;
9+
import software.amazon.lambda.powertools.sqs.PowertoolsSqs;
10+
import software.amazon.lambda.powertools.sqs.SqsBatchProcessor;
11+
import software.amazon.lambda.powertools.sqs.SqsMessageHandler;
12+
13+
public class PartialBatchHandler implements RequestHandler<SQSEvent, List<Object>> {
14+
15+
16+
@Override
17+
@LargeMessageHandler
18+
@SqsBatchProcessor(HandlerSqs.class)
19+
public List<Object> handleRequest(SQSEvent sqsEvent, Context context) {
20+
21+
List<Object> returnValues =
22+
PowertoolsSqs.partialBatchProcessor(sqsEvent, false, HandlerSqs.class);
23+
24+
// Do some processing on processed message
25+
26+
return returnValues;
27+
}
28+
29+
private class HandlerSqs implements SqsMessageHandler<Object> {
30+
31+
@Override
32+
public String process(SQSEvent.SQSMessage message) {
33+
// This is where you process message
34+
return null;
35+
}
36+
}
37+
}

0 commit comments

Comments
 (0)