Skip to content

Commit d5f1bb6

Browse files
artembilangaryrussell
authored andcommitted
INT-3512: Add <advice-chain> to the Aggregator
JIRA: https://jira.spring.io/browse/INT-3512 INT-3512: Fix typos INT-3512: add `expire-` prefix to the advice sub-elements INT-3512: Mark `AggregatorWithCustomReleaseStrategyTests` as `LONG_RUNNING_TEST` Doc Polishing
1 parent 113716e commit d5f1bb6

File tree

10 files changed

+291
-17
lines changed

10 files changed

+291
-17
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@
2424
import java.util.concurrent.ScheduledFuture;
2525
import java.util.concurrent.locks.Lock;
2626

27+
import org.aopalliance.aop.Advice;
2728
import org.apache.commons.logging.Log;
2829
import org.apache.commons.logging.LogFactory;
2930

31+
import org.springframework.aop.framework.ProxyFactory;
3032
import org.springframework.beans.BeansException;
3133
import org.springframework.beans.factory.BeanFactory;
3234
import org.springframework.beans.factory.BeanFactoryAware;
@@ -116,6 +118,10 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageP
116118

117119
private volatile Expression groupTimeoutExpression;
118120

121+
private volatile List<Advice> forceReleaseAdviceChain;
122+
123+
private MessageGroupProcessor forceReleaseProcessor = new ForceReleaseMessageGroupProcessor();
124+
119125
private EvaluationContext evaluationContext;
120126

121127
private volatile ApplicationEventPublisher applicationEventPublisher;
@@ -157,7 +163,7 @@ public void setMessageStore(MessageGroupStore store) {
157163
store.registerMessageGroupExpiryCallback(new MessageGroupCallback() {
158164
@Override
159165
public void execute(MessageGroupStore messageGroupStore, MessageGroup group) {
160-
forceComplete(group);
166+
forceReleaseProcessor.processMessageGroup(group);
161167
}
162168
});
163169
}
@@ -177,6 +183,11 @@ public void setGroupTimeoutExpression(Expression groupTimeoutExpression) {
177183
this.groupTimeoutExpression = groupTimeoutExpression;
178184
}
179185

186+
public void setForceReleaseAdviceChain(List<Advice> forceReleaseAdviceChain) {
187+
Assert.notNull(forceReleaseAdviceChain, "forceReleaseAdviceChain must not be null");
188+
this.forceReleaseAdviceChain = forceReleaseAdviceChain;
189+
}
190+
180191
@Override
181192
public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
182193
this.evaluationContext = evaluationContext;
@@ -226,6 +237,20 @@ protected void onInit() throws Exception {
226237
* (checked in the setter).
227238
*/
228239
this.lockRegistrySet = true;
240+
this.forceReleaseProcessor = createGroupTimeoutProcessor();
241+
}
242+
243+
private MessageGroupProcessor createGroupTimeoutProcessor() {
244+
MessageGroupProcessor processor = new ForceReleaseMessageGroupProcessor();
245+
246+
if (this.groupTimeoutExpression != null && !CollectionUtils.isEmpty(this.forceReleaseAdviceChain)) {
247+
ProxyFactory proxyFactory = new ProxyFactory(processor);
248+
for (Advice advice : this.forceReleaseAdviceChain) {
249+
proxyFactory.addAdvice(advice);
250+
}
251+
return (MessageGroupProcessor) proxyFactory.getProxy(getApplicationContext().getClassLoader());
252+
}
253+
return processor;
229254
}
230255

