Skip to content

INT-3420 Aggregator expire-groups-upon-timeout #1216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageH

private volatile ApplicationEventPublisher applicationEventPublisher;

private volatile boolean expireGroupsUponTimeout = true;

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
Assert.notNull(processor);
Expand Down Expand Up @@ -300,6 +302,18 @@ public void setReleasePartialSequences(boolean releasePartialSequences) {
this.releasePartialSequences = releasePartialSequences;
}

/**
* Expire (completely remove) a group if it is completed due to timeout.
* Subclasses setting this to false MUST handle null in the messages
* argument to {@link #afterRelease(MessageGroup, Collection)}.
* Default true.
* @param expireGroupsUponTimeout the expireGroupsOnTimeout to set
* @since 4.1
*/
protected void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't it be public ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it doesn't make sense for all ACMH; the AMH makes it public.

this.expireGroupsUponTimeout = expireGroupsUponTimeout;
}

@Override
public String getComponentType() {
return "aggregator";
Expand Down Expand Up @@ -462,7 +476,7 @@ public void run() {
*/
protected abstract void afterRelease(MessageGroup group, Collection<Message<?>> completedMessages);

private void forceComplete(MessageGroup group) {
protected void forceComplete(MessageGroup group) {

Object correlationKey = group.getGroupId();
// UUIDConverter is no-op if already converted
Expand Down Expand Up @@ -505,10 +519,14 @@ private void forceComplete(MessageGroup group) {
&& group.getTimestamp() == groupNow.getTimestamp()) {
if (groupSize > 0) {
if (releaseStrategy.canRelease(groupNow)) {
this.completeGroup(correlationKey, groupNow);
completeGroup(correlationKey, groupNow);
}
else {
this.expireGroup(correlationKey, groupNow);
expireGroup(correlationKey, groupNow);
}
if (!this.expireGroupsUponTimeout) {
afterRelease(groupNow, null);
removeGroup = false;
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ public void setExpireGroupsUponCompletion(boolean expireGroupsUponCompletion) {
this.expireGroupsUponCompletion = expireGroupsUponCompletion;
}

@Override
public void setExpireGroupsUponTimeout(boolean expireGroupsOnTimeout) {
super.setExpireGroupsUponTimeout(expireGroupsOnTimeout);
}

@Override
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages) {
this.messageStore.completeGroup(messageGroup.getGroupId());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.integration.config.xml;

import org.w3c.dom.Element;

import org.springframework.beans.BeanMetadataElement;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.parsing.BeanComponentDefinition;
Expand All @@ -26,7 +28,6 @@
import org.springframework.integration.aggregator.ExpressionEvaluatingMessageGroupProcessor;
import org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor;
import org.springframework.util.StringUtils;
import org.w3c.dom.Element;

/**
* Parser for the <em>aggregator</em> element of the integration namespace. Registers the annotation-driven
Expand All @@ -37,11 +38,14 @@
* @author Oleg Zhurakousky
* @author Dave Syer
* @author Stefan Ferstl
* @author Gary Russell
*/
public class AggregatorParser extends AbstractCorrelatingMessageHandlerParser {

private static final String EXPIRE_GROUPS_UPON_COMPLETION = "expire-groups-upon-completion";

private static final String EXPIRE_GROUPS_UPON_TIMEOUT = "expire-groups-upon-timeout";

@Override
protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) {
BeanComponentDefinition innerHandlerDefinition = IntegrationNamespaceUtils.parseInnerHandlerDefinition(element,
Expand Down Expand Up @@ -86,6 +90,7 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars
this.doParse(builder, element, processor, parserContext);

IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_COMPLETION);
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, EXPIRE_GROUPS_UPON_TIMEOUT);

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3376,6 +3376,16 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="expire-groups-upon-timeout" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Boolean flag specifying, if a group is completed due to timeout (reaper or
'group-timeout(-expression)'), whether the group should be removed.
When true, late arriving messages will form a new group. When false, they
will be discarded. Default is 'true'
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@

package org.springframework.integration.aggregator;

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will you configure your STS for asterisk?..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not convinced it's the right thing to do. We can talk.


import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -237,6 +241,38 @@ public void testShouldSendPartialResultOnTimeoutTrue() throws InterruptedExcepti
assertEquals("ABC", this.expiryEvents.get(0).getGroupId());
assertEquals(2, this.expiryEvents.get(0).getMessageCount());
assertEquals(false, this.expiryEvents.get(0).isDiscarded());
Message<?> message3 = createMessage(5, "ABC", 3, 3, replyChannel, null);
this.aggregator.handleMessage(message3);
assertEquals(1, this.store.getMessageGroup("ABC").size());
}

@Test
public void testGroupRemainsAfterTimeout() throws InterruptedException {
this.aggregator.setSendPartialResultOnExpiry(true);
this.aggregator.setExpireGroupsUponTimeout(false);
QueueChannel replyChannel = new QueueChannel();
QueueChannel discardChannel = new QueueChannel();
this.aggregator.setDiscardChannel(discardChannel);
Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel, null);
Message<?> message2 = createMessage(5, "ABC", 3, 2, replyChannel, null);
this.aggregator.handleMessage(message1);
this.aggregator.handleMessage(message2);
this.store.expireMessageGroups(-10000);
Message<?> reply = replyChannel.receive(1000);
assertNotNull("A reply message should have been received", reply);
assertEquals(15, reply.getPayload());
assertEquals(1, expiryEvents.size());
assertSame(this.aggregator, expiryEvents.get(0).getSource());
assertEquals("ABC", this.expiryEvents.get(0).getGroupId());
assertEquals(2, this.expiryEvents.get(0).getMessageCount());
assertEquals(false, this.expiryEvents.get(0).isDiscarded());
assertEquals(0, this.store.getMessageGroup("ABC").size());
Message<?> message3 = createMessage(5, "ABC", 3, 3, replyChannel, null);
this.aggregator.handleMessage(message3);
assertEquals(0, this.store.getMessageGroup("ABC").size());
Message<?> discardedMessage = discardChannel.receive(1000);
assertNotNull("A message should have been discarded", discardedMessage);
assertSame(message3, discardedMessage);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -121,6 +122,7 @@ public void handleMessage(Message<?> message) throws MessageRejectedException, M
Object handler = context.getBean("aggregatorWithExpressions.handler");
assertSame(mbf, TestUtils.getPropertyValue(handler, "outputProcessor.messageBuilderFactory"));
assertSame(mbf, TestUtils.getPropertyValue(handler, "outputProcessor.processor.messageBuilderFactory"));
assertTrue(TestUtils.getPropertyValue(handler, "expireGroupsUponTimeout", Boolean.class));
}

@Test
Expand All @@ -145,15 +147,16 @@ public void testPropertyAssignment() throws Exception {
releaseStrategy, accessor.getPropertyValue("releaseStrategy"));
assertEquals("The AggregatorEndpoint is not injected with the appropriate CorrelationStrategy instance",
correlationStrategy, accessor.getPropertyValue("correlationStrategy"));
Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate output channel",
assertEquals("The AggregatorEndpoint is not injected with the appropriate output channel",
outputChannel, accessor.getPropertyValue("outputChannel"));
Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate discard channel",
assertEquals("The AggregatorEndpoint is not injected with the appropriate discard channel",
discardChannel, accessor.getPropertyValue("discardChannel"));
Assert.assertEquals("The AggregatorEndpoint is not set with the appropriate timeout value", 86420000l,
assertEquals("The AggregatorEndpoint is not set with the appropriate timeout value", 86420000l,
TestUtils.getPropertyValue(consumer, "messagingTemplate.sendTimeout"));
Assert.assertEquals(
assertEquals(
"The AggregatorEndpoint is not configured with the appropriate 'send partial results on timeout' flag",
true, accessor.getPropertyValue("sendPartialResultOnExpiry"));
assertFalse(TestUtils.getPropertyValue(consumer, "expireGroupsUponTimeout", Boolean.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
release-strategy="releaseStrategy"
correlation-strategy="correlationStrategy"
send-timeout="86420000"
send-partial-result-on-expiry="true"/>
send-partial-result-on-expiry="true"
expire-groups-upon-timeout="false"/>

<channel id="aggregatorWithExpressionsInput"/>
<channel id="aggregatorWithExpressionsOutput"/>
Expand Down
Loading