Skip to content

Commit 05ee9ff

Browse files
committed
GH-9999: Fix FluxMessageChannel for restarted application
Fixes: #9999 Issue link: #9999 After the application context stop, the `FluxMessageChannel` continues to try to emit messages. Just because subscriptions to the provided publishers are not cancelled. More over, we would like to come back to the production from those publishers when we start application back. * Fix `FluxMessageChannel` implementing a `Lifecycle` contract. Gather provided publisher in a local cache to come back to them when we call `start()`, essentially, initiating a new subscription to those provided publishers **Auto-cherry-pick to `6.4.x` & `6.3.x`**
1 parent 119209d commit 05ee9ff

File tree

3 files changed

+94
-14
lines changed

3 files changed

+94
-14
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,10 @@ else if (this.metricsCaptor != null) {
344344
}
345345
}
346346

347+
protected boolean isApplicationRunning() {
348+
return this.applicationRunning;
349+
}
350+
347351
private void assertApplicationRunning(Message<?> message) {
348352
if (!this.applicationRunning) {
349353
ApplicationContext applicationContext = getApplicationContext();

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.integration.channel;
1818

1919
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.List;
2022
import java.util.concurrent.TimeUnit;
2123
import java.util.concurrent.atomic.AtomicReference;
2224
import java.util.concurrent.locks.LockSupport;
@@ -30,6 +32,7 @@
3032
import reactor.core.publisher.Sinks;
3133
import reactor.util.context.ContextView;
3234

35+
import org.springframework.context.Lifecycle;
3336
import org.springframework.core.log.LogMessage;
3437
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3538
import org.springframework.integration.StaticMessageHeaderAccessor;
@@ -38,10 +41,14 @@
3841
import org.springframework.messaging.Message;
3942
import org.springframework.messaging.MessageDeliveryException;
4043
import org.springframework.util.Assert;
44+
import org.springframework.util.ReflectionUtils;
4145

4246
/**
4347
* The {@link AbstractMessageChannel} implementation for the
4448
* Reactive Streams {@link Publisher} based on the Project Reactor {@link Flux}.
49+
* <p>
50+
* This class implements {@link Lifecycle} to control subscriptions to publishers
51+
* attached via {@link #subscribeTo(Publisher)}, when this channel is restarted.
4552
*
4653
* @author Artem Bilan
4754
* @author Gary Russell
@@ -50,11 +57,13 @@
5057
* @since 5.0
5158
*/
5259
public class FluxMessageChannel extends AbstractMessageChannel
53-
implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel {
60+
implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel, Lifecycle {
5461

5562
private final Sinks.Many<Message<?>> sink = Sinks.many().multicast().onBackpressureBuffer(1, false);
5663

57-
private final Disposable.Composite upstreamSubscriptions = Disposables.composite();
64+
private final List<Publisher<? extends Message<?>>> sourcePublishers = new ArrayList<>();
65+
66+
private volatile Disposable.Composite upstreamSubscriptions = Disposables.composite();
5867

5968
private volatile boolean active = true;
6069

@@ -111,19 +120,22 @@ public void subscribe(Subscriber<? super Message<?>> subscriber) {
111120
.subscribe(subscriber);
112121
}
113122

114-
private void addPublisherToSubscribe(Flux<?> publisher) {
115-
AtomicReference<Disposable> disposableReference = new AtomicReference<>();
123+
@Override
124+
public void start() {
125+
this.active = true;
126+
this.upstreamSubscriptions = Disposables.composite();
127+
this.sourcePublishers.forEach(this::doSubscribeTo);
128+
}
116129

117-
Disposable disposable =
118-
publisher
119-
.doOnTerminate(() -> disposeUpstreamSubscription(disposableReference))
120-
.subscribe();
130+
@Override
131+
public void stop() {
132+
this.active = false;
133+
this.upstreamSubscriptions.dispose();
134+
}
121135

122-
if (!disposable.isDisposed()) {
123-
if (this.upstreamSubscriptions.add(disposable)) {
124-
disposableReference.set(disposable);
125-
}
126-
}
136+
@Override
137+
public boolean isRunning() {
138+
return this.active;
127139
}
128140

129141
private void disposeUpstreamSubscription(AtomicReference<Disposable> disposableReference) {
@@ -136,8 +148,14 @@ private void disposeUpstreamSubscription(AtomicReference<Disposable> disposableR
136148

137149
@Override
138150
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
151+
this.sourcePublishers.add(publisher);
152+
doSubscribeTo(publisher);
153+
}
154+
155+
private void doSubscribeTo(Publisher<? extends Message<?>> publisher) {
139156
Flux<Object> upstreamPublisher =
140157
Flux.from(publisher)
158+
.doOnComplete(() -> this.sourcePublishers.remove(publisher))
141159
.delaySubscription(
142160
Mono.fromCallable(this.sink::currentSubscriberCount)
143161
.filter((value) -> value > 0)
@@ -152,6 +170,21 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
152170
addPublisherToSubscribe(upstreamPublisher);
153171
}
154172

173+
private void addPublisherToSubscribe(Flux<?> publisher) {
174+
AtomicReference<Disposable> disposableReference = new AtomicReference<>();
175+
176+
Disposable disposable =
177+
publisher
178+
.doOnTerminate(() -> disposeUpstreamSubscription(disposableReference))
179+
.subscribe();
180+
181+
if (!disposable.isDisposed()) {
182+
if (this.upstreamSubscriptions.add(disposable)) {
183+
disposableReference.set(disposable);
184+
}
185+
}
186+
}
187+
155188
private void sendReactiveMessage(Message<?> message) {
156189
Message<?> messageToSend = message;
157190
// We have just restored Reactor context, so no need in a header anymore.
@@ -169,14 +202,20 @@ private void sendReactiveMessage(Message<?> message) {
169202
}
170203
}
171204
catch (Exception ex) {
172-
logger.warn(ex, LogMessage.format("Error during processing event: %s", messageToSend));
205+
if (isApplicationRunning()) {
206+
logger.error(ex, LogMessage.format("Error during processing event: %s", messageToSend));
207+
}
208+
else {
209+
ReflectionUtils.rethrowRuntimeException(ex);
210+
}
173211
}
174212
}
175213

176214
@Override
177215
public void destroy() {
178216
this.active = false;
179217
this.upstreamSubscriptions.dispose();
218+
this.sourcePublishers.clear();
180219
this.sink.emitComplete(Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(1)));
181220
super.destroy();
182221
}

spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
import org.springframework.beans.factory.annotation.Autowired;
4040
import org.springframework.beans.factory.annotation.Qualifier;
41+
import org.springframework.context.ConfigurableApplicationContext;
4142
import org.springframework.context.Lifecycle;
4243
import org.springframework.context.annotation.Bean;
4344
import org.springframework.context.annotation.Configuration;
@@ -266,6 +267,35 @@ void messageProducerIsNotStartedAutomatically() {
266267
.verify(Duration.ofSeconds(10));
267268
}
268269

270+
@Autowired
271+
QueueChannel fromPublisherResult;
272+
273+
@Autowired
274+
ConfigurableApplicationContext applicationContext;
275+
276+
@Test
277+
@DirtiesContext
278+
// Use disruptive this.applicationContext.start()
279+
void verifyFluxMessageChannelRestart() {
280+
for (long i = 0; i < 3L; i++) {
281+
assertThat(this.fromPublisherResult.receive(10_000)).extracting(Message::getPayload).isEqualTo(i);
282+
}
283+
284+
this.applicationContext.stop();
285+
286+
this.fromPublisherResult.purge(null);
287+
288+
this.applicationContext.start();
289+
290+
// The applicationContext restart causes all the endpoint to be started,
291+
// while we really don't have a subscription to this producer
292+
this.testMessageProducer.stop();
293+
294+
for (long i = 0; i < 3L; i++) {
295+
assertThat(this.fromPublisherResult.receive(10_000)).extracting(Message::getPayload).isEqualTo(i);
296+
}
297+
}
298+
269299
@Configuration
270300
@EnableIntegration
271301
public static class ContextConfiguration {
@@ -325,6 +355,13 @@ public Publisher<Message<String>> messageProducerFlow() {
325355
.toReactivePublisher(true);
326356
}
327357

358+
@Bean
359+
IntegrationFlow fromPublisher() {
360+
return IntegrationFlow.from(Flux.interval(Duration.ofMillis(100)).map(GenericMessage::new))
361+
.channel(c -> c.queue("fromPublisherResult"))
362+
.get();
363+
}
364+
328365
}
329366

330367
private static class TestMessageProducerSpec

0 commit comments

Comments
 (0)