Skip to content

Commit 6ebbd3f

Browse files
pankajagrawal16Pankaj Agrawal
and
Pankaj Agrawal
authored
feat: SQS Partial batch Utility (#120)
* Initial API skeleton for Partial SQS batch util * Better error handling * Initial Test cases setup and some refactorings * Full Test cases coverage * Rename API method for batch processing * java docs * public docs update * Fix correct place holder for queuename and account * Example usage with relevant permissions * Minor doc updates * Ranme method to set custom sqs client * Make test less confusing Co-authored-by: Pankaj Agrawal <pankaagr@amazon.com>
1 parent 748b11a commit 6ebbd3f

26 files changed

+1416
-28
lines changed

docs/content/utilities/batch.mdx

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
---
2+
title: SQS Batch Processing
3+
description: Utility
4+
---
5+
6+
import Note from "../../src/components/Note"
7+
8+
The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS.
9+
10+
**Key Features**
11+
12+
* Prevent successfully processed messages from being returned to SQS
13+
* A simple interface for individually processing messages from a batch
14+
15+
**Background**
16+
17+
When using SQS as a Lambda event source mapping, Lambda functions can be triggered with a batch of messages from SQS.
18+
19+
If your function fails to process any message from the batch, the entire batch returns to your SQS queue, and your Lambda function will be triggered with the same batch again.
20+
21+
With this utility, messages within a batch will be handled individually - only messages that were not successfully processed
22+
are returned to the queue.
23+
24+
<Note type="warning">
25+
While this utility lowers the chance of processing messages more than once, it is not guaranteed. We recommend implementing processing logic in an idempotent manner wherever possible.
26+
<br/><br/>
27+
More details on how Lambda works with SQS can be found in the <a href="https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html">AWS documentation</a>
28+
</Note>
29+
30+
## Install
31+
32+
To install this utility, add the following dependency to your project.
33+
34+
```xml
35+
<dependency>
36+
<groupId>software.amazon.lambda</groupId>
37+
<artifactId>powertools-sqs</artifactId>
38+
<version>0.4.0-beta</version>
39+
</dependency>
40+
```
41+
42+
And configure the aspectj-maven-plugin to compile-time weave (CTW) the
43+
aws-lambda-powertools-java aspects into your project. You may already have this
44+
plugin in your pom. In that case add the dependency to the `aspectLibraries`
45+
section.
46+
47+
```xml
48+
<build>
49+
<plugins>
50+
...
51+
<plugin>
52+
<groupId>org.codehaus.mojo</groupId>
53+
<artifactId>aspectj-maven-plugin</artifactId>
54+
<version>1.11</version>
55+
<configuration>
56+
<source>1.8</source>
57+
<target>1.8</target>
58+
<complianceLevel>1.8</complianceLevel>
59+
<aspectLibraries>
60+
<!-- highlight-start -->
61+
<aspectLibrary>
62+
<groupId>software.amazon.lambda</groupId>
63+
<artifactId>powertools-sqs</artifactId>
64+
</aspectLibrary>
65+
<!-- highlight-end -->
66+
</aspectLibraries>
67+
</configuration>
68+
<executions>
69+
<execution>
70+
<goals>
71+
<goal>compile</goal>
72+
</goals>
73+
</execution>
74+
</executions>
75+
</plugin>
76+
...
77+
</plugins>
78+
</build>
79+
```
80+
81+
**IAM Permissions**
82+
83+
This utility requires additional permissions to work as expected. Lambda functions using this utility require the `sqs:GetQueueUrl` and `sqs:DeleteMessageBatch` permission.
84+
85+
## Processing messages from SQS
86+
87+
You can use either **[SqsBatchProcessor annotation](#SqsBatchProcessor annotation)**, or **[PowertoolsSqs Utility API](#PowertoolsSqs Utility API)** as a fluent API.
88+
89+
Both have nearly the same behaviour when it comes to processing messages from the batch:
90+
91+
* **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost
92+
* **Entire Batch has been partially processed successfully**, where exceptions were raised within your `SqsMessageHandler` interface implementation, we will:
93+
- **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch`
94+
- **2)** Raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue
95+
96+
The only difference is that **PowertoolsSqs Utility API** will give you access to return from the processed messages if you need. Exception `SQSBatchProcessingException` thrown from the
97+
utility will have access to both successful and failed messaged along with failure exceptions.
98+
99+
## Functional Interface SqsMessageHandler
100+
101+
Both [annotation](#SqsBatchProcessor annotation) and [PowertoolsSqs Utility API](#PowertoolsSqs Utility API) requires an implementation of functional interface `SqsMessageHandler`.
102+
103+
This implementation is responsible for processing each individual message from the batch, and to raise an exception if unable to process any of the messages sent.
104+
105+
**Any non-exception/successful return from your record handler function** will instruct utility to queue up each individual message for deletion.
106+
107+
### SqsBatchProcessor annotation
108+
109+
When using this annotation, you need provide a class implementation of `SqsMessageHandler` that will process individual messages from the batch - It should raise an exception if it is unable to process the record.
110+
111+
All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch:
112+
113+
* **Any successfully processed messages**, we will delete them from the queue via `sqs:DeleteMessageBatch`
114+
* **Any unprocessed messages detected**, we will raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue
115+
116+
<Note type="warning">
117+
You will not have accessed to the <strong>processed messages</strong> within the Lambda Handler - all processing logic will and should be performed by the implemented <code>SqsMessageHandler#process()</code> function.
118+
119+
</Note><br/>
120+
121+
```java:title=App.java
122+
public class AppSqsEvent implements RequestHandler<SQSEvent, String> {
123+
@Override
124+
@SqsBatchProcessor(SampleMessageHandler.class) // highlight-line
125+
public String handleRequest(SQSEvent input, Context context) {
126+
return "{\"statusCode\": 200}";
127+
}
128+
129+
public class SampleMessageHandler implements SqsMessageHandler<Object> {
130+
131+
@Override
132+
public String process(SQSMessage message) {
133+
// This will be called for each individual message from a batch
134+
// It should raise an exception if the message was not processed successfully
135+
String returnVal = doSomething(message.getBody());
136+
return returnVal;
137+
}
138+
}
139+
}
140+
```
141+
142+
### PowertoolsSqs Utility API
143+
144+
If you require access to the result of processed messages, you can use this utility.
145+
146+
The result from calling <code>PowertoolsSqs#batchProcessor()</code> on the context manager will be a list of all the return values from your <code>SqsMessageHandler#process()</code> function.
147+
148+
```java:title=App.java
149+
public class AppSqsEvent implements RequestHandler<SQSEvent, List<String>> {
150+
@Override
151+
public List<String> handleRequest(SQSEvent input, Context context) {
152+
List<String> returnValues = PowertoolsSqs.batchProcessor(input, SampleMessageHandler.class); // highlight-line
153+
154+
return returnValues;
155+
}
156+
157+
public class SampleMessageHandler implements SqsMessageHandler<String> {
158+
159+
@Override
160+
public String process(SQSMessage message) {
161+
// This will be called for each individual message from a batch
162+
// It should raise an exception if the message was not processed successfully
163+
String returnVal = doSomething(message.getBody());
164+
return returnVal;
165+
}
166+
}
167+
}
168+
```
169+
170+
You can also use the utility in a more functional way` by providing inline implementation of functional interface <code>SqsMessageHandler#process()</code>
171+
172+
```java:title=App.java
173+
public class AppSqsEvent implements RequestHandler<SQSEvent, List<String>> {
174+
175+
@Override
176+
public List<String> handleRequest(SQSEvent input, Context context) {
177+
// highlight-start
178+
List<String> returnValues = PowertoolsSqs.batchProcessor(input, (message) -> {
179+
// This will be called for each individual message from a batch
180+
// It should raise an exception if the message was not processed successfully
181+
String returnVal = doSomething(message.getBody());
182+
return returnVal;
183+
});
184+
// highlight-end
185+
186+
return returnValues;
187+
}
188+
}
189+
```
190+
191+
## Passing custom SqsClient
192+
193+
If you need to pass custom SqsClient such as region to the SDK, you can pass your own `SqsClient` to be used by utility either for
194+
**[SqsBatchProcessor annotation](#SqsBatchProcessor annotation)**, or **[PowertoolsSqs Utility API](#PowertoolsSqs Utility API)**.
195+
196+
```java:title=App.java
197+
198+
public class AppSqsEvent implements RequestHandler<SQSEvent, List<String>> {
199+
// highlight-start
200+
static {
201+
PowertoolsSqs.overrideSqsClient(SqsClient.builder()
202+
.build());
203+
}
204+
// highlight-end
205+
206+
@Override
207+
public List<String> handleRequest(SQSEvent input, Context context) {
208+
List<String> returnValues = PowertoolsSqs.batchProcessor(input, SampleMessageHandler.class);
209+
210+
return returnValues;
211+
}
212+
213+
public class SampleMessageHandler implements SqsMessageHandler<String> {
214+
215+
@Override
216+
public String process(SQSMessage message) {
217+
// This will be called for each individual message from a batch
218+
// It should raise an exception if the message was not processed successfully
219+
String returnVal = doSomething(message.getBody());
220+
return returnVal;
221+
}
222+
}
223+
}
224+
225+
```
226+
227+
## Suppressing exceptions
228+
229+
If you want to disable the default behavior where `SQSBatchProcessingException` is raised if there are any exception, you can pass the `suppressException` boolean argument.
230+
231+
**Within SqsBatchProcessor annotation**
232+
233+
```java:title=App.java
234+
...
235+
@Override
236+
@SqsBatchProcessor(value = SampleMessageHandler.class, suppressException = true) // highlight-line
237+
public String handleRequest(SQSEvent input, Context context) {
238+
return "{\"statusCode\": 200}";
239+
}
240+
```
241+
242+
**Within PowertoolsSqs Utility API**
243+
244+
```java:title=App.java
245+
@Override
246+
public List<String> handleRequest(SQSEvent input, Context context) {
247+
List<String> returnValues = PowertoolsSqs.batchProcessor(input, true, SampleMessageHandler.class); // highlight-line
248+
249+
return returnValues;
250+
}
251+
```

docs/content/utilities/sqs_large_message_handling.mdx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ To install this utility, add the following dependency to your project.
3535

3636
And configure the aspectj-maven-plugin to compile-time weave (CTW) the
3737
aws-lambda-powertools-java aspects into your project. You may already have this
38-
plugin in your pom. In that case add the depenedency to the `aspectLibraries`
38+
plugin in your pom. In that case add the dependency to the `aspectLibraries`
3939
section.
4040

4141
```xml
@@ -51,12 +51,12 @@ section.
5151
<target>1.8</target>
5252
<complianceLevel>1.8</complianceLevel>
5353
<aspectLibraries>
54-
...
54+
<!-- highlight-start -->
5555
<aspectLibrary>
5656
<groupId>software.amazon.lambda</groupId>
5757
<artifactId>powertools-sqs</artifactId>
5858
</aspectLibrary>
59-
...
59+
<!-- highlight-end -->
6060
</aspectLibraries>
6161
</configuration>
6262
<executions>

docs/gatsby-config.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ module.exports = {
2929
],
3030
'Utilities': [
3131
'utilities/sqs_large_message_handling',
32-
'utilities/parameters'
32+
'utilities/batch',
33+
'utilities/parameters',
3334
],
3435
},
3536
navConfig: {

example/HelloWorldFunction/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@
3333
<artifactId>powertools-parameters</artifactId>
3434
<version>0.4.0-beta</version>
3535
</dependency>
36+
<dependency>
37+
<groupId>software.amazon.lambda</groupId>
38+
<artifactId>powertools-sqs</artifactId>
39+
<version>0.4.0-beta</version>
40+
</dependency>
3641
<dependency>
3742
<groupId>com.amazonaws</groupId>
3843
<artifactId>aws-lambda-java-core</artifactId>
@@ -90,6 +95,10 @@
9095
<groupId>software.amazon.lambda</groupId>
9196
<artifactId>powertools-metrics</artifactId>
9297
</aspectLibrary>
98+
<aspectLibrary>
99+
<groupId>software.amazon.lambda</groupId>
100+
<artifactId>powertools-sqs</artifactId>
101+
</aspectLibrary>
93102
</aspectLibraries>
94103
</configuration>
95104
<executions>
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package helloworld;
2+
3+
import com.amazonaws.services.lambda.runtime.Context;
4+
import com.amazonaws.services.lambda.runtime.RequestHandler;
5+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
6+
import org.apache.logging.log4j.LogManager;
7+
import org.apache.logging.log4j.Logger;
8+
import software.amazon.lambda.powertools.logging.PowertoolsLogging;
9+
import software.amazon.lambda.powertools.sqs.SqsBatchProcessor;
10+
import software.amazon.lambda.powertools.sqs.SqsMessageHandler;
11+
12+
import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
13+
14+
public class AppSqsEvent implements RequestHandler<SQSEvent, String> {
15+
private static final Logger LOG = LogManager.getLogger(AppSqsEvent.class);
16+
17+
@Override
18+
@SqsBatchProcessor(SampleMessageHandler.class)
19+
@PowertoolsLogging(logEvent = true)
20+
public String handleRequest(SQSEvent input, Context context) {
21+
return "{\"statusCode\": 200}";
22+
}
23+
24+
public class SampleMessageHandler implements SqsMessageHandler<Object> {
25+
26+
@Override
27+
public String process(SQSMessage message) {
28+
if("19dd0b57-b21e-4ac1-bd88-01bbb068cb99".equals(message.getMessageId())) {
29+
throw new RuntimeException(message.getMessageId());
30+
}
31+
LOG.info("Processing message with details {}", message);
32+
return message.getMessageId();
33+
}
34+
}
35+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package helloworld;
2+
3+
import java.util.List;
4+
5+
import com.amazonaws.services.lambda.runtime.Context;
6+
import com.amazonaws.services.lambda.runtime.RequestHandler;
7+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
10+
import software.amazon.lambda.powertools.sqs.PowertoolsSqs;
11+
import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException;
12+
13+
import static java.util.Collections.emptyList;
14+
15+
public class AppSqsEventUtil implements RequestHandler<SQSEvent, List<String>> {
16+
private static final Logger LOG = LogManager.getLogger(AppSqsEventUtil.class);
17+
18+
@Override
19+
public List<String> handleRequest(SQSEvent input, Context context) {
20+
try {
21+
22+
return PowertoolsSqs.batchProcessor(input, (message) -> {
23+
if ("19dd0b57-b21e-4ac1-bd88-01bbb068cb99".equals(message.getMessageId())) {
24+
throw new RuntimeException(message.getMessageId());
25+
}
26+
27+
LOG.info("Processing message with details {}", message);
28+
return message.getMessageId();
29+
});
30+
31+
} catch (SQSBatchProcessingException e) {
32+
LOG.info("Exception details {}", e.getMessage(), e);
33+
LOG.info("Success message Returns{}", e.successMessageReturnValues());
34+
LOG.info("Failed messages {}", e.getFailures());
35+
LOG.info("Failed messages Reasons {}", e.getExceptions());
36+
return emptyList();
37+
}
38+
}
39+
}

0 commit comments

Comments
 (0)