Skip to content

Commit 5cca8e8

Browse files
artembilangaryrussell
authored andcommitted
INT-3770: Add TX Support from Mid-flow
JIRA: https://jira.spring.io/browse/INT-3770, https://jira.spring.io/browse/INT-4107 Having `TransactionHandleMessageAdvice` we can start TX from any `MessageHandler.handleMessage()` * Add `<transactional>` alongside with the `<request-handler-advice-chain>` for those components which produce reply * Merge `<transactional>` and `<request-handler-advice-chain>` configuration to a single `ManagedList` * Rework JPA `<transactional>` in favor of common solution * Some polishing and refactoring AbstractPollingEndpoint: avoid `new ArrayList` if we don't have `receiveOnlyAdvice`s
1 parent cfceca8 commit 5cca8e8

File tree

37 files changed

+327
-170
lines changed

37 files changed

+327
-170
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.beans.factory.support.AbstractBeanDefinition;
3030
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
3131
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
32+
import org.springframework.beans.factory.support.ManagedList;
3233
import org.springframework.beans.factory.support.ManagedSet;
3334
import org.springframework.beans.factory.xml.AbstractBeanDefinitionParser;
3435
import org.springframework.beans.factory.xml.ParserContext;
@@ -86,11 +87,18 @@ protected final AbstractBeanDefinition parseInternal(Element element, ParserCont
8687
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(handlerBuilder, element, "output-channel");
8788
IntegrationNamespaceUtils.setValueIfAttributeDefined(handlerBuilder, element, "order");
8889

90+
Element txElement = DomUtils.getChildElementByTagName(element, "transactional");
8991
Element adviceChainElement = DomUtils.getChildElementByTagName(element,
9092
IntegrationNamespaceUtils.REQUEST_HANDLER_ADVICE_CHAIN);
91-
IntegrationNamespaceUtils.configureAndSetAdviceChainIfPresent(adviceChainElement, null,
93+
94+
@SuppressWarnings("rawtypes")
95+
ManagedList adviceChain = IntegrationNamespaceUtils.configureAdviceChain(adviceChainElement, txElement, true,
9296
handlerBuilder.getRawBeanDefinition(), parserContext);
9397

98+
if (!CollectionUtils.isEmpty(adviceChain)) {
99+
handlerBuilder.addPropertyValue("adviceChain", adviceChain);
100+
}
101+
94102
AbstractBeanDefinition handlerBeanDefinition = handlerBuilder.getBeanDefinition();
95103
String inputChannelAttributeName = this.getInputChannelAttributeName();
96104
boolean hasInputChannelAttribute = element.hasAttribute(inputChannelAttributeName);
@@ -121,6 +129,10 @@ protected final AbstractBeanDefinition parseInternal(Element element, ParserCont
121129

122130
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ConsumerEndpointFactoryBean.class);
123131

132+
if (!CollectionUtils.isEmpty(adviceChain)) {
133+
builder.addPropertyValue("adviceChain", adviceChain);
134+
}
135+
124136
String handlerBeanName = BeanDefinitionReaderUtils.generateBeanName(handlerBeanDefinition, parserContext.getRegistry());
125137
String[] handlerAlias = IntegrationNamespaceUtils.generateAlias(element);
126138
parserContext.registerBeanComponent(new BeanComponentDefinition(handlerBeanDefinition, handlerBeanName, handlerAlias));

spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractOutboundChannelAdapterParser.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.beans.factory.xml.ParserContext;
2828
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
2929
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
30+
import org.springframework.util.CollectionUtils;
3031
import org.springframework.util.StringUtils;
3132
import org.springframework.util.xml.DomUtils;
3233

@@ -81,12 +82,14 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
8182

8283
private void configureRequestHandlerAdviceChain(Element element, ParserContext parserContext,
8384
BeanDefinition handlerBeanDefinition, BeanDefinitionBuilder consumerBuilder) {
85+
Element txElement = DomUtils.getChildElementByTagName(element, "transactional");
8486
Element adviceChainElement = DomUtils.getChildElementByTagName(element,
8587
IntegrationNamespaceUtils.REQUEST_HANDLER_ADVICE_CHAIN);
8688
@SuppressWarnings("rawtypes")
8789
ManagedList adviceChain =
88-
IntegrationNamespaceUtils.configureAdviceChain(adviceChainElement, null, handlerBeanDefinition, parserContext);
89-
if (adviceChain != null) {
90+
IntegrationNamespaceUtils.configureAdviceChain(adviceChainElement, txElement, handlerBeanDefinition,
91+
parserContext);
92+
if (!CollectionUtils.isEmpty(adviceChain)) {
9093
/*
9194
* For ARPMH, the advice chain is injected so just the handleRequestMessage method is advised.
9295
* Sometime ARPMHs do double duty as a gateway and a channel adapter. The parser subclass

spring-integration-core/src/main/java/org/springframework/integration/config/xml/DelayerParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ else if (expressionElement != null) {
101101
IntegrationNamespaceUtils.configureAndSetAdviceChainIfPresent(adviceChainElement, txElement,
102102
builder.getRawBeanDefinition(), parserContext, "delayedAdviceChain");
103103

104+
if (txElement != null) {
105+
element.removeChild(txElement);
106+
}
107+
104108
return builder;
105109
}
106110

spring-integration-core/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,12 @@
4848
import org.springframework.integration.config.IntegrationConfigUtils;
4949
import org.springframework.integration.context.IntegrationContextUtils;
5050
import org.springframework.integration.endpoint.AbstractPollingEndpoint;
51+
import org.springframework.integration.transaction.TransactionHandleMessageAdvice;
5152
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
5253
import org.springframework.transaction.interceptor.MatchAlwaysTransactionAttributeSource;
5354
import org.springframework.transaction.interceptor.TransactionInterceptor;
5455
import org.springframework.util.Assert;
56+
import org.springframework.util.CollectionUtils;
5557
import org.springframework.util.StringUtils;
5658
import org.springframework.util.xml.DomUtils;
5759

@@ -381,19 +383,34 @@ public static void configureHeaderMapper(Element element, BeanDefinitionBuilder
381383
* Parse a "transactional" element and configure a {@link TransactionInterceptor}
382384
* with "transactionManager" and other "transactionDefinition" properties.
383385
* For example, this advisor will be applied on the Polling Task proxy.
384-
*
385386
* @param txElement The transactional element.
386387
* @return The bean definition.
387-
*
388388
* @see AbstractPollingEndpoint
389389
*/
390390
public static BeanDefinition configureTransactionAttributes(Element txElement) {
391+
return configureTransactionAttributes(txElement, false);
392+
}
393+
394+
/**
395+
* Parse a "transactional" element and configure a {@link TransactionInterceptor}
396+
* or {@link TransactionHandleMessageAdvice}
397+
* with "transactionManager" and other "transactionDefinition" properties.
398+
* For example, this advisor will be applied on the Polling Task proxy.
399+
* @param txElement The transactional element.
400+
* @param handleMessageAdvice flag if to use {@link TransactionHandleMessageAdvice}
401+
* or regular {@link TransactionInterceptor}
402+
* @return The bean definition.
403+
* @see AbstractPollingEndpoint
404+
*/
405+
public static BeanDefinition configureTransactionAttributes(Element txElement, boolean handleMessageAdvice) {
391406
BeanDefinition txDefinition = configureTransactionDefinition(txElement);
392407
BeanDefinitionBuilder attributeSourceBuilder =
393408
BeanDefinitionBuilder.genericBeanDefinition(MatchAlwaysTransactionAttributeSource.class);
394409
attributeSourceBuilder.addPropertyValue("transactionAttribute", txDefinition);
395410
BeanDefinitionBuilder txInterceptorBuilder =
396-
BeanDefinitionBuilder.genericBeanDefinition(TransactionInterceptor.class);
411+
BeanDefinitionBuilder.genericBeanDefinition(handleMessageAdvice
412+
? TransactionHandleMessageAdvice.class
413+
: TransactionInterceptor.class);
397414
txInterceptorBuilder.addPropertyReference("transactionManager", txElement.getAttribute("transaction-manager"));
398415
txInterceptorBuilder.addPropertyValue("transactionAttributeSource", attributeSourceBuilder.getBeanDefinition());
399416
return txInterceptorBuilder.getBeanDefinition();
@@ -426,31 +443,47 @@ public static String[] generateAlias(Element element) {
426443

427444
public static void configureAndSetAdviceChainIfPresent(Element adviceChainElement, Element txElement,
428445
BeanDefinition parentBeanDefinition, ParserContext parserContext) {
429-
configureAndSetAdviceChainIfPresent(adviceChainElement, txElement, parentBeanDefinition, parserContext,
430-
"adviceChain");
446+
configureAndSetAdviceChainIfPresent(adviceChainElement, txElement, false, parentBeanDefinition, parserContext);
447+
}
448+
449+
public static void configureAndSetAdviceChainIfPresent(Element adviceChainElement,
450+
Element txElement, boolean handleMessageAdvice, BeanDefinition parentBeanDefinition,
451+
ParserContext parserContext) {
452+
configureAndSetAdviceChainIfPresent(adviceChainElement, txElement, handleMessageAdvice,
453+
parentBeanDefinition, parserContext, "adviceChain");
431454
}
432455

433-
@SuppressWarnings({ "rawtypes" })
434456
public static void configureAndSetAdviceChainIfPresent(Element adviceChainElement, Element txElement,
435457
BeanDefinition parentBeanDefinition, ParserContext parserContext, String propertyName) {
436-
ManagedList adviceChain = configureAdviceChain(adviceChainElement, txElement, parentBeanDefinition,
437-
parserContext);
438-
if (adviceChain != null) {
458+
configureAndSetAdviceChainIfPresent(adviceChainElement, txElement, false, parentBeanDefinition,
459+
parserContext, propertyName);
460+
}
461+
462+
@SuppressWarnings({ "rawtypes" })
463+
public static void configureAndSetAdviceChainIfPresent(Element adviceChainElement, Element txElement,
464+
boolean handleMessageAdvice, BeanDefinition parentBeanDefinition, ParserContext parserContext,
465+
String propertyName) {
466+
ManagedList adviceChain = configureAdviceChain(adviceChainElement, txElement, handleMessageAdvice,
467+
parentBeanDefinition, parserContext);
468+
if (!CollectionUtils.isEmpty(adviceChain)) {
439469
parentBeanDefinition.getPropertyValues().add(propertyName, adviceChain);
440470
}
441471
}
442472

443-
@SuppressWarnings({ "rawtypes", "unchecked" })
473+
@SuppressWarnings("rawtypes")
444474
public static ManagedList configureAdviceChain(Element adviceChainElement, Element txElement,
445475
BeanDefinition parentBeanDefinition, ParserContext parserContext) {
446-
ManagedList adviceChain = null;
447-
// Schema validation ensures txElement and adviceChainElement are mutually exclusive
476+
return configureAdviceChain(adviceChainElement, txElement, false, parentBeanDefinition, parserContext);
477+
}
478+
479+
@SuppressWarnings({ "rawtypes", "unchecked" })
480+
public static ManagedList configureAdviceChain(Element adviceChainElement, Element txElement,
481+
boolean handleMessageAdvice, BeanDefinition parentBeanDefinition, ParserContext parserContext) {
482+
ManagedList adviceChain = new ManagedList();
448483
if (txElement != null) {
449-
adviceChain = new ManagedList();
450-
adviceChain.add(IntegrationNamespaceUtils.configureTransactionAttributes(txElement));
484+
adviceChain.add(configureTransactionAttributes(txElement, handleMessageAdvice));
451485
}
452486
if (adviceChainElement != null) {
453-
adviceChain = new ManagedList();
454487
NodeList childNodes = adviceChainElement.getChildNodes();
455488
for (int i = 0; i < childNodes.getLength(); i++) {
456489
Node child = childNodes.item(i);

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616

1717
package org.springframework.integration.endpoint;
1818

19-
import java.util.ArrayList;
2019
import java.util.Collection;
2120
import java.util.List;
2221
import java.util.concurrent.Callable;
2322
import java.util.concurrent.Executor;
2423
import java.util.concurrent.ScheduledFuture;
24+
import java.util.stream.Collectors;
2525

2626
import org.aopalliance.aop.Advice;
2727

@@ -174,30 +174,26 @@ protected void onInit() {
174174

175175
@SuppressWarnings("unchecked")
176176
private Runnable createPoller() throws Exception {
177-
List<Advice> receiveOnlyAdviceChain = new ArrayList<Advice>();
177+
List<Advice> receiveOnlyAdviceChain = null;
178178
if (!CollectionUtils.isEmpty(this.adviceChain)) {
179-
for (Advice advice : this.adviceChain) {
180-
if (isReceiveOnlyAdvice(advice)) {
181-
receiveOnlyAdviceChain.add(advice);
182-
}
183-
}
179+
receiveOnlyAdviceChain = this.adviceChain.stream()
180+
.filter(this::isReceiveOnlyAdvice)
181+
.collect(Collectors.toList());
184182
}
185183

186-
Callable<Boolean> pollingTask = () -> doPoll();
184+
Callable<Boolean> pollingTask = this::doPoll;
187185

188186
List<Advice> adviceChain = this.adviceChain;
189187
if (!CollectionUtils.isEmpty(adviceChain)) {
190188
ProxyFactory proxyFactory = new ProxyFactory(pollingTask);
191189
if (!CollectionUtils.isEmpty(adviceChain)) {
192-
for (Advice advice : adviceChain) {
193-
if (!isReceiveOnlyAdvice(advice)) {
194-
proxyFactory.addAdvice(advice);
195-
}
196-
}
190+
adviceChain.stream()
191+
.filter(advice -> !isReceiveOnlyAdvice(advice))
192+
.forEach(proxyFactory::addAdvice);
197193
}
198194
pollingTask = (Callable<Boolean>) proxyFactory.getProxy(this.beanClassLoader);
199195
}
200-
if (receiveOnlyAdviceChain.size() > 0) {
196+
if (receiveOnlyAdviceChain != null) {
201197
applyReceiveOnlyAdviceChain(receiveOnlyAdviceChain);
202198
}
203199
return new Poller(pollingTask);

spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,10 @@ protected boolean isReceiveOnlyAdvice(Advice advice) {
132132
@Override
133133
protected void applyReceiveOnlyAdviceChain(Collection<Advice> chain) {
134134
if (AopUtils.isAopProxy(this.source)) {
135-
this.appliedAdvices.forEach(((Advised) this.source)::removeAdvice);
135+
Advised source = (Advised) this.source;
136+
this.appliedAdvices.forEach(source::removeAdvice);
136137
for (Advice advice : chain) {
137-
((Advised) this.source).addAdvisor(adviceToReceiveAdvisor(advice));
138+
source.addAdvisor(adviceToReceiveAdvisor(advice));
138139
}
139140
}
140141
else {

spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration-5.0.xsd

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1367,6 +1367,7 @@
13671367
</xsd:complexContent>
13681368
</xsd:complexType>
13691369
</xsd:element>
1370+
<xsd:element name="transactional" type="transactionalType" minOccurs="0" maxOccurs="1" />
13701371
<xsd:element name="request-handler-advice-chain" type="handlerAdviceChainType" minOccurs="0" maxOccurs="1" />
13711372
</xsd:sequence>
13721373
<xsd:attribute name="request-channel" type="xsd:string" use="optional">
@@ -1665,6 +1666,7 @@
16651666
</xsd:annotation>
16661667
<xsd:complexType>
16671668
<xsd:choice minOccurs="0" maxOccurs="2">
1669+
<xsd:element name="transactional" type="transactionalType" minOccurs="0" maxOccurs="1" />
16681670
<xsd:element name="request-handler-advice-chain" type="handlerAdviceChainType" minOccurs="0" maxOccurs="1" />
16691671
<xsd:element ref="poller" />
16701672
</xsd:choice>
@@ -2871,6 +2873,7 @@
28712873
<xsd:complexContent>
28722874
<xsd:extension base="expressionOrInnerEndpointDefinitionAwareNoAdviceChain">
28732875
<xsd:sequence>
2876+
<xsd:element name="transactional" type="transactionalType" minOccurs="0" maxOccurs="1" />
28742877
<xsd:element name="request-handler-advice-chain" minOccurs="0" maxOccurs="1">
28752878
<xsd:complexType>
28762879
<xsd:complexContent>
@@ -4125,6 +4128,7 @@
41254128
<xsd:choice minOccurs="0" maxOccurs="3">
41264129
<xsd:element name="poller" type="basePollerType" minOccurs="0" maxOccurs="1" />
41274130
<xsd:element name="expression" type="innerExpressionType" minOccurs="0" maxOccurs="1" />
4131+
<xsd:element name="transactional" type="transactionalType" minOccurs="0" maxOccurs="1" />
41284132
<xsd:element name="request-handler-advice-chain" type="handlerAdviceChainType" minOccurs="0" maxOccurs="1" />
41294133
<xsd:any namespace="##other" processContents="strict" minOccurs="0" maxOccurs="1" />
41304134
</xsd:choice>

spring-integration-core/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml renamed to spring-integration-core/src/test/java/org/springframework/integration/config/AggregatorParserTests-context.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727
input-channel="aggregatorWithCustomMGPReferenceInput" output-channel="outputChannel"/>
2828

2929
<channel id="completelyDefinedAggregatorInput"/>
30+
31+
<beans:bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager"/>
32+
3033
<aggregator id="completelyDefinedAggregator"
3134
input-channel="completelyDefinedAggregatorInput"
3235
output-channel="outputChannel"
@@ -44,7 +47,7 @@
4447
scheduler="scheduler"
4548
message-store="store"
4649
order="5">
47-
<expire-advice-chain/>
50+
<expire-transactional/>
4851
</aggregator>
4952

5053
<beans:bean id="lockRegistry" class="org.springframework.integration.support.locks.DefaultLockRegistry"/>
@@ -116,4 +119,5 @@
116119
class="org.springframework.integration.config.MaxValueReleaseStrategy">
117120
<beans:constructor-arg value="10" />
118121
</beans:bean>
122+
119123
</beans:beans>

spring-integration-core/src/test/java/org/springframework/integration/config/AggregatorParserTests.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@
3434
import java.util.concurrent.atomic.AtomicReference;
3535

3636
import org.junit.Assert;
37-
import org.junit.Before;
3837
import org.junit.Test;
38+
import org.junit.runner.RunWith;
3939

4040
import org.springframework.beans.DirectFieldAccessor;
4141
import org.springframework.beans.factory.BeanCreationException;
42+
import org.springframework.beans.factory.annotation.Autowired;
4243
import org.springframework.beans.factory.parsing.BeanDefinitionParsingException;
4344
import org.springframework.context.ApplicationContext;
4445
import org.springframework.context.support.ClassPathXmlApplicationContext;
@@ -63,6 +64,7 @@
6364
import org.springframework.messaging.MessageChannel;
6465
import org.springframework.messaging.PollableChannel;
6566
import org.springframework.messaging.SubscribableChannel;
67+
import org.springframework.test.context.junit4.SpringRunner;
6668

6769
/**
6870
* @author Marius Bogoevici
@@ -73,15 +75,12 @@
7375
* @author Gunnar Hillert
7476
* @author Gary Russell
7577
*/
78+
@RunWith(SpringRunner.class)
7679
public class AggregatorParserTests {
7780

81+
@Autowired
7882
private ApplicationContext context;
7983

80-
@Before
81-
public void setUp() {
82-
this.context = new ClassPathXmlApplicationContext("aggregatorParserTests.xml", this.getClass());
83-
}
84-
8584
@Test
8685
public void testAggregation() {
8786
MessageChannel input = (MessageChannel) context.getBean("aggregatorWithReferenceInput");
@@ -90,11 +89,11 @@ public void testAggregation() {
9089
outboundMessages.add(createMessage("123", "id1", 3, 1, null));
9190
outboundMessages.add(createMessage("789", "id1", 3, 3, null));
9291
outboundMessages.add(createMessage("456", "id1", 3, 2, null));
93-
for (Message<?> message : outboundMessages) {
94-
input.send(message);
95-
}
96-
assertEquals("One and only one message must have been aggregated", 1, aggregatorBean.getAggregatedMessages()
97-
.size());
92+
93+
outboundMessages.forEach(input::send);
94+
95+
assertEquals("One and only one message must have been aggregated", 1,
96+
aggregatorBean.getAggregatedMessages().size());
9897
Message<?> aggregatedMessage = aggregatorBean.getAggregatedMessages().get("id1");
9998
assertEquals("The aggregated message payload is not correct", "123456789", aggregatedMessage.getPayload());
10099
Object mbf = context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
@@ -111,9 +110,9 @@ public void testAggregationWithMessageGroupProcessor() {
111110
outboundMessages.add(createMessage("123", "id1", 3, 1, null));
112111
outboundMessages.add(createMessage("789", "id1", 3, 3, null));
113112
outboundMessages.add(createMessage("456", "id1", 3, 2, null));
114-
for (Message<?> message : outboundMessages) {
115-
input.send(message);
116-
}
113+
114+
outboundMessages.forEach(input::send);
115+
117116
assertEquals(3, output.getQueueSize());
118117
output.purge(null);
119118
}
@@ -127,7 +126,9 @@ public void testAggregationWithMessageGroupProcessorAndStrategies() {
127126
outboundMessages.add(createMessage("123", "id1", 3, 1, null));
128127
outboundMessages.add(createMessage("789", "id1", 3, 3, null));
129128
outboundMessages.add(createMessage("456", "id1", 3, 2, null));
129+
130130
outboundMessages.forEach(input::send);
131+
131132
assertEquals(3, output.getQueueSize());
132133
output.purge(null);
133134
}
@@ -142,7 +143,9 @@ public void testAggregationByExpression() {
142143
outboundMessages.add(MessageBuilder.withPayload("123").setHeader("foo", "1").build());
143144
outboundMessages.add(MessageBuilder.withPayload("456").setHeader("foo", "1").build());
144145
outboundMessages.add(MessageBuilder.withPayload("789").setHeader("foo", "1").build());
146+
145147
outboundMessages.forEach(input::send);
148+
146149
assertEquals("The aggregated message payload is not correct", "[123]", aggregatedMessage.get().getPayload()
147150
.toString());
148151
Object mbf = context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);

0 commit comments

Comments
 (0)