Skip to content

Commit 03e7f0f

Browse files
committed
INT-3529 Add 'expire-groups-upon-timeout' to Reseq
JIRA: https://jira.spring.io/browse/INT-3529 Late arriving messages will now be immediately discarded by default.
1 parent 113716e commit 03e7f0f

File tree

11 files changed

+132
-31
lines changed

11 files changed

+132
-31
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -263,13 +263,11 @@ public void setReleasePartialSequences(boolean releasePartialSequences) {
263263

264264
/**
265265
* Expire (completely remove) a group if it is completed due to timeout.
266-
* Subclasses setting this to false MUST handle null in the messages
267-
* argument to {@link #afterRelease(MessageGroup, Collection)}.
268-
* Default true.
269-
* @param expireGroupsUponTimeout the expireGroupsOnTimeout to set
266+
* Default true
267+
* @param expireGroupsUponTimeout the expireGroupsUponTimeout to set
270268
* @since 4.1
271269
*/
272-
protected void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
270+
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
273271
this.expireGroupsUponTimeout = expireGroupsUponTimeout;
274272
}
275273

@@ -507,8 +505,9 @@ protected void forceComplete(MessageGroup group) {
507505
expireGroup(correlationKey, groupNow);
508506
}
509507
if (!this.expireGroupsUponTimeout) {
510-
afterRelease(groupNow, null);
508+
afterRelease(groupNow, groupNow.getMessages());
511509
removeGroup = false;
510+
this.messageStore.completeGroup(correlationKey); // late messages immediately discarded
512511
}
513512
}
514513
else {

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,6 @@ public void setExpireGroupsUponCompletion(boolean expireGroupsUponCompletion) {
5656
this.expireGroupsUponCompletion = expireGroupsUponCompletion;
5757
}
5858

59-
@Override
60-
public void setExpireGroupsUponTimeout(boolean expireGroupsOnTimeout) {
61-
super.setExpireGroupsUponTimeout(expireGroupsOnTimeout);
62-
}
63-
6459
@Override
6560
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages) {
6661
this.messageStore.completeGroup(messageGroup.getGroupId());

spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515

1616
import java.util.Collection;
1717

18-
import org.springframework.messaging.Message;
1918
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2019
import org.springframework.integration.store.MessageGroup;
2120
import org.springframework.integration.store.MessageGroupStore;
21+
import org.springframework.messaging.Message;
2222

2323
/**
2424
* Resequencer specific implementation of {@link AbstractCorrelatingMessageHandler}.
@@ -33,19 +33,34 @@ public ResequencingMessageHandler(MessageGroupProcessor processor,
3333
MessageGroupStore store, CorrelationStrategy correlationStrategy,
3434
ReleaseStrategy releaseStrategy) {
3535
super(processor, store, correlationStrategy, releaseStrategy);
36+
this.setExpireGroupsUponTimeout(false);
3637
}
3738

3839

3940
public ResequencingMessageHandler(MessageGroupProcessor processor,
4041
MessageGroupStore store) {
4142
super(processor, store);
43+
this.setExpireGroupsUponTimeout(false);
4244
}
4345

4446

4547
public ResequencingMessageHandler(MessageGroupProcessor processor) {
4648
super(processor);
49+
this.setExpireGroupsUponTimeout(false);
50+
}
51+
52+
/**
53+
* {@inheritDoc}
54+
*
55+
* (overridden to false for a resequencer so late messages are immediately discarded rather
56+
* than waiting for the next timeout)
57+
*/
58+
@Override
59+
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
60+
super.setExpireGroupsUponTimeout(expireGroupsUponTimeout);
4761
}
4862

63+
4964
@Override
5065
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages) {
5166

spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public abstract class AbstractCorrelatingMessageHandlerParser extends AbstractCo
5858

5959
private static final String SEND_PARTIAL_RESULT_ON_EXPIRY_ATTRIBUTE = "send-partial-result-on-expiry";
6060

61+
private static final String EXPIRE_GROUPS_UPON_TIMEOUT = "expire-groups-upon-timeout";
62+
6163
protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetadataElement processor, ParserContext parserContext){
6264
this.injectPropertyWithAdapter(CORRELATION_STRATEGY_REF_ATTRIBUTE, CORRELATION_STRATEGY_METHOD_ATTRIBUTE,
6365
CORRELATION_STRATEGY_EXPRESSION_ATTRIBUTE, CORRELATION_STRATEGY_PROPERTY, "CorrelationStrategy",
@@ -78,6 +80,7 @@ protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetad
7880
IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("group-timeout", "group-timeout-expression",
7981
parserContext, element, false);
8082
builder.addPropertyValue("groupTimeoutExpression", expressionDef);
83+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_TIMEOUT);
8184
}
8285

