Skip to content

Commit 7ad8f50

Browse files
artembilangaryrussell
authored andcommitted
INT-4163: UnProxy MessageSource for TX Resource
JIRA: https://jira.spring.io/browse/INT-4163 When `MessageSource` is proxy, the `TransactionSynchronizationManager.getResource(this)` logic in the `MessageSource` doesn't work, because TX resource is bound to the `Proxy` in the `SourcePollingChannelAdapter` * Introduce `SourcePollingChannelAdapter.originalSource` property and store there a target `MessageSource` object extracted from the AOP Proxy * Use `originalSource` as a resource to bind to the TX * Modify `MongoDbInboundChannelAdapterIntegrationTests` to ensure that `AbstractMessageSourceAdvice` proxying the `MessageSource` doesn't effect `TransactionSynchronizationManager.getResource(this)` logic * Refactor for some MongoDb test to rely on the `@RunWith(SpringJUnit4ClassRunner.class)` for context loading for better test class performance **Cherry-pick to 4.3.x, 4.2.x** Fallback to provided source if `target` from Proxy is `null` Fix [UnusedImport] issue
1 parent 12ef5cf commit 7ad8f50

File tree

6 files changed

+296
-210
lines changed

6 files changed

+296
-210
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.aop.framework.ProxyFactory;
2626
import org.springframework.aop.support.AopUtils;
2727
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
28+
import org.springframework.beans.factory.BeanCreationException;
2829
import org.springframework.context.Lifecycle;
2930
import org.springframework.integration.aop.AbstractMessageSourceAdvice;
3031
import org.springframework.integration.context.ExpressionCapable;
@@ -56,6 +57,8 @@ public class SourcePollingChannelAdapter extends AbstractPollingEndpoint
5657

5758
private final Collection<Advice> appliedAdvices = new HashSet<>();
5859

60+
private volatile MessageSource<?> originalSource;
61+
5962
private volatile MessageSource<?> source;
6063

6164
private volatile MessageChannel outputChannel;
@@ -71,6 +74,10 @@ public class SourcePollingChannelAdapter extends AbstractPollingEndpoint
7174
*/
7275
public void setSource(MessageSource<?> source) {
7376
this.source = source;
77+
78+
Object target = extractProxyTarget(source);
79+
this.originalSource = target != null ? (MessageSource<?>) target : source;
80+
7481
if (source instanceof ExpressionCapable) {
7582
setPrimaryExpression(((ExpressionCapable) source).getExpression());
7683
}
@@ -225,12 +232,29 @@ protected Message<?> receiveMessage() {
225232

226233
@Override
227234
protected Object getResourceToBind() {
228-
return this.source;
235+
return this.originalSource;
229236
}
230237

231238
@Override
232239
protected String getResourceKey() {
233240
return IntegrationResourceHolder.MESSAGE_SOURCE;
234241
}
235242

243+
private static Object extractProxyTarget(Object target) {
244+
if (!(target instanceof Advised)) {
245+
return target;
246+
}
247+
Advised advised = (Advised) target;
248+
if (advised.getTargetSource() == null) {
249+
return null;
250+
}
251+
try {
252+
return extractProxyTarget(advised.getTargetSource().getTarget());
253+
}
254+
catch (Exception e) {
255+
throw new BeanCreationException("Could not extract target", e);
256+
}
257+
}
258+
259+
236260
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<beans xmlns="http://www.springframework.org/schema/beans"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:int="http://www.springframework.org/schema/integration"
5+
xmlns:int-mongodb="http://www.springframework.org/schema/integration/mongodb"
6+
xmlns:mongo="http://www.springframework.org/schema/data/mongo"
7+
xmlns:tx="http://www.springframework.org/schema/tx"
8+
xsi:schemaLocation="http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo.xsd
9+
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
10+
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
11+
http://www.springframework.org/schema/integration/mongodb http://www.springframework.org/schema/integration/mongodb/spring-integration-mongodb.xsd
12+
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
13+
14+
<mongo:db-factory id="mongoDbFactory" dbname="test" />
15+
16+
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
17+
channel="replyChannel"
18+
query="{'name' : 'Bob'}"
19+
entity-class="java.lang.Object"
20+
auto-startup="false">
21+
<int:poller fixed-rate="100" />
22+
</int-mongodb:inbound-channel-adapter>
23+
24+
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapterNamedFactory"
25+
mongodb-factory="mongoDbFactory"
26+
channel="replyChannel"
27+
query="{'name' : 'Bob'}"
28+
auto-startup="false">
29+
<int:poller fixed-rate="5000" />
30+
</int-mongodb:inbound-channel-adapter>
31+
32+
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapterWithTemplate"
33+
channel="replyChannel"
34+
mongo-template="mongoDbTemplate"
35+
query="{'name' : 'Bob'}"
36+
expect-single-result="true"
37+
entity-class="org.springframework.integration.mongodb.rules.MongoDbAvailableTests.Person"
38+
auto-startup="false">
39+
<int:poller fixed-rate="5000" />
40+
</int-mongodb:inbound-channel-adapter>
41+
42+
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapterWithNamedCollection"
43+
channel="replyChannel"
44+
collection-name="foo"
45+
mongo-template="mongoDbTemplate"
46+
query="{'name' : 'Bob'}"
47+
entity-class="java.lang.Object"
48+
auto-startup="false">
49+
<int:poller fixed-rate="5000" />
50+
</int-mongodb:inbound-channel-adapter>
51+
52+
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapterWithNamedCollectionExpression"
53+
channel="replyChannel"
54+
collection-name-expression="'foo'"
55+
mongo-template="mongoDbTemplate"
56+
query="{'name' : 'Bob'}"
57+
entity-class="java.lang.Object"
58+
auto-startup="false">
59+
<int:poller fixed-rate="5000" />
60+
</int-mongodb:inbound-channel-adapter>
61+
62+
<int-mongodb:inbound-channel-adapter id="inboundAdapterWithOnSuccessDisposition"
63+
channel="replyChannel"
64+
query="{'name' : 'Bob'}"
65+
auto-startup="false">
66+
67+
<int:poller fixed-rate="200" max-messages-per-poll="1">
68+
<int:advice-chain synchronization-factory="syncFactory">
69+
<bean
70+
class="org.springframework.integration.mongodb.config.MongoDbInboundChannelAdapterIntegrationTests.TestMessageSourceAdvice" />
71+
<tx:advice>
72+
<tx:attributes>
73+
<tx:method name="*" />
74+
</tx:attributes>
75+
</tx:advice>
76+
</int:advice-chain>
77+
</int:poller>
78+
</int-mongodb:inbound-channel-adapter>
79+
80+
<int:transaction-synchronization-factory id="syncFactory">
81+
<int:after-commit expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)" />
82+
</int:transaction-synchronization-factory>
83+
84+
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapterWithConverter"
85+
channel="replyChannel"
86+
query="{'name' : 'Bob'}"
87+
entity-class="java.lang.Object"
88+
mongo-converter="mongoConverter"
89+
auto-startup="false">
90+
<int:poller fixed-rate="100" />
91+
</int-mongodb:inbound-channel-adapter>
92+
93+
<bean id="documentCleaner"
94+
class="org.springframework.integration.mongodb.config.MongoDbInboundChannelAdapterIntegrationTests.DocumentCleaner" />
95+
96+
<bean id="mongoDbTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
97+
<constructor-arg ref="mongoDbFactory" />
98+
</bean>
99+
100+
<bean id="mongoConverter"
101+
class="org.springframework.integration.mongodb.rules.MongoDbAvailableTests.TestMongoConverter">
102+
<constructor-arg ref="mongoDbFactory" />
103+
<constructor-arg>
104+
<bean class="org.springframework.data.mongodb.core.mapping.MongoMappingContext" />
105+
</constructor-arg>
106+
</bean>
107+
108+
<int:channel id="replyChannel">
109+
<int:queue />
110+
</int:channel>
111+
112+
<bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager" />
113+
114+
</beans>

0 commit comments

Comments
 (0)