Skip to content

INT-3529 Add 'expire-groups-upon-timeout' to Reseq #1296

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,11 @@ public void setReleasePartialSequences(boolean releasePartialSequences) {

/**
* Expire (completely remove) a group if it is completed due to timeout.
* Subclasses setting this to false MUST handle null in the messages
* argument to {@link #afterRelease(MessageGroup, Collection)}.
* Default true.
* @param expireGroupsUponTimeout the expireGroupsOnTimeout to set
* Default true
* @param expireGroupsUponTimeout the expireGroupsUponTimeout to set
* @since 4.1
*/
protected void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
this.expireGroupsUponTimeout = expireGroupsUponTimeout;
}

Expand Down Expand Up @@ -485,6 +483,17 @@ private void discardMessage(Message<?> message) {
*/
protected abstract void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages);

/**
* Subclasses may override if special action is needed because the group was released or discarded
* due to a timeout. By default, {@link #afterRelease(MessageGroup, Collection)} is invoked.
* @param group The group.
* @param completedMessages The completed messages.
* @param timeout True if the release/discard was due to a timeout.
*/
protected void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages, boolean timeout) {
afterRelease(group, completedMessages);
}

protected void forceComplete(MessageGroup group) {

Object correlationKey = group.getGroupId();
Expand Down Expand Up @@ -535,7 +544,7 @@ protected void forceComplete(MessageGroup group) {
expireGroup(correlationKey, groupNow);
}
if (!this.expireGroupsUponTimeout) {
afterRelease(groupNow, null);
afterRelease(groupNow, groupNow.getMessages(), true);
removeGroup = false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ public void setExpireGroupsUponCompletion(boolean expireGroupsUponCompletion) {
this.expireGroupsUponCompletion = expireGroupsUponCompletion;
}

@Override
public void setExpireGroupsUponTimeout(boolean expireGroupsOnTimeout) {
super.setExpireGroupsUponTimeout(expireGroupsOnTimeout);
}

@Override
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages) {
this.messageStore.completeGroup(messageGroup.getGroupId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@

import java.util.Collection;

import org.springframework.messaging.Message;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.messaging.Message;

/**
* Resequencer specific implementation of {@link AbstractCorrelatingMessageHandler}.
Expand All @@ -33,21 +33,41 @@ public ResequencingMessageHandler(MessageGroupProcessor processor,
MessageGroupStore store, CorrelationStrategy correlationStrategy,
ReleaseStrategy releaseStrategy) {
super(processor, store, correlationStrategy, releaseStrategy);
this.setExpireGroupsUponTimeout(false);
}


public ResequencingMessageHandler(MessageGroupProcessor processor,
MessageGroupStore store) {
super(processor, store);
this.setExpireGroupsUponTimeout(false);
}


public ResequencingMessageHandler(MessageGroupProcessor processor) {
super(processor);
this.setExpireGroupsUponTimeout(false);
}

/**
* {@inheritDoc}
*
* (overridden to false for a resequencer so late messages are immediately discarded rather
* than waiting for the next timeout)
*/
@Override
public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
super.setExpireGroupsUponTimeout(expireGroupsUponTimeout);
}


@Override
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages) {
afterRelease(messageGroup, completedMessages, false);
}

@Override
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages, boolean timeout) {

int size = messageGroup.getMessages().size();
int sequenceSize = 0;
Expand All @@ -67,6 +87,9 @@ protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> co
this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), msg);
}
}
if (timeout) {
this.messageStore.completeGroup(messageGroup.getGroupId());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public abstract class AbstractCorrelatingMessageHandlerParser extends AbstractCo

private static final String SEND_PARTIAL_RESULT_ON_EXPIRY_ATTRIBUTE = "send-partial-result-on-expiry";

private static final String EXPIRE_GROUPS_UPON_TIMEOUT = "expire-groups-upon-timeout";

protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetadataElement processor,
ParserContext parserContext) {
this.injectPropertyWithAdapter(CORRELATION_STRATEGY_REF_ATTRIBUTE, CORRELATION_STRATEGY_METHOD_ATTRIBUTE,
Expand Down Expand Up @@ -87,6 +89,8 @@ protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetad

IntegrationNamespaceUtils.configureAndSetAdviceChainIfPresent(adviceChainElement, txElement,
builder.getRawBeanDefinition(), parserContext, "forceReleaseAdviceChain");

IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_TIMEOUT);
}

protected void injectPropertyWithAdapter(String beanRefAttribute, String methodRefAttribute,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public class AggregatorParser extends AbstractCorrelatingMessageHandlerParser {

private static final String EXPIRE_GROUPS_UPON_COMPLETION = "expire-groups-upon-completion";

private static final String EXPIRE_GROUPS_UPON_TIMEOUT = "expire-groups-upon-timeout";

@Override
protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) {
BeanComponentDefinition innerHandlerDefinition = IntegrationNamespaceUtils.parseInnerHandlerDefinition(element,
Expand Down Expand Up @@ -90,7 +88,6 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars
this.doParse(builder, element, processor, parserContext);

IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_COMPLETION);
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_TIMEOUT);

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3418,16 +3418,6 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="expire-groups-upon-timeout" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Boolean flag specifying, if a group is completed due to timeout (reaper or
'group-timeout(-expression)'), whether the group should be removed.
When true, late arriving messages will form a new group. When false, they
will be discarded. Default is 'true'
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Expand Down Expand Up @@ -3556,6 +3546,17 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="expire-groups-upon-timeout" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Boolean flag specifying, if a group is completed due to timeout (reaper or
'group-timeout(-expression)'), whether the group should be removed.
When true, late arriving messages will form a new group. When false, they
will be discarded. Default is 'true' for an aggregator and 'false' for a
resequencer.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="lock-registry" type="xsd:string">
<xsd:annotation>
<xsd:appinfo>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@
import org.junit.Test;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
* @author Marius Bogoevici
Expand Down Expand Up @@ -243,10 +246,10 @@ public void testResequencingWithDiscard() throws InterruptedException {
new IntegrationMessageHeaderAccessor(reply2).getSequenceNumber()));
Collections.sort(sequence);
assertEquals("[1, 2]", sequence.toString());
// when sending the last message, the whole sequence must have been sent
// Once a group is expired, late messages are discarded immediately by default
this.resequencer.handleMessage(message3);
reply3 = discardChannel.receive(0);
assertNull(reply3);
assertNotNull(reply3);
}

@Test
Expand Down Expand Up @@ -322,6 +325,63 @@ public void testRemovalOfBarrierWhenLastMessageOfSequenceArrives() {
assertEquals(0, store.getMessageGroup(correlationId).size());
}

@Test
public void testTimeoutDefaultExpiry() throws InterruptedException {
this.resequencer.setGroupTimeoutExpression(new SpelExpressionParser().parseExpression("100"));
this.resequencer.setIntegrationEvaluationContext(new StandardEvaluationContext());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
this.resequencer.setTaskScheduler(taskScheduler);
QueueChannel discardChannel = new QueueChannel();
this.resequencer.setDiscardChannel(discardChannel);
QueueChannel replyChannel = new QueueChannel();
this.resequencer.setOutputChannel(replyChannel);
Message<?> message3 = createMessage("789", "ABC", 3, 3, null);
Message<?> message2 = createMessage("456", "ABC", 3, 2, null);
this.resequencer.handleMessage(message3);
this.resequencer.handleMessage(message2);
Message<?> out1 = replyChannel.receive(0);
assertNull(out1);
out1 = discardChannel.receive(1000);
assertNotNull(out1);
Message<?> out2 = discardChannel.receive(0);
assertNotNull(out2);
Message<?> message1 = createMessage("123", "ABC", 3, 1, null);
this.resequencer.handleMessage(message1);
Message<?> out3 = discardChannel.receive(0);
assertNotNull(out3);
}

@Test
public void testTimeoutDontExpire() throws InterruptedException {
this.resequencer.setGroupTimeoutExpression(new SpelExpressionParser().parseExpression("100"));
this.resequencer.setIntegrationEvaluationContext(new StandardEvaluationContext());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
this.resequencer.setTaskScheduler(taskScheduler);
QueueChannel discardChannel = new QueueChannel();
this.resequencer.setDiscardChannel(discardChannel);
QueueChannel replyChannel = new QueueChannel();
this.resequencer.setOutputChannel(replyChannel);
this.resequencer.setExpireGroupsUponTimeout(true);
Message<?> message3 = createMessage("789", "ABC", 3, 3, null);
Message<?> message2 = createMessage("456", "ABC", 3, 2, null);
this.resequencer.handleMessage(message3);
this.resequencer.handleMessage(message2);
Message<?> out1 = replyChannel.receive(0);
assertNull(out1);
out1 = discardChannel.receive(1000);
assertNotNull(out1);
Message<?> out2 = discardChannel.receive(0);
assertNotNull(out2);
Message<?> message1 = createMessage("123", "ABC", 3, 1, null);
this.resequencer.handleMessage(message1);
Message<?> out3 = discardChannel.receive(0);
assertNull(out3);
out3 = discardChannel.receive(1000);
assertNotNull(out3);
}

private static Message<?> createMessage(String payload, Object correlationId, int sequenceSize, int sequenceNumber,
MessageChannel replyChannel) {
return MessageBuilder.withPayload(payload).setCorrelationId(correlationId).setSequenceSize(sequenceSize)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -14,6 +14,7 @@
package org.springframework.integration.config;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.springframework.integration.test.util.TestUtils.getPropertyValue;
Expand All @@ -22,10 +23,9 @@

import org.junit.Before;
import org.junit.Test;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.integration.aggregator.CorrelationStrategy;
import org.springframework.integration.aggregator.MethodInvokingCorrelationStrategy;
import org.springframework.integration.aggregator.MethodInvokingReleaseStrategy;
Expand All @@ -37,6 +37,8 @@
import org.springframework.integration.store.SimpleMessageGroup;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

/**
* @author Marius Bogoevici
Expand All @@ -45,6 +47,7 @@
* @author Oleg Zhurakousky
* @author Stefan Ferstl
* @author Artem Bilan
* @author Gary Russell
*/
public class ResequencerParserTests {

Expand Down Expand Up @@ -108,6 +111,7 @@ public void testReleaseStrategyRefOnly() throws Exception {
ResequencingMessageHandler resequencer = getPropertyValue(endpoint, "handler", ResequencingMessageHandler.class);
assertEquals("The ResequencerEndpoint is not configured with the appropriate ReleaseStrategy",
context.getBean("testReleaseStrategy"), getPropertyValue(resequencer, "releaseStrategy"));
assertFalse(TestUtils.getPropertyValue(resequencer, "expireGroupsUponTimeout", Boolean.class));
}

@Test
Expand All @@ -128,6 +132,7 @@ public void testReleaseStrategyRefAndMethod() throws Exception {
effectiveReleaseStrategy.canRelease(new SimpleMessageGroup("test"));
assertEquals("The ResequencerEndpoint was not invoked the expected number of times;",
currentInvocationCount + 1, expectedReleaseStrategy.invocationCount);
assertTrue(TestUtils.getPropertyValue(resequencer, "expireGroupsUponTimeout", Boolean.class));
}

@Test
Expand Down Expand Up @@ -161,6 +166,7 @@ private static <T> Message<T> createMessage(T payload, Object correlationId, int

static class TestCorrelationStrategy implements CorrelationStrategy {

@Override
public Object getCorrelationKey(Message<?> message) {
return "test";
}
Expand All @@ -174,6 +180,7 @@ public Object foo(Object o) {
}

static class TestReleaseStrategy implements ReleaseStrategy {
@Override
public boolean canRelease(MessageGroup group) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@

<resequencer id="resequencerWithReleaseStrategyRefAndMethod"
input-channel="inputChannel6"
expire-groups-upon-timeout="true"
release-strategy="testReleaseStrategyPojo"
release-strategy-method="bar"/>

Expand Down
16 changes: 15 additions & 1 deletion src/reference/docbook/resequencer.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@

group-timeout="60000" ]]><co id="resxml19" /><![CDATA[
group-timeout-expression="size() ge 2 ? 100 : -1" ]]><co id="resxml20" /><![CDATA[
scheduler="taskScheduler" /> ]]><co id="resxml21" /></programlisting>
scheduler="taskScheduler" /> ]]><co id="resxml21" /><![CDATA[
expire-group-upon-timeout="false" /> ]]><co id="resxml22" /></programlisting>
<para><calloutlist>
<callout arearefs="resxml1-co" id="resxml1">
<para>The id of the resequencer is
Expand Down Expand Up @@ -193,6 +194,19 @@
See <xref linkend="aggregator-xml"/>.
</para>
</callout>
<callout arearefs="aggxml22" id="aggxml22-txt">
<para>
When a group is completed due to a timeout (or by a <classname>MessageGroupStoreReaper</classname>), the
empty group's metadata
is retained by default. Late arriving messages will be immediately discarded. Set this
to <code>true</code> to remove the group completely; then, late arriving messages will start a new group
and won't be discarded until the group again times out. The new group will never be released normally
because of the "hole" in the sequence range that caused the timeout.
Empty groups can be expired (completely removed) later using a
<classname>MessageGroupStoreReaper</classname> together with the <code>empty-group-min-timeout</code>
attribute. Default: 'false'.
</para>
</callout>
</calloutlist></para>

<note>
Expand Down
Loading