Skip to content

Commit 2aadf5e

Browse files
committed
Fix receiveOnlyAdviceChain condition
https://build.spring.io/browse/INT-MASTER-415/ The `Stream` `Collectors.toList()` returns empty list if nothing pass the `.filter()`, therefore condition as `if (receiveOnlyAdviceChain != null)` is not enough and since `SourcePollingChannelAdapter.applyReceiveOnlyAdviceChain()` doesn't have conditions as well, the target `MessageSource` is proxyed for nothing. When `TransactionSynchronizationManager.getResource(this)` is called for the `MessageSource` it can't find it because the proxy doesn't match an original object. * Make condition as `if (!CollectionUtils.isEmpty(receiveOnlyAdviceChain))` in the `AbstractPollingEndpoint` and `SourcePollingChannelAdapter` * Increase group removal wait timeout in the `gemfire.DelayerHandlerRescheduleIntegrationTests`
1 parent 5cca8e8 commit 2aadf5e

File tree

3 files changed

+18
-15
lines changed

3 files changed

+18
-15
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ private Runnable createPoller() throws Exception {
193193
}
194194
pollingTask = (Callable<Boolean>) proxyFactory.getProxy(this.beanClassLoader);
195195
}
196-
if (receiveOnlyAdviceChain != null) {
196+
if (!CollectionUtils.isEmpty(receiveOnlyAdviceChain)) {
197197
applyReceiveOnlyAdviceChain(receiveOnlyAdviceChain);
198198
}
199199
return new Poller(pollingTask);

spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.springframework.messaging.MessageChannel;
3939
import org.springframework.messaging.MessagingException;
4040
import org.springframework.util.Assert;
41+
import org.springframework.util.CollectionUtils;
4142

4243
/**
4344
* A Channel Adapter implementation for connecting a
@@ -131,22 +132,24 @@ protected boolean isReceiveOnlyAdvice(Advice advice) {
131132

132133
@Override
133134
protected void applyReceiveOnlyAdviceChain(Collection<Advice> chain) {
134-
if (AopUtils.isAopProxy(this.source)) {
135-
Advised source = (Advised) this.source;
136-
this.appliedAdvices.forEach(source::removeAdvice);
137-
for (Advice advice : chain) {
138-
source.addAdvisor(adviceToReceiveAdvisor(advice));
135+
if (!CollectionUtils.isEmpty(chain)) {
136+
if (AopUtils.isAopProxy(this.source)) {
137+
Advised source = (Advised) this.source;
138+
this.appliedAdvices.forEach(source::removeAdvice);
139+
for (Advice advice : chain) {
140+
source.addAdvisor(adviceToReceiveAdvisor(advice));
141+
}
139142
}
140-
}
141-
else {
142-
ProxyFactory proxyFactory = new ProxyFactory(this.source);
143-
for (Advice advice : chain) {
144-
proxyFactory.addAdvisor(adviceToReceiveAdvisor(advice));
143+
else {
144+
ProxyFactory proxyFactory = new ProxyFactory(this.source);
145+
for (Advice advice : chain) {
146+
proxyFactory.addAdvisor(adviceToReceiveAdvisor(advice));
147+
}
148+
this.source = (MessageSource<?>) proxyFactory.getProxy(getBeanClassLoader());
145149
}
146-
this.source = (MessageSource<?>) proxyFactory.getProxy(getBeanClassLoader());
150+
this.appliedAdvices.clear();
151+
this.appliedAdvices.addAll(chain);
147152
}
148-
this.appliedAdvices.clear();
149-
this.appliedAdvices.addAll(chain);
150153
}
151154

152155
private NameMatchMethodPointcutAdvisor adviceToReceiveAdvisor(Advice advice) {

spring-integration-gemfire/src/test/java/org/springframework/integration/gemfire/store/DelayerHandlerRescheduleIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public void testDelayerHandlerRescheduleWithGemfireMessageStore() throws Excepti
140140
assertEquals(1, messageStore.getMessageGroupCount());
141141
int n = 0;
142142
while (n++ < 200 && messageStore.messageGroupSize(delayerMessageGroupId) > 0) {
143-
Thread.sleep(50);
143+
Thread.sleep(100);
144144
}
145145
assertEquals(0, messageStore.messageGroupSize(delayerMessageGroupId));
146146

0 commit comments

Comments
 (0)