Skip to content

The IntegrationFlow.from(Publisher<Message<?>>) never stops #9999

Closed
@artembilan

Description

@artembilan

When we have a flow definition like this:

		@Bean
		IntegrationFlow fromPublisher() {
			return IntegrationFlow.from(Flux.interval(Duration.ofSeconds(1)).map(GenericMessage::new))
					.log(LoggingHandler.Level.WARN)
					.channel(c -> c.queue("fromPublisherResult"))
					.get();
		}

Essentially, starting with a reactive Publisher, such a flow does not stop, when we call applicationContext.stop();.

This flow has a logic like:

	static IntegrationFlowBuilder from(Publisher<? extends Message<?>> publisher) {
		FluxMessageChannel reactiveChannel = new FluxMessageChannel();
		reactiveChannel.subscribeTo(publisher);
		return from((MessageChannel) reactiveChannel);
	}

And provided Publisher is wrapped into a FluxMessageChannel.
When the stop() is initiated, all the integration endpoints are stopped, but not the reactive subscription in that FluxMessageChannel.
We end up with the repetitive error like this (with the latest version 6.5):

2025-04-30 10:21:25,123 WARN [parallel-4] [org.springframework.integration.channel.FluxMessageChannel] - Error during processing event: GenericMessage [payload=2, headers={id=8c134fb1-733d-42a0-5ddc-9f867546ee5e, timestamp=1746022885122}]
org.springframework.integration.MessageDispatchingException: The application context is not ready to dispatch messages. It has to be refreshed or started first. Also, messages must not be emitted from initialization phase, like 'afterPropertiesSet()', '@PostConstruct' or bean definition methods. Consider to use 'SmartLifecycle.start()' instead.
	at org.springframework.integration.channel.AbstractMessageChannel.assertApplicationRunning(AbstractMessageChannel.java:370) ~[classes/:?]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:330) ~[classes/:?]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:312) ~[classes/:?]
	at org.springframework.integration.channel.FluxMessageChannel.sendReactiveMessage(FluxMessageChannel.java:165) ~[classes/:?]
	at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$4(FluxMessageChannel.java:148)

 ...


org.springframework.integration.MessageDispatchingException: The application context is not ready to dispatch messages. It has to be refreshed or started first. Also, messages must not be emitted from initialization phase, like 'afterPropertiesSet()', '@PostConstruct' or bean definition methods. Consider to use 'SmartLifecycle.start()' instead.
	at org.springframework.integration.channel.AbstractMessageChannel.assertApplicationRunning(AbstractMessageChannel.java:370) ~[classes/:?]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:330) ~[classes/:?]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:312) ~[classes/:?]
	at org.springframework.integration.channel.FluxMessageChannel.sendReactiveMessage(FluxMessageChannel.java:165) ~[classes/:?]
	at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$4(FluxMessageChannel.java:148) ~[classes/:?]

Metadata

Metadata

Assignees

No one assigned

    Type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions