Skip to content

Commit 0752389

Browse files
author
Pankaj Agrawal
committed
Better error handling
1 parent 990cb63 commit 0752389

File tree

7 files changed

+74
-64
lines changed

7 files changed

+74
-64
lines changed

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,30 @@
1414
package software.amazon.lambda.powertools.sqs;
1515

1616
import java.util.ArrayList;
17-
import java.util.Collections;
1817
import java.util.List;
1918
import java.util.function.Function;
2019
import java.util.stream.Collectors;
2120

2221
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
2322
import com.fasterxml.jackson.core.JsonProcessingException;
2423
import com.fasterxml.jackson.databind.ObjectMapper;
24+
import org.apache.commons.logging.Log;
25+
import org.apache.commons.logging.LogFactory;
2526
import software.amazon.awssdk.services.sqs.SqsClient;
2627
import software.amazon.lambda.powertools.sqs.internal.BatchContext;
27-
import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect;
28+
import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect;
2829
import software.amazon.payloadoffloading.PayloadS3Pointer;
2930

3031
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
31-
import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.processMessages;
32+
import static software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect.processMessages;
3233

3334
/**
3435
* A class of helper functions to add additional functionality to LargeMessageHandler.
3536
* <p>
3637
* {@see PowertoolsLogging}
3738
*/
3839
public final class PowertoolsSqs {
40+
private static final Log LOG = LogFactory.getLog(PowertoolsSqs.class);
3941

4042
private static final ObjectMapper objectMapper = new ObjectMapper();
4143
private static SqsClient client = SqsClient.create();
@@ -79,7 +81,7 @@ public static <R> R enrichedMessageFromS3(final SQSEvent sqsEvent,
7981
R returnValue = messageFunction.apply(sqsMessages);
8082

8183
if (deleteS3Payload) {
82-
s3Pointers.forEach(SqsMessageAspect::deleteMessage);
84+
s3Pointers.forEach(SqsLargeMessageAspect::deleteMessage);
8385
}
8486

8587
return returnValue;
@@ -94,14 +96,15 @@ public static SqsClient defaultSqsClient() {
9496
}
9597

9698
public static <R> List<R> partialBatchProcessor(final SQSEvent event,
97-
boolean suppressException,
99+
final boolean suppressException,
98100
final Class<? extends SqsMessageHandler<R>> handler) {
99101

100102
try {
101103
return partialBatchProcessor(event, suppressException, handler.newInstance());
102104
} catch (IllegalAccessException | InstantiationException e) {
103-
// LOG something
104-
return Collections.emptyList();
105+
LOG.error("Failed invoking process method on handler", e);
106+
throw new RuntimeException("Unexpected error occurred. Please raise issue at " +
107+
"https://github.com/awslabs/aws-lambda-powertools-java/issues", e);
105108
}
106109
}
107110

@@ -134,5 +137,4 @@ private static SQSMessage clonedMessage(final SQSMessage sqsMessage) {
134137
throw new RuntimeException(e);
135138
}
136139
}
137-
138140
}

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java

Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,48 +3,73 @@
33
import java.util.ArrayList;
44
import java.util.List;
55

6-
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
6+
import org.apache.commons.logging.Log;
7+
import org.apache.commons.logging.LogFactory;
78
import software.amazon.awssdk.services.sqs.SqsClient;
89
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
910
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
1011
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
1112
import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException;
1213

14+
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
15+
import static java.lang.String.format;
1316
import static java.util.stream.Collectors.toList;
1417