8386
protected void injectPropertyWithAdapter(String beanRefAttribute, String methodRefAttribute,

spring-integration-core/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@ public class AggregatorParser extends AbstractCorrelatingMessageHandlerParser {
4444

4545
private static final String EXPIRE_GROUPS_UPON_COMPLETION = "expire-groups-upon-completion";
4646

47-
private static final String EXPIRE_GROUPS_UPON_TIMEOUT = "expire-groups-upon-timeout";
48-
4947
@Override
5048
protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) {
5149
BeanComponentDefinition innerHandlerDefinition = IntegrationNamespaceUtils.parseInnerHandlerDefinition(element,
@@ -90,7 +88,6 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars
9088
this.doParse(builder, element, processor, parserContext);
9189

9290
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_COMPLETION);
93-
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_TIMEOUT);
9491

9592
return builder;
9693
}

spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.1.xsd

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3418,16 +3418,6 @@
34183418
</xsd:documentation>
34193419
</xsd:annotation>
34203420
</xsd:attribute>
3421-
<xsd:attribute name="expire-groups-upon-timeout" type="xsd:string">
3422-
<xsd:annotation>
3423-
<xsd:documentation>
3424-
Boolean flag specifying, if a group is completed due to timeout (reaper or
3425-
'group-timeout(-expression)'), whether the group should be removed.
3426-
When true, late arriving messages will form a new group. When false, they
3427-
will be discarded. Default is 'true'
3428-
</xsd:documentation>
3429-
</xsd:annotation>
3430-
</xsd:attribute>
34313421
</xsd:extension>
34323422
</xsd:complexContent>
34333423
</xsd:complexType>
@@ -3547,6 +3537,17 @@
35473537
</xsd:documentation>
35483538
</xsd:annotation>
35493539
</xsd:attribute>
3540+
<xsd:attribute name="expire-groups-upon-timeout" type="xsd:string">
3541+
<xsd:annotation>
3542+
<xsd:documentation>
3543+
Boolean flag specifying, if a group is completed due to timeout (reaper or
3544+
'group-timeout(-expression)'), whether the group should be removed.
3545+
When true, late arriving messages will form a new group. When false, they
3546+
will be discarded. Default is 'true' for an aggregator and 'false' for a
3547+
resequencer.
3548+
</xsd:documentation>
3549+
</xsd:annotation>
3550+
</xsd:attribute>
35503551
<xsd:attribute name="lock-registry" type="xsd:string">
35513552
<xsd:annotation>
35523553
<xsd:appinfo>

spring-integration-core/src/test/java/org/springframework/integration/aggregator/ResequencerTests.java

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@
3131
import org.junit.Test;
3232

3333
import org.springframework.beans.factory.BeanFactory;
34+
import org.springframework.expression.spel.standard.SpelExpressionParser;
35+
import org.springframework.expression.spel.support.StandardEvaluationContext;
3436
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3537
import org.springframework.integration.channel.QueueChannel;
3638
import org.springframework.integration.store.MessageGroupStore;
3739
import org.springframework.integration.store.SimpleMessageStore;
3840
import org.springframework.integration.support.MessageBuilder;
3941
import org.springframework.messaging.Message;
4042
import org.springframework.messaging.MessageChannel;
43+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
4144

