Skip to content

feat: SQS Large message handling #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Aug 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ jobs:
env:
OS: ${{ matrix.os }}
JAVA: ${{ matrix.java-version }}
AWS_REGION: eu-west-1
steps:
- uses: actions/checkout@v2
- name: Setup java
Expand Down
8 changes: 0 additions & 8 deletions docs/content/dummy.md

This file was deleted.

105 changes: 105 additions & 0 deletions docs/content/utilities/sqs_large_message_handling.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
---
title: SQS Large Message Handling
description: Utility
---

The large message handling utility handles SQS messages which have had their payloads
offloaded to S3 due to them being larger than the SQS maximum.

The utility automatically retrieves messages which have been offloaded to S3 using the
[amazon-sqs-java-extended-client-lib](https://github.com/awslabs/amazon-sqs-java-extended-client-lib)
client library. Once the message payloads have been processed successful the
utility can delete the message payloads from S3.

This utility is compatible with versions *1.1.0+* of amazon-sqs-java-extended-client-lib.</p>

```xml
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-extended-client-lib</artifactId>
<version>1.1.0</version>
</dependency>
```

## Install

To install this utility, add the following dependency to your project.

```xml
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-sqs</artifactId>
<version>0.1.0-beta</version>
</dependency>
```

And configure the aspectj-maven-plugin to compile-time weave (CTW) the
aws-lambda-powertools-java aspects into your project. You may already have this
plugin in your pom. In that case add the depenedency to the `aspectLibraries`
section.

```xml
<build>
<plugins>
...
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>aspectj-maven-plugin</artifactId>
<version>1.11</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<complianceLevel>1.8</complianceLevel>
<aspectLibraries>
...
<aspectLibrary>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-sqs</artifactId>
</aspectLibrary>
...
</aspectLibraries>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
...
</plugins>
</build>
```

The annotation `@LargeMessageHandler` should be used with the handleRequest method of a class
which implements `com.amazonaws.services.lambda.runtime.RequestHandler` with
`com.amazonaws.services.lambda.runtime.events.SQSEvent` as the first parameter.

```java
public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {

@Override
@LargeMessageHandler
public String handleRequest(SQSEvent sqsEvent, Context context) {
// process messages

return "ok";
}
```

`@LargeMessageHandler` creates a default S3 Client `AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient()`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be mention a bit explicitly that if download of any message fails from S3 while enriching, entire batch will fail?

When the Lambda function is invoked with an event from SQS, each record received
in the SQSEvent will be checked to see if it's body contains a payload which has
been offloaded to S3. If it does then `getObject(bucket, key)` will be called,
and the payload retrieved. If there is an error during this process then the
function will fail with a `FailedProcessingLargePayloadException` exception.

If the request handler method returns without error then each payload will be
deleted from S3 using `deleteObject(bucket, key)`

To disable deletion of payloads setting the following annotation parameter:

```java
@LargeMessageHandler(deletePayloads=false)
```
5 changes: 4 additions & 1 deletion docs/gatsby-config.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ module.exports = {
'Core utilities': [
'core/logging',
'core/tracing'
]
],
'Utilities': [
'utilities/sqs_large_message_handling'
],
},
navConfig: {
'Serverless Best Practices video': {
Expand Down
4 changes: 2 additions & 2 deletions example/HelloWorldFunction/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-tracing</artifactId>
<version>0.1.0-SNAPSHOT</version>
<version>0.1.0-beta</version>
</dependency>
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-logging</artifactId>
<version>0.1.0-SNAPSHOT</version>
<version>0.1.0-beta</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
Expand Down
25 changes: 23 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<module>powertools-core</module>
<module>powertools-logging</module>
<module>powertools-tracing</module>
<module>powertools-sqs</module>
</modules>

<scm>
Expand All @@ -50,9 +51,12 @@
<log4j.version>2.13.3</log4j.version>
<jackson.version>2.11.0</jackson.version>
<aspectj.version>1.9.6</aspectj.version>
<aws.sdk.version>2.14.4</aws.sdk.version>
<aws.xray.recorder.version>2.4.0</aws.xray.recorder.version>
<payloadoffloading-common.version>1.0.0</payloadoffloading-common.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<lambda.core.version>1.2.1</lambda.core.version>
<lambda.events.version>3.1.0</lambda.events.version>
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
<aspectj-maven-plugin.version>1.12.1</aspectj-maven-plugin.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
Expand All @@ -62,6 +66,7 @@
<maven-javadoc-plugin.version>2.10.3</maven-javadoc-plugin.version>
<maven-source-plugin.version>2.4</maven-source-plugin.version>
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
<junit-jupiter.version>5.6.2</junit-jupiter.version>
</properties>

<distributionManagement>
Expand All @@ -83,6 +88,16 @@
<artifactId>aws-lambda-java-core</artifactId>
<version>${lambda.core.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>${lambda.events.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.payloadoffloading</groupId>
<artifactId>payloadoffloading-common</artifactId>
<version>${payloadoffloading-common.version}</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
Expand Down Expand Up @@ -133,13 +148,19 @@
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.2</version>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.6.2</version>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
104 changes: 104 additions & 0 deletions powertools-sqs/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<artifactId>powertools-sqs</artifactId>
<packaging>jar</packaging>

<parent>
<artifactId>powertools-parent</artifactId>
<groupId>software.amazon.lambda</groupId>
<version>0.1.0-beta</version>
</parent>

<name>AWS Lambda Powertools Java library SQS</name>
<description>
A suite of utilities for AWS Lambda Functions that makes tracing with AWS X-Ray, structured logging and creating custom metrics asynchronously easier.
</description>
<url>https://aws.amazon.com/lambda/</url>
<issueManagement>
<system>GitHub Issues</system>
<url>https://github.com/awslabs/aws-lambda-powertools-java/issues</url>
</issueManagement>
<scm>
<url>https://github.com/awslabs/aws-lambda-powertools-java.git</url>
</scm>
<developers>
<developer>
<name>AWS Lambda Powertools team</name>
<organization>Amazon Web Services</organization>
<organizationUrl>https://aws.amazon.com/</organizationUrl>
</developer>
</developers>

<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://aws.oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>

<dependencies>
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-core</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.payloadoffloading</groupId>
<artifactId>payloadoffloading-common</artifactId>
</dependency>

<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package software.amazon.lambda.powertools.sqs;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* {@code LargeMessageHandler} is used to signal that the annotated method
* should be extended to handle large SQS messages which have been offloaded
* to S3
*
* <p>{@code LargeMessageHandler} automatically retrieves and deletes messages
* which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib}
* client library.</p>
*
* <p>This version of the {@code LargeMessageHandler} is compatible with version
* 1.1.0+ of {@code amazon-sqs-java-extended-client-lib}.</p>
*
* <pre>
* &lt;dependency&gt;
* &lt;groupId&gt;com.amazonaws&lt;/groupId&gt;
* &lt;artifactId&gt;amazon-sqs-java-extended-client-lib&lt;/artifactId&gt;
* &lt;version&gt;1.1.0&lt;/version&gt;
* &lt;/dependency&gt;
* </pre>
*
* <p>{@code LargeMessageHandler} should be used with the handleRequest method of a class
* which implements {@code com.amazonaws.services.lambda.runtime.RequestHandler} with
* {@code com.amazonaws.services.lambda.runtime.events.SQSEvent} as the first parameter.</p>
*
* <pre>
* public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {
*
* {@literal @}Override
* {@literal @}LargeMessageHandler
* public String handleRequest(SQSEvent sqsEvent, Context context) {
*
* // process messages
*
* return "ok";
* }
*
* ...
* </pre>
*
* <p>Using the default S3 Client {@code AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();}
* each record received in the SQSEvent {@code LargeMessageHandler} will checked
* to see if it's body contains a payload which has been offloaded to S3. If it
* does then {@code getObject(bucket, key)} will be called and the payload
* retrieved.</p>
*
* <p><b>Note</b>: Retreiving payloads from S3 will increase the duration of the
* Lambda function.</p>
*
* <p>If the request handler method returns then each payload will be deleted
* from S3 using {@code deleteObject(bucket, key)}</p>
*
* <p>To disable deletion of payloads setting the following annotation parameter
* {@code @LargeMessageHandler(deletePayloads=false)}</p>
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface LargeMessageHandler {

boolean deletePayloads() default true;
}
Loading