231256
public void setDiscardChannel(MessageChannel discardChannel) {
@@ -409,7 +434,7 @@ private void scheduleGroupToForceComplete(final MessageGroup messageGroup) {
409434
@Override
410435
public void run() {
411436
try {
412-
AbstractCorrelatingMessageHandler.this.forceComplete(messageGroup);
437+
forceReleaseProcessor.processMessageGroup(messageGroup);
413438
}
414439
catch (MessageDeliveryException e) {
415440
if (logger.isDebugEnabled()) {
@@ -427,7 +452,7 @@ public void run() {
427452
this.expireGroupScheduledFutures.put(UUIDConverter.getUUID(messageGroup.getGroupId()), scheduledFuture);
428453
}
429454
else {
430-
forceComplete(messageGroup);
455+
this.forceReleaseProcessor.processMessageGroup(messageGroup);
431456
}
432457
}
433458
}
@@ -735,4 +760,14 @@ private boolean containsSequenceNumber(Collection<Message<?>> messages, Integer
735760

736761
}
737762

763+
private class ForceReleaseMessageGroupProcessor implements MessageGroupProcessor {
764+
765+
@Override
766+
public Object processMessageGroup(MessageGroup group) {
767+
forceComplete(group);
768+
return null;
769+
}
770+
771+
}
772+
738773
}

spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler;
2323
import org.springframework.integration.config.IntegrationConfigUtils;
2424
import org.springframework.util.StringUtils;
25+
import org.springframework.util.xml.DomUtils;
2526

2627
/**
2728
* Base class for parsers that create an instance of {@link AbstractCorrelatingMessageHandler}
@@ -58,7 +59,8 @@ public abstract class AbstractCorrelatingMessageHandlerParser extends AbstractCo
5859

5960
private static final String SEND_PARTIAL_RESULT_ON_EXPIRY_ATTRIBUTE = "send-partial-result-on-expiry";
6061

61-
protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetadataElement processor, ParserContext parserContext){
62+
protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetadataElement processor,
63+
ParserContext parserContext) {
6264
this.injectPropertyWithAdapter(CORRELATION_STRATEGY_REF_ATTRIBUTE, CORRELATION_STRATEGY_METHOD_ATTRIBUTE,
6365
CORRELATION_STRATEGY_EXPRESSION_ATTRIBUTE, CORRELATION_STRATEGY_PROPERTY, "CorrelationStrategy",
6466
element, builder, processor, parserContext);
@@ -72,12 +74,19 @@ protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetad
7274
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "lock-registry");
7375
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE);
7476
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_EXPIRY_ATTRIBUTE);
75-
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "empty-group-min-timeout", "minimumTimeoutForEmptyGroups");
77+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "empty-group-min-timeout",
78+
"minimumTimeoutForEmptyGroups");
7679

7780
BeanDefinition expressionDef =
78-
IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("group-timeout", "group-timeout-expression",
79-
parserContext, element, false);
81+
IntegrationNamespaceUtils.createExpressionDefinitionFromValueOrExpression("group-timeout",
82+
"group-timeout-expression", parserContext, element, false);
8083
builder.addPropertyValue("groupTimeoutExpression", expressionDef);
84+
85+
Element txElement = DomUtils.getChildElementByTagName(element, "expire-transactional");
86+
Element adviceChainElement = DomUtils.getChildElementByTagName(element, "expire-advice-chain");
87+
88+
IntegrationNamespaceUtils.configureAndSetAdviceChainIfPresent(adviceChainElement, txElement,
89+
builder.getRawBeanDefinition(), parserContext, "forceReleaseAdviceChain");
8190
}
8291

8392
protected void injectPropertyWithAdapter(String beanRefAttribute, String methodRefAttribute,
@@ -92,8 +101,8 @@ protected void injectPropertyWithAdapter(String beanRefAttribute, String methodR
92101
final boolean hasExpression = StringUtils.hasText(expression);
93102

94103
if (hasBeanRef && hasExpression) {
95-
parserContext.getReaderContext().error("Exactly one of the '" + beanRefAttribute + "' or '" + expressionAttribute +
96-
"' attribute is allowed.", element);
104+
parserContext.getReaderContext().error("Exactly one of the '" + beanRefAttribute + "' or '"
105+
+ expressionAttribute + "' attribute is allowed.", element);
97106
}
98107

99108
BeanMetadataElement adapter = null;
@@ -126,4 +135,5 @@ private BeanMetadataElement createAdapter(BeanMetadataElement ref, String method
126135
}
127136
return builder.getBeanDefinition();
128137
}
138+
129139
}

spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.springframework.scheduling.TaskScheduler;
4747
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
4848
import org.springframework.util.Assert;
49-
import org.springframework.util.ClassUtils;
5049
import org.springframework.util.CollectionUtils;
5150

5251
/**
@@ -237,7 +236,7 @@ private MessageHandler createReleaseMessageTask() {
237236
for (Advice advice : delayedAdviceChain) {
238237
proxyFactory.addAdvice(advice);
239238
}
240-
return (MessageHandler) proxyFactory.getProxy(ClassUtils.getDefaultClassLoader());
239+
return (MessageHandler) proxyFactory.getProxy(getApplicationContext().getClassLoader());
241240
}
242241
return releaseHandler;
243242
}

spring-integration-core/src/main/resources/org/springframework/integration/config/xml/spring-integration-4.1.xsd

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3435,6 +3435,15 @@
34353435
<xsd:complexType name="correlating-message-handler-type">
34363436
<xsd:complexContent>
34373437
<xsd:extension base="innerEndpointDefinitionAware">
3438+
<xsd:choice>
3439+
<xsd:annotation>
3440+
<xsd:documentation>
3441+
'transactional' or 'advice-chain' are applied only to 'forceComplete' operation.
3442+
</xsd:documentation>
3443+
</xsd:annotation>
3444+
<xsd:element name="expire-transactional" type="transactionalType" minOccurs="0" maxOccurs="1" />
3445+
<xsd:element name="expire-advice-chain" type="adviceChainType" minOccurs="0" maxOccurs="1" />
3446+
</xsd:choice>
34383447
<xsd:attribute name="correlation-strategy" type="xsd:string">
34393448
<xsd:annotation>
34403449
<xsd:appinfo>

spring-integration-core/src/test/java/org/springframework/integration/aggregator/scenarios/AggregatorWithCustomReleaseStrategyTests.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@
2121
import java.util.concurrent.TimeUnit;
2222

2323
import org.junit.AfterClass;
24+
import org.junit.Assume;
25+
import org.junit.Rule;
2426
import org.junit.Test;
27+
import org.junit.rules.TestWatcher;
28+
import org.junit.runner.Description;
29+
import org.junit.runners.model.Statement;
2530

2631
import org.springframework.context.support.AbstractApplicationContext;
2732
import org.springframework.context.support.ClassPathXmlApplicationContext;
@@ -37,6 +42,40 @@
3742
*/
3843
public class AggregatorWithCustomReleaseStrategyTests {
3944

45+
@Rule
46+
public TestWatcher longTests = new TestWatcher() {
47+
48+
private static final String RUN_LONG_PROP = "RUN_LONG_INTEGRATION_TESTS";
49+
50+
private boolean shouldRun;
51+
52+
{
53+
for(String value: new String[]{System.getenv(RUN_LONG_PROP), System.getProperty(RUN_LONG_PROP)}) {
54+
if ("true".equalsIgnoreCase(value)) {
55+
this.shouldRun = true;
56+
break;
57+
}
58+
}
59+
}
60+
61+
@Override
62+
public Statement apply(Statement base, Description description) {
63+
if (!this.shouldRun) {
64+
return new Statement() {
65+
66+
@Override
67+
public void evaluate() throws Throwable {
68+
Assume.assumeTrue(false);
69+
}
70+
};
71+
}
72+
else {
73+
return super.apply(base, description);
74+
}
75+
}
76+
77+
};
78+
4079
private static ExecutorService executor = Executors.newCachedThreadPool();
4180

4281
@AfterClass

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageStore.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ public Message<?> removeMessage(UUID id) {
308308
@Override
309309
@ManagedAttribute
310310
public long getMessageCount() {
311-
return jdbcTemplate.queryForObject(getQuery(Query.GET_MESSAGE_COUNT), Integer.class, region);
311+
return jdbcTemplate.queryForObject(getQuery(Query.GET_MESSAGE_COUNT), Long.class, region);
312312
}
313313

314314
@Override
@@ -334,7 +334,7 @@ public <T> Message<T> addMessage(final Message<T> message) {
334334

335335
final long createdDate = System.currentTimeMillis();
336336
Message<T> result = this.getMessageBuilderFactory().fromMessage(message).setHeader(SAVED_KEY, Boolean.TRUE)
337-
.setHeader(CREATED_DATE_KEY, new Long(createdDate)).build();
337+
.setHeader(CREATED_DATE_KEY, createdDate).build();
338338

339339
Map innerMap = (Map) new DirectFieldAccessor(result.getHeaders()).getPropertyValue("headers");
340340
// using reflection to set ID since it is immutable through MessageHeaders
@@ -452,7 +452,7 @@ public void processRow(ResultSet rs) throws SQLException {
452452
}
453453

454454
long timestamp = createDate.get().getTime();
455-
boolean complete = completeFlag.get().booleanValue();
455+
boolean complete = completeFlag.get();
456456

457457
SimpleMessageGroup messageGroup = new SimpleMessageGroup(messages, groupId, timestamp, complete);
458458
messageGroup.setLastModified(updateDate.get().getTime());
@@ -710,8 +710,9 @@ private class MessageMapper implements RowMapper<Message<?>> {
710710

711711
@Override
712712
public Message<?> mapRow(ResultSet rs, int rowNum) throws SQLException {
713-
Message<?> message = (Message<?>) deserializer.convert(lobHandler.getBlobAsBytes(rs, "MESSAGE_BYTES"));
714-
return message;
713+
return (Message<?>) deserializer.convert(lobHandler.getBlobAsBytes(rs, "MESSAGE_BYTES"));
715714
}
715+
716716
}
717+
717718
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<beans:beans xmlns:beans="http://www.springframework.org/schema/beans"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns="http://www.springframework.org/schema/integration"
5+
xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
6+
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
7+
xmlns:context="http://www.springframework.org/schema/context"
8+
xsi:schemaLocation="http://www.springframework.org/schema/beans
9+
http://www.springframework.org/schema/beans/spring-beans.xsd
10+
http://www.springframework.org/schema/integration
11+
http://www.springframework.org/schema/integration/spring-integration.xsd
12+
http://www.springframework.org/schema/jdbc
13+
http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
14+
http://www.springframework.org/schema/integration/jdbc
15+
http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
16+
http://www.springframework.org/schema/context
17+
http://www.springframework.org/schema/context/spring-context.xsd">
18+
19+
<context:property-placeholder location="int-derby.properties"/>
20+
21+
<jdbc:embedded-database id="dataSource" type="DERBY"/>
22+
23+
<jdbc:initialize-database data-source="dataSource"
24+
ignore-failures="DROPS">
25+
<jdbc:script location="${int.drop.script}"/>
26+
<jdbc:script location="${int.schema.script}"/>
27+
</jdbc:initialize-database>
28+
29+
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
30+
31+
<beans:bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
32+
<beans:property name="dataSource" ref="dataSource"/>
33+
</beans:bean>
34+
35+
<aggregator input-channel="transactionalAggregatorInput" output-channel="transactionalAggregatorOutput"
36+
message-store="messageStore"
37+
send-partial-result-on-expiry="true"
38+
group-timeout="100">
39+
<expire-transactional/>
40+
</aggregator>
41+
42+
<service-activator input-channel="transactionalAggregatorOutput" ref="exceptionHandler"/>
43+
44+
<beans:bean id="exceptionHandler"
45+
class="org.springframework.integration.jdbc.AggregatorIntegrationTests$ExceptionMessageHandler"/>
46+
47+
</beans:beans>

0 commit comments

Comments
 (0)