
Fix memory leak in the FluxMessageChannel #8622


Conversation

artembilan (Member)

The `FluxMessageChannel` can subscribe to any volatile `Publisher`. For example, we can call Reactor Kafka `Sender.send()` for input data and pass its result to the `FluxMessageChannel` for on-demand subscription.
These publishers are subscribed in the `FluxMessageChannel`, and their `Disposable` is stored in the internal `Disposable.Composite`, which is currently only cleared on `destroy()`.

  • Extract the `Disposable` from those internal `subscribe()` calls into an `AtomicReference`.
  • Use this `AtomicReference` in the `doOnTerminate()` callback to remove the subscription from the `Disposable.Composite` and `dispose()` it when such a volatile `Publisher` completes.

**Cherry-pick to `6.0.x` & `5.5.x`**
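The bookkeeping described above can be sketched in plain Java. This is a hypothetical, dependency-free analogue of the pattern (the `Disposable` interface and `Composite` class here are simplified stand-ins for Reactor's types, and `track()` is an illustrative name, not the actual `FluxMessageChannel` code):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Plain-Java analogue of the fix: each subscription's Disposable is
// captured in an AtomicReference so a terminate callback can remove it
// from the composite immediately, instead of holding every Disposable
// until destroy() and leaking memory for long-lived channels.
public class DisposalSketch {

    interface Disposable { void dispose(); }

    // Simplified stand-in for reactor.core.Disposable.Composite.
    static class Composite {
        final Set<Disposable> members = ConcurrentHashMap.newKeySet();
        void add(Disposable d) { members.add(d); }
        void remove(Disposable d) { members.remove(d); }
        int size() { return members.size(); }
        void dispose() { members.forEach(Disposable::dispose); members.clear(); }
    }

    final Composite subscriptions = new Composite();

    // Track a new subscription and return the terminate hook, which a
    // caller (standing in for the publisher) runs on completion --
    // analogous to the work done inside doOnTerminate(...).
    Runnable track(Disposable subscription) {
        AtomicReference<Disposable> ref = new AtomicReference<>(subscription);
        subscriptions.add(subscription);
        return () -> {
            Disposable d = ref.getAndSet(null); // at most one removal
            if (d != null) {
                subscriptions.remove(d);
                d.dispose();
            }
        };
    }
}
```

In this sketch, a completed publisher's `Disposable` leaves the composite as soon as its terminate hook fires, so only still-active subscriptions remain for `destroy()` to clean up.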

@artembilan (Member Author)

Forgot to mention: it might not cherry-pick cleanly. So, let me know and I'll figure out the change for those versions!

@garyrussell garyrussell merged commit 191f693 into spring-projects:main May 15, 2023
garyrussell pushed a commit that referenced this pull request May 15, 2023
@garyrussell (Contributor)

...and cherry-picked to 6.0.x as 485aa2a

Please back-port to 5.5.x; there are conflicts.

@artembilan artembilan deleted the Fix_memory_leak_in_FluxMessageChannel branch May 15, 2023 20:37
artembilan added a commit that referenced this pull request May 15, 2023
# Conflicts:
#	spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java
@artembilan (Member Author)

... and back-ported to 5.5.x as ccd3574 after fixing conflicts.
