From 6e4b55e2381bd80e4fd73eb1edc5f71fb228d40a Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 21 Jul 2014 13:48:44 -0400 Subject: [PATCH 1/3] INT-3420 Aggregator expire-groups-upon-timeout JIRA: https://jira.spring.io/browse/INT-3420 Add option to allow the empty group to remain after timeout so late arriving messages can be discarded. --- .../AbstractCorrelatingMessageHandler.java | 24 +++++++++-- .../aggregator/AggregatingMessageHandler.java | 5 +++ .../config/xml/AggregatorParser.java | 9 +++- .../config/xml/spring-integration-4.1.xsd | 10 +++++ .../aggregator/AggregatorTests.java | 42 +++++++++++++++++-- .../config/AggregatorParserTests.java | 11 +++-- .../config/aggregatorParserTests.xml | 3 +- src/reference/docbook/aggregator.xml | 14 ++++++- src/reference/docbook/whats-new.xml | 7 ++++ 9 files changed, 111 insertions(+), 14 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java index fb002fe7353..4d29f6a09f6 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java @@ -128,6 +128,8 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageH private volatile ApplicationEventPublisher applicationEventPublisher; + private volatile boolean expireGroupsUponTimeout = true; + public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) { Assert.notNull(processor); @@ -300,6 +302,18 @@ public void setReleasePartialSequences(boolean releasePartialSequences) { this.releasePartialSequences = releasePartialSequences; } + /** + * Expire (completely remove) a group if it is completed due to timeout. + * Subclasses setting this to false MUST handle null in the messages + * argument to {@link #afterRelease(MessageGroup, Collection)}. + * Default true. + * @param expireGroupsUponTimeout the expireGroupsOnTimeout to set + * @since 4.1 + */ + protected void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) { + this.expireGroupsUponTimeout = expireGroupsUponTimeout; + } + @Override public String getComponentType() { return "aggregator"; @@ -462,7 +476,7 @@ public void run() { */ protected abstract void afterRelease(MessageGroup group, Collection> completedMessages); - private void forceComplete(MessageGroup group) { + protected void forceComplete(MessageGroup group) { Object correlationKey = group.getGroupId(); // UUIDConverter is no-op if already converted @@ -505,10 +519,14 @@ private void forceComplete(MessageGroup group) { && group.getTimestamp() == groupNow.getTimestamp()) { if (groupSize > 0) { if (releaseStrategy.canRelease(groupNow)) { - this.completeGroup(correlationKey, groupNow); + completeGroup(correlationKey, groupNow); } else { - this.expireGroup(correlationKey, groupNow); + expireGroup(correlationKey, groupNow); + } + if (!this.expireGroupsUponTimeout) { + afterRelease(groupNow, null); + removeGroup = false; } } else { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java index 130320ec542..cd90ea0f585 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java @@ -56,6 +56,11 @@ public void setExpireGroupsUponCompletion(boolean expireGroupsUponCompletion) { this.expireGroupsUponCompletion = expireGroupsUponCompletion; } + @Override + public void setExpireGroupsUponTimeout(boolean expireGroupsOnTimeout) { + super.setExpireGroupsUponTimeout(expireGroupsOnTimeout); + } + @Override protected void afterRelease(MessageGroup messageGroup, Collection> completedMessages) { this.messageStore.completeGroup(messageGroup.getGroupId()); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java index a473829813f..418c1e2fab3 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.integration.config.xml; +import org.w3c.dom.Element; + import org.springframework.beans.BeanMetadataElement; import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.parsing.BeanComponentDefinition; @@ -26,7 +28,6 @@ import org.springframework.integration.aggregator.ExpressionEvaluatingMessageGroupProcessor; import org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor; import org.springframework.util.StringUtils; -import org.w3c.dom.Element; /** * Parser for the aggregator element of the integration namespace. Registers the annotation-driven @@ -37,11 +38,14 @@ * @author Oleg Zhurakousky * @author Dave Syer * @author Stefan Ferstl + * @author Gary Russell */ public class AggregatorParser extends AbstractCorrelatingMessageHandlerParser { private static final String EXPIRE_GROUPS_UPON_COMPLETION = "expire-groups-upon-completion"; + private static final String EXPIRE_GROUPS_UPON_TIMEOUT = "expire-groups-upon-timeout"; + @Override protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) { BeanComponentDefinition innerHandlerDefinition = IntegrationNamespaceUtils.parseInnerHandlerDefinition(element, @@ -86,6 +90,7 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars this.doParse(builder, element, processor, parserContext); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_COMPLETION); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_TIMEOUT); return builder; } diff --git a/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.1.xsd b/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.1.xsd index 73f8e537c38..3c81d9806bc 100644 --- a/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.1.xsd +++ b/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.1.xsd @@ -3376,6 +3376,16 @@ + + + + Boolean flag specifying, if a group is completed due to timeout (reaper or + 'group-timeout(-expression)'), whether the group should be removed. + When true, late arriving messages will form a new group. When false, they + will be discarded. Default is 'true' + + + diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java index cd87e3cba25..79c007e295a 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java @@ -13,9 +13,13 @@ package org.springframework.integration.aggregator; -import static org.hamcrest.CoreMatchers.*; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; import java.util.ArrayList; import java.util.Collection; @@ -237,6 +241,38 @@ public void testShouldSendPartialResultOnTimeoutTrue() throws InterruptedExcepti assertEquals("ABC", this.expiryEvents.get(0).getGroupId()); assertEquals(2, this.expiryEvents.get(0).getMessageCount()); assertEquals(false, this.expiryEvents.get(0).isDiscarded()); + Message message3 = createMessage(5, "ABC", 3, 3, replyChannel, null); + this.aggregator.handleMessage(message3); + assertEquals(1, this.store.getMessageGroup("ABC").size()); + } + + @Test + public void testGroupRemainsAfterTimeout() throws InterruptedException { + this.aggregator.setSendPartialResultOnExpiry(true); + this.aggregator.setExpireGroupsUponTimeout(false); + QueueChannel replyChannel = new QueueChannel(); + QueueChannel discardChannel = new QueueChannel(); + this.aggregator.setDiscardChannel(discardChannel); + Message message1 = createMessage(3, "ABC", 3, 1, replyChannel, null); + Message message2 = createMessage(5, "ABC", 3, 2, replyChannel, null); + this.aggregator.handleMessage(message1); + this.aggregator.handleMessage(message2); + this.store.expireMessageGroups(-10000); + Message reply = replyChannel.receive(1000); + assertNotNull("A reply message should have been received", reply); + assertEquals(15, reply.getPayload()); + assertEquals(1, expiryEvents.size()); + assertSame(this.aggregator, expiryEvents.get(0).getSource()); + assertEquals("ABC", this.expiryEvents.get(0).getGroupId()); + assertEquals(2, this.expiryEvents.get(0).getMessageCount()); + assertEquals(false, this.expiryEvents.get(0).isDiscarded()); + assertEquals(0, this.store.getMessageGroup("ABC").size()); + Message message3 = createMessage(5, "ABC", 3, 3, replyChannel, null); + this.aggregator.handleMessage(message3); + assertEquals(0, this.store.getMessageGroup("ABC").size()); + Message discardedMessage = discardChannel.receive(1000); + assertNotNull("A message should have been discarded", discardedMessage); + assertSame(message3, discardedMessage); } @Test diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/AggregatorParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/AggregatorParserTests.java index 43aa1916e11..18582acbc77 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/AggregatorParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/AggregatorParserTests.java @@ -19,6 +19,7 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; @@ -121,6 +122,7 @@ public void handleMessage(Message message) throws MessageRejectedException, M Object handler = context.getBean("aggregatorWithExpressions.handler"); assertSame(mbf, TestUtils.getPropertyValue(handler, "outputProcessor.messageBuilderFactory")); assertSame(mbf, TestUtils.getPropertyValue(handler, "outputProcessor.processor.messageBuilderFactory")); + assertTrue(TestUtils.getPropertyValue(handler, "expireGroupsUponTimeout", Boolean.class)); } @Test @@ -145,15 +147,16 @@ public void testPropertyAssignment() throws Exception { releaseStrategy, accessor.getPropertyValue("releaseStrategy")); assertEquals("The AggregatorEndpoint is not injected with the appropriate CorrelationStrategy instance", correlationStrategy, accessor.getPropertyValue("correlationStrategy")); - Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate output channel", + assertEquals("The AggregatorEndpoint is not injected with the appropriate output channel", outputChannel, accessor.getPropertyValue("outputChannel")); - Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate discard channel", + assertEquals("The AggregatorEndpoint is not injected with the appropriate discard channel", discardChannel, accessor.getPropertyValue("discardChannel")); - Assert.assertEquals("The AggregatorEndpoint is not set with the appropriate timeout value", 86420000l, + assertEquals("The AggregatorEndpoint is not set with the appropriate timeout value", 86420000l, TestUtils.getPropertyValue(consumer, "messagingTemplate.sendTimeout")); - Assert.assertEquals( + assertEquals( "The AggregatorEndpoint is not configured with the appropriate 'send partial results on timeout' flag", true, accessor.getPropertyValue("sendPartialResultOnExpiry")); + assertFalse(TestUtils.getPropertyValue(consumer, "expireGroupsUponTimeout", Boolean.class)); } @Test diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml b/spring-integration-core/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml index 4dc132d930a..3927a5f7ea4 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml @@ -27,7 +27,8 @@ release-strategy="releaseStrategy" correlation-strategy="correlationStrategy" send-timeout="86420000" - send-partial-result-on-expiry="true"/> + send-partial-result-on-expiry="true" + expire-groups-upon-timeout="false"/> diff --git a/src/reference/docbook/aggregator.xml b/src/reference/docbook/aggregator.xml index d3bd3a10220..8dbadaa1894 100644 --- a/src/reference/docbook/aggregator.xml +++ b/src/reference/docbook/aggregator.xml @@ -374,7 +374,9 @@ then you should simply provide an implementation of the ReleaseStrate group-timeout="60000" ]]> ]]> ]]> @@ -575,6 +577,16 @@ then you should simply provide an implementation of the ReleaseStrate + + When a group is completed due to a timeout (or by a MessageGroupStoreReaper), the group + is expired (completely removed) by default. Late arriving messages will start a new group. Set this + to false to complete the group but have its metadata remain so that late + arriving messages will be discarded. Empty groups can be expired later using a + MessageGroupStoreReaper together with the empty-group-min-timeout + attribute. Default: 'false'. + + + A TaskScheduler bean reference to schedule the MessageGroup to be forced complete diff --git a/src/reference/docbook/whats-new.xml b/src/reference/docbook/whats-new.xml index 98a93597768..a5618c45c30 100644 --- a/src/reference/docbook/whats-new.xml +++ b/src/reference/docbook/whats-new.xml @@ -82,5 +82,12 @@ See for more information. +
+ Aggregator + + Aggregators now support a new attribute expire-groups-on-timeout. + See for more information. + +
From e7117c25688b9fede6c3ed4e470ab2f5c888b7db Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 21 Jul 2014 14:10:00 -0400 Subject: [PATCH 2/3] INT-3420 Add Callout Hyperlinks The aggregator configuration documentation had hyperlinks from the attribute descriptions to the attribute in the XML, but not vice-versa. For a large number of attributes such as this, bi-directional hyperlinks are useful. --- src/reference/docbook/aggregator.xml | 96 ++++++++++++++-------------- 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/src/reference/docbook/aggregator.xml b/src/reference/docbook/aggregator.xml index 8dbadaa1894..c409d219a8c 100644 --- a/src/reference/docbook/aggregator.xml +++ b/src/reference/docbook/aggregator.xml @@ -346,37 +346,37 @@ then you should simply provide an implementation of the ReleaseStrate - ]]> ]]> @@ -393,47 +393,47 @@ then you should simply provide an implementation of the ReleaseStrate ]]> - + The id of the aggregator is 0ptional. - + Lifecycle attribute signaling if aggregator should be started during Application Context startup. Optional (default is 'true'). - + The channel from which where aggregator will receive messages. Required. - + The channel to which the aggregator will send the aggregation results. Optional (because incoming messages can specify a reply channel themselves via 'replyChannel' Message Header). - + The channel to which the aggregator will send the messages that timed out (if send-partial-result-on-expiry is false). Optional. - + A reference to a MessageGroupStore used to store groups of messages under their correlation key until they are complete. Optional, by default a volatile in-memory store. - + Order of this aggregator when more than one handle is subscribed to the same DirectChannel (use for load balancing purposes). Optional. - + Indicates that expired messages should be aggregated and sent to the 'output-channel' or 'replyChannel' once their containing MessageGroup is expired (see MessageGroupStore.expireMessageGroups(long)). @@ -446,12 +446,12 @@ then you should simply provide an implementation of the ReleaseStrate Optional. Default - 'false'. - + The timeout interval for sending the aggregated messages to the output or reply channel. Optional. - + A reference to a bean that implements the message correlation (grouping) algorithm. The bean can be an implementation of the CorrelationStrategy interface or a POJO. In the latter case the correlation-strategy-method attribute must be defined @@ -459,7 +459,7 @@ then you should simply provide an implementation of the ReleaseStrate the IntegrationMessageHeaderAccessor.CORRELATION_ID header) . - + A method defined on the bean referenced by correlation-strategy, that implements the correlation decision algorithm. Optional, with @@ -467,25 +467,25 @@ then you should simply provide an implementation of the ReleaseStrate present). - + A SpEL expression representing the correlation strategy. Example: "headers['foo']". Only one of correlation-strategy or correlation-strategy-expression is allowed. - + A reference to a bean defined in the application context. The bean must implement the aggregation logic as described above. Optional (by default the list of aggregated Messages will become a payload of the output message). - + A method defined on the bean referenced by ref, that implements the message aggregation algorithm. Optional, depends on ref attribute being defined. - + A reference to a bean that implements the release strategy. The bean can be an implementation of the ReleaseStrategy interface or a POJO. In the latter case the release-strategy-method @@ -493,7 +493,7 @@ then you should simply provide an implementation of the ReleaseStrate aggregator will use the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header attribute). - + A method defined on the bean referenced by release-strategy, that implements the completion decision algorithm. Optional, with @@ -501,7 +501,7 @@ then you should simply provide an implementation of the ReleaseStrate present). - + A SpEL expression representing the release strategy; the root object for the expression is a Collection of Messages. Example: "size() == 5". Only one of @@ -509,7 +509,7 @@ then you should simply provide an implementation of the ReleaseStrate or release-strategy-expression is allowed. - + When set to true (default false), completed groups are removed from the message store, allowing subsequent messages with the same correlation to form a new group. The default behavior @@ -517,7 +517,7 @@ then you should simply provide an implementation of the ReleaseStrate group to the discard-channel. - + Only applies if a MessageGroupStoreReaper is configured for the <aggregator>'s MessageStore. By default, when a MessageGroupStoreReaper is configured to expire partial @@ -531,7 +531,7 @@ then you should simply provide an implementation of the ReleaseStrate property and it could be as much as this value plus the timeout. - + A reference to a org.springframework.integration.util.LockRegistry bean; used to obtain a Lock based on the groupId for @@ -543,7 +543,7 @@ then you should simply provide an implementation of the ReleaseStrate - + A timeout in milliseconds to force the MessageGroup complete, when the ReleaseStrategy doesn't release @@ -565,7 +565,7 @@ then you should simply provide an implementation of the ReleaseStrate Mutually exclusive with 'group-timeout-expression' attribute. - + The SpEL expression that evaluates to a groupTimeout with the MessageGroup as the #root evaluation context object. Used for scheduling the MessageGroup to @@ -576,7 +576,7 @@ then you should simply provide an implementation of the ReleaseStrate Mutually exclusive with 'group-timeout' attribute. - + When a group is completed due to a timeout (or by a MessageGroupStoreReaper), the group is expired (completely removed) by default. Late arriving messages will start a new group. Set this @@ -586,7 +586,7 @@ then you should simply provide an implementation of the ReleaseStrate attribute. Default: 'false'. - + A TaskScheduler bean reference to schedule the MessageGroup to be forced complete From 4471c600319dd62e6d74202968b5c62b5783b2c3 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 22 Jul 2014 17:26:48 -0400 Subject: [PATCH 3/3] INT-3420 Doc Polishing --- src/reference/docbook/aggregator.xml | 40 ++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/src/reference/docbook/aggregator.xml b/src/reference/docbook/aggregator.xml index c409d219a8c..aa41d5e8beb 100644 --- a/src/reference/docbook/aggregator.xml +++ b/src/reference/docbook/aggregator.xml @@ -443,7 +443,12 @@ then you should simply provide an implementation of the ReleaseStrate or by simply invoking that method if you have a reference to the MessageGroupStore instance. Otherwise by itself this attribute has no behavior. It only serves as an indicator of what to do (discard or send to the output/reply channel) with Messages that are still in the MessageGroup that is about to be expired. - Optional. Default - 'false'. + Optional. Default - 'false'. + + This attribute is more properly 'send-partial-result-on-timeout' because the group may not actually expire + if expire-groups-on-timeout is set to false. + + @@ -583,7 +588,7 @@ then you should simply provide an implementation of the ReleaseStrate to false to complete the group but have its metadata remain so that late arriving messages will be discarded. Empty groups can be expired later using a MessageGroupStoreReaper together with the empty-group-min-timeout - attribute. Default: 'false'. + attribute. Default: 'true'. @@ -601,6 +606,37 @@ then you should simply provide an implementation of the ReleaseStrate + + Expiring Groups + + There are two attributes related to expiring (completely removing) groups. When a group + is expired, there is no record of it and if a new message arrives with the same correlation, + a new group is started. When a group is completed (without expiry), the empty group remains + and late arriving messages are discarded. Empty groups can be removed later using a + MessageGroupStoreReaper in combination with the + empty-group-min-timeout attribute. + + + expire-groups-upon-completion relates to "normal" completion - when the + ReleaseStrategy releases the group. This defaults to + false. + + + If a group is not completed normally, but is released or discarded because of a timeout, + the group is normally expired. Since version 4.1, you can now control + this behavior using expire-groups-upon-timeout; this defaults to + true for backwards compatibility. + + + When a group is timed out, the ReleaseStrategy is given + one more opportunity to release the group; if it does so, then expiration is controlled + by expire-groups-upon-completion. If the group is not released by the release + strategy during timeout, then the expiration is controlled by the + expire-groups-upon-timeout. Timed-out groups are either discarded, or a + partial release occurs (based on send-partial-result-on-expiry). + + + Using a ref attribute is generally recommended if a custom aggregator handler implementation may be referenced in other <aggregator> definitions. However if a custom