2
2
3
3
import java .io .IOException ;
4
4
import java .util .List ;
5
- import java .util .stream .Stream ;
6
5
7
- import com .amazonaws .AmazonServiceException ;
8
- import com .amazonaws .SdkClientException ;
9
6
import com .amazonaws .services .lambda .runtime .events .SQSEvent ;
10
7
import com .fasterxml .jackson .databind .ObjectMapper ;
11
8
import org .junit .jupiter .api .BeforeEach ;
12
9
import org .junit .jupiter .api .Test ;
13
10
import org .junit .jupiter .params .ParameterizedTest ;
14
- import org .junit .jupiter .params .provider .Arguments ;
15
11
import org .junit .jupiter .params .provider .ValueSource ;
16
12
import software .amazon .awssdk .services .sqs .SqsClient ;
13
+ import software .amazon .awssdk .services .sqs .model .DeleteMessageBatchRequest ;
14
+ import software .amazon .awssdk .services .sqs .model .GetQueueUrlRequest ;
15
+ import software .amazon .awssdk .services .sqs .model .GetQueueUrlResponse ;
17
16
18
17
import static com .amazonaws .services .lambda .runtime .events .SQSEvent .SQSMessage ;
19
18
import static org .assertj .core .api .Assertions .assertThat ;
19
+ import static org .assertj .core .api .Assertions .assertThatExceptionOfType ;
20
+ import static org .mockito .ArgumentMatchers .any ;
20
21
import static org .mockito .Mockito .mock ;
22
+ import static org .mockito .Mockito .reset ;
21
23
import static org .mockito .Mockito .times ;
22
24
import static org .mockito .Mockito .verify ;
23
25
import static org .mockito .Mockito .verifyNoInteractions ;
26
+ import static org .mockito .Mockito .when ;
27
+ import static software .amazon .lambda .powertools .sqs .PowertoolsSqs .defaultSqsClient ;
28
+ import static software .amazon .lambda .powertools .sqs .PowertoolsSqs .partialBatchProcessor ;
24
29
25
30
class PowertoolsSqsBatchProcessorTest {
26
31
27
- private final SqsClient sqsClient = mock (SqsClient .class );
28
- private final SqsClient interactionClient = mock (SqsClient .class );
32
+ private static final SqsClient sqsClient = mock (SqsClient .class );
33
+ private static final SqsClient interactionClient = mock (SqsClient .class );
29
34
private static final ObjectMapper MAPPER = new ObjectMapper ();
30
35
private SQSEvent event ;
31
36
32
37
@ BeforeEach
33
38
void setUp () throws IOException {
39
+ reset (sqsClient , interactionClient );
34
40
event = MAPPER .readValue (this .getClass ().getResource ("/sampleSqsBatchEvent.json" ), SQSEvent .class );
35
- PowertoolsSqs .defaultSqsClient (sqsClient );
41
+
42
+ when (sqsClient .getQueueUrl (any (GetQueueUrlRequest .class ))).thenReturn (GetQueueUrlResponse .builder ()
43
+ .queueUrl ("test" )
44
+ .build ());
45
+
46
+ defaultSqsClient (sqsClient );
36
47
}
37
48
38
49
@ Test
39
50
void shouldBatchProcessAndNotDeleteMessagesWhenAllSuccess () {
40
- List <String > returnValues = PowertoolsSqs . partialBatchProcessor (event , false , (message ) -> {
51
+ List <String > returnValues = partialBatchProcessor (event , false , (message ) -> {
41
52
interactionClient .listQueues ();
42
53
return "Success" ;
43
54
});
@@ -53,24 +64,159 @@ void shouldBatchProcessAndNotDeleteMessagesWhenAllSuccess() {
53
64
@ ParameterizedTest
54
65
@ ValueSource (classes = {SampleInnerSqsHandler .class , SampleSqsHandler .class })
55
66
void shouldBatchProcessViaClassAndNotDeleteMessagesWhenAllSuccess (Class <? extends SqsMessageHandler <String >> handler ) {
56
- List <String > returnValues = PowertoolsSqs . partialBatchProcessor (event , false , handler );
67
+ List <String > returnValues = partialBatchProcessor (event , handler );
57
68
58
69
assertThat (returnValues )
59
70
.hasSize (2 )
60
71
.containsExactly ("0" , "1" );
72
+
73
+ verifyNoInteractions (sqsClient );
74
+ }
75
+
76
+ @ Test
77
+ void shouldBatchProcessAndDeleteSuccessMessageOnPartialFailures () {
78
+ String failedId = "2e1424d4-f796-459a-8184-9c92662be6da" ;
79
+
80
+ SqsMessageHandler <String > failedHandler = (message ) -> {
81
+ if (failedId .equals (message .getMessageId ())) {
82
+ throw new RuntimeException ("Failed processing" );
83
+ }
84
+
85
+ interactionClient .listQueues ();
86
+ return "Success" ;
87
+ };
88
+
89
+ assertThatExceptionOfType (SQSBatchProcessingException .class )
90
+ .isThrownBy (() -> partialBatchProcessor (event , failedHandler ))
91
+ .satisfies (e -> {
92
+
93
+ assertThat (e .successMessageReturnValues ())
94
+ .hasSize (1 )
95
+ .contains ("Success" );
96
+
97
+ assertThat (e .getFailures ())
98
+ .hasSize (1 )
99
+ .extracting ("messageId" )
100
+ .contains (failedId );
101
+
102
+ assertThat (e .getExceptions ())
103
+ .hasSize (1 )
104
+ .extracting ("detailMessage" )
105
+ .contains ("Failed processing" );
106
+ });
107
+
108
+ verify (interactionClient ).listQueues ();
109
+ verify (sqsClient ).deleteMessageBatch (any (DeleteMessageBatchRequest .class ));
110
+ }
111
+
112
+ @ Test
113
+ void shouldBatchProcessAndFullFailuresInBatch () {
114
+ SqsMessageHandler <String > failedHandler = (message ) -> {
115
+ throw new RuntimeException (message .getMessageId ());
116
+ };
117
+
118
+ assertThatExceptionOfType (SQSBatchProcessingException .class )
119
+ .isThrownBy (() -> partialBatchProcessor (event , failedHandler ))
120
+ .satisfies (e -> {
121
+
122
+ assertThat (e .successMessageReturnValues ())
123
+ .isEmpty ();
124
+
125
+ assertThat (e .getFailures ())
126
+ .hasSize (2 )
127
+ .extracting ("messageId" )
128
+ .containsExactly ("059f36b4-87a3-44ab-83d2-661975830a7d" ,
129
+ "2e1424d4-f796-459a-8184-9c92662be6da" );
130
+
131
+ assertThat (e .getExceptions ())
132
+ .hasSize (2 )
133
+ .extracting ("detailMessage" )
134
+ .containsExactly ("059f36b4-87a3-44ab-83d2-661975830a7d" ,
135
+ "2e1424d4-f796-459a-8184-9c92662be6da" );
136
+ });
137
+
138
+ verifyNoInteractions (sqsClient );
139
+ }
140
+
141
+ @ Test
142
+ void shouldBatchProcessViaClassAndDeleteSuccessMessageOnPartialFailures () {
143
+ assertThatExceptionOfType (SQSBatchProcessingException .class )
144
+ .isThrownBy (() -> partialBatchProcessor (event , FailureSampleInnerSqsHandler .class ))
145
+ .satisfies (e -> {
146
+
147
+ assertThat (e .successMessageReturnValues ())
148
+ .hasSize (1 )
149
+ .contains ("Success" );
150
+
151
+ assertThat (e .getFailures ())
152
+ .hasSize (1 )
153
+ .extracting ("messageId" )
154
+ .contains ("2e1424d4-f796-459a-8184-9c92662be6da" );
155
+
156
+ assertThat (e .getExceptions ())
157
+ .hasSize (1 )
158
+ .extracting ("detailMessage" )
159
+ .contains ("Failed processing" );
160
+ });
161
+
162
+ verify (sqsClient ).deleteMessageBatch (any (DeleteMessageBatchRequest .class ));
163
+ }
164
+
165
+
166
+ @ Test
167
+ void shouldBatchProcessAndSuppressExceptions () {
168
+ String failedId = "2e1424d4-f796-459a-8184-9c92662be6da" ;
169
+
170
+ SqsMessageHandler <String > failedHandler = (message ) -> {
171
+ if (failedId .equals (message .getMessageId ())) {
172
+ throw new RuntimeException ("Failed processing" );
173
+ }
174
+
175
+ interactionClient .listQueues ();
176
+ return "Success" ;
177
+ };
178
+
179
+ List <String > returnValues = partialBatchProcessor (event , true , failedHandler );
180
+
181
+ assertThat (returnValues )
182
+ .hasSize (1 )
183
+ .contains ("Success" );
184
+
185
+ verify (interactionClient ).listQueues ();
186
+ verify (sqsClient ).deleteMessageBatch (any (DeleteMessageBatchRequest .class ));
61
187
}
62
188
63
- private static Stream <Arguments > exception () {
64
- return Stream .of (Arguments .of (new AmazonServiceException ("Service Exception" )),
65
- Arguments .of (new SdkClientException ("Client Exception" )));
189
+ @ Test
190
+ void shouldBatchProcessViaClassAndSuppressExceptions () {
191
+ List <String > returnValues = partialBatchProcessor (event , true , FailureSampleInnerSqsHandler .class );
192
+
193
+ assertThat (returnValues )
194
+ .hasSize (1 )
195
+ .contains ("Success" );
196
+
197
+ verify (interactionClient ).listQueues ();
198
+ verify (sqsClient ).deleteMessageBatch (any (DeleteMessageBatchRequest .class ));
66
199
}
67
200
68
201
public class SampleInnerSqsHandler implements SqsMessageHandler <String > {
69
202
private int counter ;
70
203
71
204
@ Override
72
205
public String process (SQSMessage message ) {
206
+ interactionClient .listQueues ();
73
207
return String .valueOf (counter ++);
74
208
}
75
209
}
210
+
211
+ public class FailureSampleInnerSqsHandler implements SqsMessageHandler <String > {
212
+ @ Override
213
+ public String process (SQSEvent .SQSMessage message ) {
214
+ if ("2e1424d4-f796-459a-8184-9c92662be6da" .equals (message .getMessageId ())) {
215
+ throw new RuntimeException ("Failed processing" );
216
+ }
217
+
218
+ interactionClient .listQueues ();
219
+ return "Success" ;
220
+ }
221
+ }
76
222
}
0 commit comments