Skip to content

Propagate and Observation from Reactive context #3999

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 1, 2023

Conversation

artembilan
Copy link
Member

To propagate an Observation from reactive stream (e.g. WebFlux) we have to capture its context and set it into the current thread scope.

  • Add a io.micrometer:context-propagation dependency to support reactive context propagation
  • Populate a parentObservation in the IntegrationObservation.PRODUCER.observation() since it is not available for tracing on Observation.onStart(). Might be tentative until upcoming fix in Micrometer Observation
  • Populate from reactive context in the WebFluxInboundEndpoint where we use just send() operation downstream
  • Populate from reactive context in the MessagingGatewaySupport where we use send() operation downstream or FluxMessageChannel.subscribeTo()
  • Use contextCapture() in the FluxMessageChannel to gather a ThreadLocal info into a Reactor context and then set back to ThreadLocal in the transformDeferredContextual() which really happens on a different thread
  • Verify a trace propagation from WebFlux to an integration flow via Brave instrumentation in the WebFluxObservationPropagationTests

To propagate an `Observation` from reactive stream (e.g. WebFlux)
we have to capture its context and set it into the current thread scope.

* Add a `io.micrometer:context-propagation` dependency to support reactive context propagation
* Populate a `parentObservation` in the `IntegrationObservation.PRODUCER.observation()`
since it is not available for tracing on `Observation.onStart()`.
Might be tentative until upcoming fix in Micrometer Observation
* Populate from reactive context in the `WebFluxInboundEndpoint` where we use just `send()` operation downstream
* Populate from reactive context in the `MessagingGatewaySupport` where we use `send()` operation downstream or `FluxMessageChannel.subscribeTo()`
* Use `contextCapture()` in the `FluxMessageChannel` to gather a `ThreadLocal` info into a Reactor context
and then set back to `ThreadLocal` in the `transformDeferredContextual()` which really happens on a different thread
* Verify a trace propagation from WebFlux to an integration flow via Brave instrumentation in the `WebFluxObservationPropagationTests`
.flatMap(replyMessage -> populateResponse(exchange, replyMessage));
}
else {
var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this not required in the send/receive case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is covered by the MessagingGatewaySupport change in this PR.
We need such an action only when we lose a reactive scope.
This send() returns nothing, but sendAndReceiveMessageReactive() returns a Mono where respective context is still present. Therefore we have to repeat a set to ThreadLocal over there where we lose the reactive stream context again.
That's why I have those separate tests in this PR.

NOTE: The RSocket support will work as well just because we have a fix in the MessagingGatewaySupport and it does not use a plain send().

@garyrussell garyrussell merged commit d9d7c49 into spring-projects:main Feb 1, 2023
@artembilan artembilan deleted the reactive_observation branch February 1, 2023 19:14
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants