Closed
Description
When we have a flow definition like this:
@Bean
IntegrationFlow fromPublisher() {
return IntegrationFlow.from(Flux.interval(Duration.ofSeconds(1)).map(GenericMessage::new))
.log(LoggingHandler.Level.WARN)
.channel(c -> c.queue("fromPublisherResult"))
.get();
}
Essentially, starting with a reactive Publisher
, such a flow does not stop, when we call applicationContext.stop();
.
This flow has a logic like:
static IntegrationFlowBuilder from(Publisher<? extends Message<?>> publisher) {
FluxMessageChannel reactiveChannel = new FluxMessageChannel();
reactiveChannel.subscribeTo(publisher);
return from((MessageChannel) reactiveChannel);
}
And provided Publisher
is wrapped into a FluxMessageChannel
.
When the stop()
is initiated, all the integration endpoints are stopped, but not the reactive subscription in that FluxMessageChannel
.
We end up with the repetitive error like this (with the latest version 6.5
):
2025-04-30 10:21:25,123 WARN [parallel-4] [org.springframework.integration.channel.FluxMessageChannel] - Error during processing event: GenericMessage [payload=2, headers={id=8c134fb1-733d-42a0-5ddc-9f867546ee5e, timestamp=1746022885122}]
org.springframework.integration.MessageDispatchingException: The application context is not ready to dispatch messages. It has to be refreshed or started first. Also, messages must not be emitted from initialization phase, like 'afterPropertiesSet()', '@PostConstruct' or bean definition methods. Consider to use 'SmartLifecycle.start()' instead.
at org.springframework.integration.channel.AbstractMessageChannel.assertApplicationRunning(AbstractMessageChannel.java:370) ~[classes/:?]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:330) ~[classes/:?]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:312) ~[classes/:?]
at org.springframework.integration.channel.FluxMessageChannel.sendReactiveMessage(FluxMessageChannel.java:165) ~[classes/:?]
at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$4(FluxMessageChannel.java:148)
...
org.springframework.integration.MessageDispatchingException: The application context is not ready to dispatch messages. It has to be refreshed or started first. Also, messages must not be emitted from initialization phase, like 'afterPropertiesSet()', '@PostConstruct' or bean definition methods. Consider to use 'SmartLifecycle.start()' instead.
at org.springframework.integration.channel.AbstractMessageChannel.assertApplicationRunning(AbstractMessageChannel.java:370) ~[classes/:?]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:330) ~[classes/:?]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:312) ~[classes/:?]
at org.springframework.integration.channel.FluxMessageChannel.sendReactiveMessage(FluxMessageChannel.java:165) ~[classes/:?]
at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$4(FluxMessageChannel.java:148) ~[classes/:?]