From d0d215a45a7712d41025a5aa4e78f5e4cfec6a07 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 14 Oct 2014 10:43:33 -0400 Subject: [PATCH 1/2] INT-3529 Add 'expire-groups-upon-timeout' to Reseq JIRA: https://jira.spring.io/browse/INT-3529 Late arriving messages will now be immediately discarded by default. --- .../AbstractCorrelatingMessageHandler.java | 11 ++-- .../aggregator/AggregatingMessageHandler.java | 5 -- .../ResequencingMessageHandler.java | 17 ++++- ...stractCorrelatingMessageHandlerParser.java | 4 ++ .../config/xml/AggregatorParser.java | 3 - .../config/xml/spring-integration-4.1.xsd | 21 +++--- .../aggregator/ResequencerTests.java | 64 ++++++++++++++++++- .../config/ResequencerParserTests.java | 13 +++- .../config/resequencerParserTests.xml | 1 + src/reference/docbook/resequencer.xml | 16 ++++- src/reference/docbook/whats-new.xml | 9 +++ 11 files changed, 133 insertions(+), 31 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java index e7ede054354..773e6061316 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java @@ -291,13 +291,11 @@ public void setReleasePartialSequences(boolean releasePartialSequences) { /** * Expire (completely remove) a group if it is completed due to timeout. - * Subclasses setting this to false MUST handle null in the messages - * argument to {@link #afterRelease(MessageGroup, Collection)}. - * Default true. - * @param expireGroupsUponTimeout the expireGroupsOnTimeout to set + * Default true + * @param expireGroupsUponTimeout the expireGroupsUponTimeout to set * @since 4.1 */ - protected void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) { + public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) { this.expireGroupsUponTimeout = expireGroupsUponTimeout; } @@ -535,8 +533,9 @@ protected void forceComplete(MessageGroup group) { expireGroup(correlationKey, groupNow); } if (!this.expireGroupsUponTimeout) { - afterRelease(groupNow, null); + afterRelease(groupNow, groupNow.getMessages()); removeGroup = false; + this.messageStore.completeGroup(correlationKey); // late messages immediately discarded } } else { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java index cd90ea0f585..130320ec542 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java @@ -56,11 +56,6 @@ public void setExpireGroupsUponCompletion(boolean expireGroupsUponCompletion) { this.expireGroupsUponCompletion = expireGroupsUponCompletion; } - @Override - public void setExpireGroupsUponTimeout(boolean expireGroupsOnTimeout) { - super.setExpireGroupsUponTimeout(expireGroupsOnTimeout); - } - @Override protected void afterRelease(MessageGroup messageGroup, Collection> completedMessages) { this.messageStore.completeGroup(messageGroup.getGroupId()); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java index 5881e0b1614..358d2ef7f18 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java @@ -15,10 +15,10 @@ import java.util.Collection; -import org.springframework.messaging.Message; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.store.MessageGroup; import org.springframework.integration.store.MessageGroupStore; +import org.springframework.messaging.Message; /** * Resequencer specific implementation of {@link AbstractCorrelatingMessageHandler}. @@ -33,19 +33,34 @@ public ResequencingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) { super(processor, store, correlationStrategy, releaseStrategy); + this.setExpireGroupsUponTimeout(false); } public ResequencingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store) { super(processor, store); + this.setExpireGroupsUponTimeout(false); } public ResequencingMessageHandler(MessageGroupProcessor processor) { super(processor); + this.setExpireGroupsUponTimeout(false); + } + + /** + * {@inheritDoc} + * + * (overridden to false for a resequencer so late messages are immediately discarded rather + * than waiting for the next timeout) + */ + @Override + public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) { + super.setExpireGroupsUponTimeout(expireGroupsUponTimeout); } + @Override protected void afterRelease(MessageGroup messageGroup, Collection> completedMessages) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java index 7b3078c6200..171dbad0379 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java @@ -59,6 +59,8 @@ public abstract class AbstractCorrelatingMessageHandlerParser extends AbstractCo private static final String SEND_PARTIAL_RESULT_ON_EXPIRY_ATTRIBUTE = "send-partial-result-on-expiry"; + private static final String EXPIRE_GROUPS_UPON_TIMEOUT = "expire-groups-upon-timeout"; + protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetadataElement processor, ParserContext parserContext) { this.injectPropertyWithAdapter(CORRELATION_STRATEGY_REF_ATTRIBUTE, CORRELATION_STRATEGY_METHOD_ATTRIBUTE, @@ -87,6 +89,8 @@ protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetad IntegrationNamespaceUtils.configureAndSetAdviceChainIfPresent(adviceChainElement, txElement, builder.getRawBeanDefinition(), parserContext, "forceReleaseAdviceChain"); + + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_TIMEOUT); } protected void injectPropertyWithAdapter(String beanRefAttribute, String methodRefAttribute, diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java index 418c1e2fab3..f6fc664e2ad 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java @@ -44,8 +44,6 @@ public class AggregatorParser extends AbstractCorrelatingMessageHandlerParser { private static final String EXPIRE_GROUPS_UPON_COMPLETION = "expire-groups-upon-completion"; - private static final String EXPIRE_GROUPS_UPON_TIMEOUT = "expire-groups-upon-timeout"; - @Override protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) { BeanComponentDefinition innerHandlerDefinition = IntegrationNamespaceUtils.parseInnerHandlerDefinition(element, @@ -90,7 +88,6 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars this.doParse(builder, element, processor, parserContext); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_COMPLETION); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_TIMEOUT); return builder; } diff --git a/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.1.xsd b/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.1.xsd index e203d0e80df..82401f1247d 100644 --- a/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.1.xsd +++ b/spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.1.xsd @@ -3418,16 +3418,6 @@ - - - - Boolean flag specifying, if a group is completed due to timeout (reaper or - 'group-timeout(-expression)'), whether the group should be removed. - When true, late arriving messages will form a new group. When false, they - will be discarded. Default is 'true' - - - @@ -3556,6 +3546,17 @@ + + + + Boolean flag specifying, if a group is completed due to timeout (reaper or + 'group-timeout(-expression)'), whether the group should be removed. + When true, late arriving messages will form a new group. When false, they + will be discarded. Default is 'true' for an aggregator and 'false' for a + resequencer. + + + diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ResequencerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ResequencerTests.java index 58b50fffef6..93a790653b4 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ResequencerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ResequencerTests.java @@ -31,6 +31,8 @@ import org.junit.Test; import org.springframework.beans.factory.BeanFactory; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.store.MessageGroupStore; @@ -38,6 +40,7 @@ import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; /** * @author Marius Bogoevici @@ -243,10 +246,10 @@ public void testResequencingWithDiscard() throws InterruptedException { new IntegrationMessageHeaderAccessor(reply2).getSequenceNumber())); Collections.sort(sequence); assertEquals("[1, 2]", sequence.toString()); - // when sending the last message, the whole sequence must have been sent + // Once a group is expired, late messages are discarded immediately by default this.resequencer.handleMessage(message3); reply3 = discardChannel.receive(0); - assertNull(reply3); + assertNotNull(reply3); } @Test @@ -322,6 +325,63 @@ public void testRemovalOfBarrierWhenLastMessageOfSequenceArrives() { assertEquals(0, store.getMessageGroup(correlationId).size()); } + @Test + public void testTimeoutDefaultExpiry() throws InterruptedException { + this.resequencer.setGroupTimeoutExpression(new SpelExpressionParser().parseExpression("100")); + this.resequencer.setIntegrationEvaluationContext(new StandardEvaluationContext()); + ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); + taskScheduler.afterPropertiesSet(); + this.resequencer.setTaskScheduler(taskScheduler); + QueueChannel discardChannel = new QueueChannel(); + this.resequencer.setDiscardChannel(discardChannel); + QueueChannel replyChannel = new QueueChannel(); + this.resequencer.setOutputChannel(replyChannel); + Message message3 = createMessage("789", "ABC", 3, 3, null); + Message message2 = createMessage("456", "ABC", 3, 2, null); + this.resequencer.handleMessage(message3); + this.resequencer.handleMessage(message2); + Message out1 = replyChannel.receive(0); + assertNull(out1); + out1 = discardChannel.receive(1000); + assertNotNull(out1); + Message out2 = discardChannel.receive(0); + assertNotNull(out2); + Message message1 = createMessage("123", "ABC", 3, 1, null); + this.resequencer.handleMessage(message1); + Message out3 = discardChannel.receive(0); + assertNotNull(out3); + } + + @Test + public void testTimeoutDontExpire() throws InterruptedException { + this.resequencer.setGroupTimeoutExpression(new SpelExpressionParser().parseExpression("100")); + this.resequencer.setIntegrationEvaluationContext(new StandardEvaluationContext()); + ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); + taskScheduler.afterPropertiesSet(); + this.resequencer.setTaskScheduler(taskScheduler); + QueueChannel discardChannel = new QueueChannel(); + this.resequencer.setDiscardChannel(discardChannel); + QueueChannel replyChannel = new QueueChannel(); + this.resequencer.setOutputChannel(replyChannel); + this.resequencer.setExpireGroupsUponTimeout(true); + Message message3 = createMessage("789", "ABC", 3, 3, null); + Message message2 = createMessage("456", "ABC", 3, 2, null); + this.resequencer.handleMessage(message3); + this.resequencer.handleMessage(message2); + Message out1 = replyChannel.receive(0); + assertNull(out1); + out1 = discardChannel.receive(1000); + assertNotNull(out1); + Message out2 = discardChannel.receive(0); + assertNotNull(out2); + Message message1 = createMessage("123", "ABC", 3, 1, null); + this.resequencer.handleMessage(message1); + Message out3 = discardChannel.receive(0); + assertNull(out3); + out3 = discardChannel.receive(1000); + assertNotNull(out3); + } + private static Message createMessage(String payload, Object correlationId, int sequenceSize, int sequenceNumber, MessageChannel replyChannel) { return MessageBuilder.withPayload(payload).setCorrelationId(correlationId).setSequenceSize(sequenceSize) diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java index 67ff81af8f3..ad97168df40 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -14,6 +14,7 @@ package org.springframework.integration.config; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.springframework.integration.test.util.TestUtils.getPropertyValue; @@ -22,10 +23,9 @@ import org.junit.Before; import org.junit.Test; + import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; import org.springframework.integration.aggregator.CorrelationStrategy; import org.springframework.integration.aggregator.MethodInvokingCorrelationStrategy; import org.springframework.integration.aggregator.MethodInvokingReleaseStrategy; @@ -37,6 +37,8 @@ import org.springframework.integration.store.SimpleMessageGroup; import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.test.util.TestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; /** * @author Marius Bogoevici @@ -45,6 +47,7 @@ * @author Oleg Zhurakousky * @author Stefan Ferstl * @author Artem Bilan + * @author Gary Russell */ public class ResequencerParserTests { @@ -108,6 +111,7 @@ public void testReleaseStrategyRefOnly() throws Exception { ResequencingMessageHandler resequencer = getPropertyValue(endpoint, "handler", ResequencingMessageHandler.class); assertEquals("The ResequencerEndpoint is not configured with the appropriate ReleaseStrategy", context.getBean("testReleaseStrategy"), getPropertyValue(resequencer, "releaseStrategy")); + assertFalse(TestUtils.getPropertyValue(resequencer, "expireGroupsUponTimeout", Boolean.class)); } @Test @@ -128,6 +132,7 @@ public void testReleaseStrategyRefAndMethod() throws Exception { effectiveReleaseStrategy.canRelease(new SimpleMessageGroup("test")); assertEquals("The ResequencerEndpoint was not invoked the expected number of times;", currentInvocationCount + 1, expectedReleaseStrategy.invocationCount); + assertTrue(TestUtils.getPropertyValue(resequencer, "expireGroupsUponTimeout", Boolean.class)); } @Test @@ -161,6 +166,7 @@ private static Message createMessage(T payload, Object correlationId, int static class TestCorrelationStrategy implements CorrelationStrategy { + @Override public Object getCorrelationKey(Message message) { return "test"; } @@ -174,6 +180,7 @@ public Object foo(Object o) { } static class TestReleaseStrategy implements ReleaseStrategy { + @Override public boolean canRelease(MessageGroup group) { return true; } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/resequencerParserTests.xml b/spring-integration-core/src/test/java/org/springframework/integration/config/resequencerParserTests.xml index 834d31c541a..8358916eedb 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/resequencerParserTests.xml +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/resequencerParserTests.xml @@ -53,6 +53,7 @@ diff --git a/src/reference/docbook/resequencer.xml b/src/reference/docbook/resequencer.xml index 01e849459c6..1ad53fed954 100644 --- a/src/reference/docbook/resequencer.xml +++ b/src/reference/docbook/resequencer.xml @@ -58,7 +58,8 @@ group-timeout="60000" ]]> ]]> + scheduler="taskScheduler" /> ]]> ]]> The id of the resequencer is @@ -193,6 +194,19 @@ See . + + + When a group is completed due to a timeout (or by a MessageGroupStoreReaper), the + empty group's metadata + is retained by default. Late arriving messages will be immediately discarded. Set this + to true to remove the group completely; then, late arriving messages will start a new group + and won't be discarded until the group again times out. The new group will never be released normally + because of the "hole" in the sequence range that caused the timeout. + Empty groups can be expired (completely removed) later using a + MessageGroupStoreReaper together with the empty-group-min-timeout + attribute. Default: 'false'. + + diff --git a/src/reference/docbook/whats-new.xml b/src/reference/docbook/whats-new.xml index 70963a7c570..21fcb4f5253 100644 --- a/src/reference/docbook/whats-new.xml +++ b/src/reference/docbook/whats-new.xml @@ -221,5 +221,14 @@ See and . +
+ Resequencer Changes + + When a message group in a resequencer is timed out (using group-timeout or a + MessageGroupStoreReaper), late arriving messages will now be discarded + immediately by default. + See . + +
From a0e3434e51a139a01fa60e6e472ad728cdbe6101 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 15 Oct 2014 10:22:24 -0400 Subject: [PATCH 2/2] INT-3529 Polishing Move group completion to the Resequencer's `afterRelease()` when the group is timed out. --- .../AbstractCorrelatingMessageHandler.java | 14 ++++++++++++-- .../aggregator/ResequencingMessageHandler.java | 8 ++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java index 773e6061316..c9c8c4fa578 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java @@ -483,6 +483,17 @@ private void discardMessage(Message message) { */ protected abstract void afterRelease(MessageGroup group, Collection> completedMessages); + /** + * Subclasses may override if special action is needed because the group was released or discarded + * due to a timeout. By default, {@link #afterRelease(MessageGroup, Collection)} is invoked. + * @param group The group. + * @param completedMessages The completed messages. + * @param timeout True if the release/discard was due to a timeout. + */ + protected void afterRelease(MessageGroup group, Collection> completedMessages, boolean timeout) { + afterRelease(group, completedMessages); + } + protected void forceComplete(MessageGroup group) { Object correlationKey = group.getGroupId(); @@ -533,9 +544,8 @@ protected void forceComplete(MessageGroup group) { expireGroup(correlationKey, groupNow); } if (!this.expireGroupsUponTimeout) { - afterRelease(groupNow, groupNow.getMessages()); + afterRelease(groupNow, groupNow.getMessages(), true); removeGroup = false; - this.messageStore.completeGroup(correlationKey); // late messages immediately discarded } } else { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java index 358d2ef7f18..ddb6ceda8ac 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java @@ -63,6 +63,11 @@ public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) { @Override protected void afterRelease(MessageGroup messageGroup, Collection> completedMessages) { + afterRelease(messageGroup, completedMessages, false); + } + + @Override + protected void afterRelease(MessageGroup messageGroup, Collection> completedMessages, boolean timeout) { int size = messageGroup.getMessages().size(); int sequenceSize = 0; @@ -82,6 +87,9 @@ protected void afterRelease(MessageGroup messageGroup, Collection> co this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), msg); } } + if (timeout) { + this.messageStore.completeGroup(messageGroup.getGroupId()); + } } }