1518
public final class BatchContext {
16-
private List<SQSEvent.SQSMessage> success = new ArrayList<>();
17-
private List<SQSEvent.SQSMessage> failures = new ArrayList<>();
18-
private List<Exception> exceptions = new ArrayList<>();
19-
private SqsClient client;
19+
private static final Log LOG = LogFactory.getLog(BatchContext.class);
20+
21+
private final List<SQSMessage> success = new ArrayList<>();
22+
private final List<SQSMessage> failures = new ArrayList<>();
23+
private final List<Exception> exceptions = new ArrayList<>();
24+
private final SqsClient client;
2025

2126
public BatchContext(SqsClient client) {
2227
this.client = client;
2328
}
2429

25-
public void addSuccess(SQSEvent.SQSMessage event) {
30+
public void addSuccess(SQSMessage event) {
2631
success.add(event);
2732
}
2833

29-
public void addFailure(SQSEvent.SQSMessage event, Exception e) {
34+
public void addFailure(SQSMessage event, Exception e) {
3035
failures.add(event);
3136
exceptions.add(e);
3237
}
3338

39+
public void processSuccessAndReset(final boolean suppressException) {
40+
try {
41+
if (hasFailures()) {
42+
43+
deleteSuccessMessage();
44+
45+
if (suppressException) {
46+
List<String> messageIds = failures.stream().map(SQSMessage::getMessageId).collect(toList());
47+
LOG.debug(format("[%s] records failed processing, but exceptions are suppressed. Failed messages %s", failures.size(), messageIds));
48+
} else {
49+
throw new SQSBatchProcessingException(exceptions);
50+
}
51+
}
52+
} finally {
53+
reset();
54+
}
55+
}
56+
3457
private boolean hasFailures() {
3558
return !failures.isEmpty();
3659
}
3760

38-
private void cleanUpAndReset() {
39-
DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder()
40-
.queueUrl(url())
41-
.entries(success.stream().map(m -> DeleteMessageBatchRequestEntry.builder()
42-
.id(m.getMessageId())
43-
.receiptHandle(m.getReceiptHandle())
44-
.build()).collect(toList()))
45-
.build();
61+
private void deleteSuccessMessage() {
62+
if (!success.isEmpty()) {
63+
DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder()
64+
.queueUrl(url())
65+
.entries(success.stream().map(m -> DeleteMessageBatchRequestEntry.builder()
66+
.id(m.getMessageId())
67+
.receiptHandle(m.getReceiptHandle())
68+
.build()).collect(toList()))
69+
.build();
4670

47-
client.deleteMessageBatch(request);
71+
client.deleteMessageBatch(request);
72+
}
4873
}
4974

5075
private String url() {
@@ -57,21 +82,8 @@ private String url() {
5782
}
5883

5984
private void reset() {
60-
success = new ArrayList<>();
61-
failures = new ArrayList<>();
62-
exceptions = new ArrayList<>();
63-
}
64-
65-
public void processSuccessAndReset(final boolean suppressException) {
66-
if (hasFailures() && !suppressException) {
67-
SQSBatchProcessingException exception = new SQSBatchProcessingException(exceptions);
68-
cleanUpAndReset();
69-
throw exception;
70-
} else if (hasFailures()) {
71-
// LOG
72-
cleanUpAndReset();
73-
} else {
74-
reset();
75-
}
85+
success.clear();
86+
failures.clear();
87+
exceptions.clear();
7688
}
7789
}

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java renamed to powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsLargeMessageAspect.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@
2828
import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod;
2929

3030
@Aspect
31-
public class SqsMessageAspect {
31+
public class SqsLargeMessageAspect {
3232

33-
private static final Log LOG = LogFactory.getLog(SqsMessageAspect.class);
33+
private static final Log LOG = LogFactory.getLog(SqsLargeMessageAspect.class);
3434
private static AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();
3535

3636
@SuppressWarnings({"EmptyMethod"})

powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,14 @@
55
import org.aspectj.lang.annotation.Around;
66
import org.aspectj.lang.annotation.Aspect;
77
import org.aspectj.lang.annotation.Pointcut;
8-
import software.amazon.awssdk.services.sqs.SqsClient;
9-
import software.amazon.lambda.powertools.sqs.PowertoolsSqs;
108
import software.amazon.lambda.powertools.sqs.SqsBatchProcessor;
119

1210
import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod;
13-
import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.placedOnSqsEventRequestHandler;
11+
import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.partialBatchProcessor;
12+
import static software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect.placedOnSqsEventRequestHandler;
1413

1514
@Aspect
1615
public class SqsMessageBatchProcessorAspect {
17-
private static final SqsClient client = SqsClient.create();
18-
private static BatchContext details = new BatchContext(PowertoolsSqs.defaultSqsClient());
1916

2017
@SuppressWarnings({"EmptyMethod"})
2118
@Pointcut("@annotation(sqsBatchProcessor)")
@@ -32,7 +29,7 @@ && placedOnSqsEventRequestHandler(pjp)) {
3229

3330
SQSEvent sqsEvent = (SQSEvent) proceedArgs[0];
3431

35-
PowertoolsSqs.partialBatchProcessor(sqsEvent, sqsBatchProcessor.suppressException(), sqsBatchProcessor.value().newInstance());
32+
partialBatchProcessor(sqsEvent, sqsBatchProcessor.suppressException(), sqsBatchProcessor.value().newInstance());
3633
}
3734

3835
return pjp.proceed(proceedArgs);

powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.junit.jupiter.params.provider.MethodSource;
2222
import org.junit.jupiter.params.provider.ValueSource;
2323
import org.mockito.Mock;
24-
import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect;
24+
import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect;
2525

2626
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
2727
import static java.util.Collections.singletonList;
@@ -33,7 +33,7 @@
3333
import static org.mockito.Mockito.verify;
3434
import static org.mockito.Mockito.verifyNoInteractions;
3535
import static org.mockito.Mockito.when;
36-
import static org.mockito.MockitoAnnotations.initMocks;
36+
import static org.mockito.MockitoAnnotations.openMocks;
3737

3838
class PowertoolsSqsTest {
3939

@@ -44,8 +44,8 @@ class PowertoolsSqsTest {
4444

4545
@BeforeEach
4646
void setUp() throws IllegalAccessException {
47-
initMocks(this);
48-
writeStaticField(SqsMessageAspect.class, "amazonS3", amazonS3, true);
47+
openMocks(this);
48+
writeStaticField(SqsLargeMessageAspect.class, "amazonS3", amazonS3, true);
4949
}
5050

5151
@Test
@@ -122,7 +122,7 @@ public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exce
122122
String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
123123
SQSEvent sqsEvent = messageWithBody(messageBody);
124124

125-
assertThatExceptionOfType(SqsMessageAspect.FailedProcessingLargePayloadException.class)
125+
assertThatExceptionOfType(SqsLargeMessageAspect.FailedProcessingLargePayloadException.class)
126126
.isThrownBy(() -> PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> sqsMessages.get(0).getBody()))
127127
.withCause(exception);
128128

@@ -145,7 +145,7 @@ public void close() throws IOException {
145145
String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]";
146146
SQSEvent sqsEvent = messageWithBody(messageBody);
147147

148-
assertThatExceptionOfType(SqsMessageAspect.FailedProcessingLargePayloadException.class)
148+
assertThatExceptionOfType(SqsLargeMessageAspect.FailedProcessingLargePayloadException.class)
149149
.isThrownBy(() -> PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> sqsMessages.get(0).getBody()))
150150
.withCauseInstanceOf(IOException.class);
151151

powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
public class PartialBatchHandler implements RequestHandler<SQSEvent, List<Object>> {
1414

15-
1615
@Override
1716
@LargeMessageHandler
1817
@SqsBatchProcessor(HandlerSqs.class)

powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java renamed to powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsLargeMessageAspectTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package software.amazon.lambda.powertools.sqs.internal;
22

3+
import java.io.ByteArrayInputStream;
4+
import java.io.IOException;
5+
import java.util.stream.Stream;
6+
37
import com.amazonaws.AmazonServiceException;
48
import com.amazonaws.SdkClientException;
59
import com.amazonaws.services.lambda.runtime.Context;
@@ -21,10 +25,6 @@
2125
import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandler;
2226
import software.amazon.lambda.powertools.sqs.handlers.SqsNoDeleteMessageHandler;
2327

24-
import java.io.ByteArrayInputStream;
25-
import java.io.IOException;
26-
import java.util.stream.Stream;
27-
2828
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
2929
import static java.util.Collections.singletonList;
3030
import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField;
@@ -35,10 +35,10 @@
3535
import static org.mockito.Mockito.verify;
3636
import static org.mockito.Mockito.verifyNoInteractions;
3737
import static org.mockito.Mockito.when;
38-
import static org.mockito.MockitoAnnotations.initMocks;
39-
import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.FailedProcessingLargePayloadException;
38+
import static org.mockito.MockitoAnnotations.openMocks;
39+
import static software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect.FailedProcessingLargePayloadException;
4040

41-
public class SqsMessageAspectTest {
41+
public class SqsLargeMessageAspectTest {
4242

4343
private RequestHandler<SQSEvent, String> requestHandler;
4444

@@ -53,9 +53,9 @@ public class SqsMessageAspectTest {
5353

5454
@BeforeEach
5555
void setUp() throws IllegalAccessException {
56-
initMocks(this);
56+
openMocks(this);
5757
setupContext();
58-
writeStaticField(SqsMessageAspect.class, "amazonS3", amazonS3, true);
58+
writeStaticField(SqsLargeMessageAspect.class, "amazonS3", amazonS3, true);
5959
requestHandler = new SqsMessageHandler();
6060
}
6161

0 commit comments

Comments
 (0)