4245
/**
4346
* @author Marius Bogoevici
@@ -243,10 +246,10 @@ public void testResequencingWithDiscard() throws InterruptedException {
243246
new IntegrationMessageHeaderAccessor(reply2).getSequenceNumber()));
244247
Collections.sort(sequence);
245248
assertEquals("[1, 2]", sequence.toString());
246-
// when sending the last message, the whole sequence must have been sent
249+
// Once a group is expired, late messages are discarded immediately by default
247250
this.resequencer.handleMessage(message3);
248251
reply3 = discardChannel.receive(0);
249-
assertNull(reply3);
252+
assertNotNull(reply3);
250253
}
251254

252255
@Test
@@ -322,6 +325,63 @@ public void testRemovalOfBarrierWhenLastMessageOfSequenceArrives() {
322325
assertEquals(0, store.getMessageGroup(correlationId).size());
323326
}
324327

328+
@Test
329+
public void testTimeoutDefaultExpiry() throws InterruptedException {
330+
this.resequencer.setGroupTimeoutExpression(new SpelExpressionParser().parseExpression("100"));
331+
this.resequencer.setIntegrationEvaluationContext(new StandardEvaluationContext());
332+
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
333+
taskScheduler.afterPropertiesSet();
334+
this.resequencer.setTaskScheduler(taskScheduler);
335+
QueueChannel discardChannel = new QueueChannel();
336+
this.resequencer.setDiscardChannel(discardChannel);
337+
QueueChannel replyChannel = new QueueChannel();
338+
this.resequencer.setOutputChannel(replyChannel);
339+
Message<?> message3 = createMessage("789", "ABC", 3, 3, null);
340+
Message<?> message2 = createMessage("456", "ABC", 3, 2, null);
341+
this.resequencer.handleMessage(message3);
342+
this.resequencer.handleMessage(message2);
343+
Message<?> out1 = replyChannel.receive(0);
344+
assertNull(out1);
345+
out1 = discardChannel.receive(1000);
346+
assertNotNull(out1);
347+
Message<?> out2 = discardChannel.receive(0);
348+
assertNotNull(out2);
349+
Message<?> message1 = createMessage("123", "ABC", 3, 1, null);
350+
this.resequencer.handleMessage(message1);
351+
Message<?> out3 = discardChannel.receive(0);
352+
assertNotNull(out3);
353+
}
354+
355+
@Test
356+
public void testTimeoutDontExpire() throws InterruptedException {
357+
this.resequencer.setGroupTimeoutExpression(new SpelExpressionParser().parseExpression("100"));
358+
this.resequencer.setIntegrationEvaluationContext(new StandardEvaluationContext());
359+
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
360+
taskScheduler.afterPropertiesSet();
361+
this.resequencer.setTaskScheduler(taskScheduler);
362+
QueueChannel discardChannel = new QueueChannel();
363+
this.resequencer.setDiscardChannel(discardChannel);
364+
QueueChannel replyChannel = new QueueChannel();
365+
this.resequencer.setOutputChannel(replyChannel);
366+
this.resequencer.setExpireGroupsUponTimeout(true);
367+
Message<?> message3 = createMessage("789", "ABC", 3, 3, null);
368+
Message<?> message2 = createMessage("456", "ABC", 3, 2, null);
369+
this.resequencer.handleMessage(message3);
370+
this.resequencer.handleMessage(message2);
371+
Message<?> out1 = replyChannel.receive(0);
372+
assertNull(out1);
373+
out1 = discardChannel.receive(1000);
374+
assertNotNull(out1);
375+
Message<?> out2 = discardChannel.receive(0);
376+
assertNotNull(out2);
377+
Message<?> message1 = createMessage("123", "ABC", 3, 1, null);
378+
this.resequencer.handleMessage(message1);
379+
Message<?> out3 = discardChannel.receive(0);
380+
assertNull(out3);
381+
out3 = discardChannel.receive(1000);
382+
assertNotNull(out3);
383+
}
384+
325385
private static Message<?> createMessage(String payload, Object correlationId, int sequenceSize, int sequenceNumber,
326386
MessageChannel replyChannel) {
327387
return MessageBuilder.withPayload(payload).setCorrelationId(correlationId).setSequenceSize(sequenceSize)

spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
55
* the License. You may obtain a copy of the License at
@@ -14,6 +14,7 @@
1414
package org.springframework.integration.config;
1515

1616
import static org.junit.Assert.assertEquals;
17+
import static org.junit.Assert.assertFalse;
1718
import static org.junit.Assert.assertNull;
1819
import static org.junit.Assert.assertTrue;
1920
import static org.springframework.integration.test.util.TestUtils.getPropertyValue;
@@ -22,10 +23,9 @@
2223

2324
import org.junit.Before;
2425
import org.junit.Test;
26+
2527
import org.springframework.context.ApplicationContext;
2628
import org.springframework.context.support.ClassPathXmlApplicationContext;
27-
import org.springframework.messaging.Message;
28-
import org.springframework.messaging.MessageChannel;
2929
import org.springframework.integration.aggregator.CorrelationStrategy;
3030
import org.springframework.integration.aggregator.MethodInvokingCorrelationStrategy;
3131
import org.springframework.integration.aggregator.MethodInvokingReleaseStrategy;
@@ -37,6 +37,8 @@
3737
import org.springframework.integration.store.SimpleMessageGroup;
3838
import org.springframework.integration.support.MessageBuilder;
3939
import org.springframework.integration.test.util.TestUtils;
40+
import org.springframework.messaging.Message;
41+
import org.springframework.messaging.MessageChannel;
4042

4143
/**
4244
* @author Marius Bogoevici
@@ -45,6 +47,7 @@
4547
* @author Oleg Zhurakousky
4648
* @author Stefan Ferstl
4749
* @author Artem Bilan
50+
* @author Gary Russell
4851
*/
4952
public class ResequencerParserTests {
5053

@@ -108,6 +111,7 @@ public void testReleaseStrategyRefOnly() throws Exception {
108111
ResequencingMessageHandler resequencer = getPropertyValue(endpoint, "handler", ResequencingMessageHandler.class);
109112
assertEquals("The ResequencerEndpoint is not configured with the appropriate ReleaseStrategy",
110113
context.getBean("testReleaseStrategy"), getPropertyValue(resequencer, "releaseStrategy"));
114+
assertFalse(TestUtils.getPropertyValue(resequencer, "expireGroupsUponTimeout", Boolean.class));
111115
}
112116

113117
@Test
@@ -128,6 +132,7 @@ public void testReleaseStrategyRefAndMethod() throws Exception {
128132
effectiveReleaseStrategy.canRelease(new SimpleMessageGroup("test"));
129133
assertEquals("The ResequencerEndpoint was not invoked the expected number of times;",
130134
currentInvocationCount + 1, expectedReleaseStrategy.invocationCount);
135+
assertTrue(TestUtils.getPropertyValue(resequencer, "expireGroupsUponTimeout", Boolean.class));
131136
}
132137

133138
@Test
@@ -161,6 +166,7 @@ private static <T> Message<T> createMessage(T payload, Object correlationId, int
161166

162167
static class TestCorrelationStrategy implements CorrelationStrategy {
163168

169+
@Override
164170
public Object getCorrelationKey(Message<?> message) {
165171
return "test";
166172
}
@@ -174,6 +180,7 @@ public Object foo(Object o) {
174180
}
175181

176182
static class TestReleaseStrategy implements ReleaseStrategy {
183+
@Override
177184
public boolean canRelease(MessageGroup group) {
178185
return true;
179186
}

spring-integration-core/src/test/java/org/springframework/integration/config/resequencerParserTests.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353

5454
<resequencer id="resequencerWithReleaseStrategyRefAndMethod"
5555
input-channel="inputChannel6"
56+
expire-groups-upon-timeout="true"
5657
release-strategy="testReleaseStrategyPojo"
5758
release-strategy-method="bar"/>
5859

src/reference/docbook/resequencer.xml

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@
5858
5959
group-timeout="60000" ]]><co id="resxml19" /><![CDATA[
6060
group-timeout-expression="size() ge 2 ? 100 : -1" ]]><co id="resxml20" /><![CDATA[
61-
scheduler="taskScheduler" /> ]]><co id="resxml21" /></programlisting>
61+
scheduler="taskScheduler" /> ]]><co id="resxml21" /><![CDATA[
62+
expire-group-upon-timeout="false" /> ]]><co id="resxml22" /></programlisting>
6263
<para><calloutlist>
6364
<callout arearefs="resxml1-co" id="resxml1">
6465
<para>The id of the resequencer is
@@ -193,6 +194,19 @@
193194
See <xref linkend="aggregator-xml"/>.
194195
</para>
195196
</callout>
197+
<callout arearefs="aggxml22" id="aggxml22-txt">
198+
<para>
199+
When a group is completed due to a timeout (or by a <classname>MessageGroupStoreReaper</classname>), the
200+
empty group's metadata
201+
is retained by default. Late arriving messages will be immediately discarded. Set this
202+
to <code>true</code> to remove the group completely; then, late arriving messages will start a new group
203+
and won't be discarded until the group again times out. The new group will never be released normally
204+
because of the "hole" in the sequence range that caused the timeout.
205+
Empty groups can be expired (completely removed) later using a
206+
<classname>MessageGroupStoreReaper</classname> together with the <code>empty-group-min-timeout</code>
207+
attribute. Default: 'false'.
208+
</para>
209+
</callout>
196210
</calloutlist></para>
197211

198212
<note>

src/reference/docbook/whats-new.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,5 +203,14 @@
203203
See <xref linkend="async-gateway"/>.
204204
</para>
205205
</section>
206+
<section id="4.1-reseq">
207+
<title>Resequencer Changes</title>
208+
<para>
209+
When a message group in a resequencer is timed out (using <code>group-timeout</code> or a
210+
<classname>MessageGroupStoreReaper</classname>), late arriving messages will now be discarded
211+
immediately by default.
212+
See <xref linkend="resequencer"/>.
213+
</para>
214+
</section>
206215
</section>
207216
</chapter>

0 commit comments

Comments
 (0)