Skip to content

Commit d9d7c49

Browse files
authored
Propagate and Observation from Reactive context (#3999)
To propagate an `Observation` from reactive stream (e.g. WebFlux) we have to capture its context and set it into the current thread scope. * Add a `io.micrometer:context-propagation` dependency to support reactive context propagation * Populate a `parentObservation` in the `IntegrationObservation.PRODUCER.observation()` since it is not available for tracing on `Observation.onStart()`. Might be tentative until upcoming fix in Micrometer Observation * Populate from reactive context in the `WebFluxInboundEndpoint` where we use just `send()` operation downstream * Populate from reactive context in the `MessagingGatewaySupport` where we use `send()` operation downstream or `FluxMessageChannel.subscribeTo()` * Use `contextCapture()` in the `FluxMessageChannel` to gather a `ThreadLocal` info into a Reactor context and then set back to `ThreadLocal` in the `transformDeferredContextual()` which really happens on a different thread * Verify a trace propagation from WebFlux to an integration flow via Brave instrumentation in the `WebFluxObservationPropagationTests`
1 parent 8412890 commit d9d7c49

File tree

6 files changed

+264
-27
lines changed

6 files changed

+264
-27
lines changed

build.gradle

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ ext {
8787
lettuceVersion = '6.2.2.RELEASE'
8888
log4jVersion = '2.19.0'
8989
mailVersion = '1.0.0'
90-
micrometerVersion = '1.10.3'
91-
micrometerTracingVersion = '1.0.1'
90+
micrometerPropagationVersion = '1.0.1-SNAPSHOT'
91+
micrometerTracingVersion = '1.0.2-SNAPSHOT'
92+
micrometerVersion = '1.10.4-SNAPSHOT'
9293
mockitoVersion = '4.10.0'
9394
mongoDriverVersion = '4.8.2'
9495
mysqlVersion = '8.0.31'
@@ -532,6 +533,7 @@ project('spring-integration-core') {
532533
}
533534
api 'io.projectreactor:reactor-core'
534535
api 'io.micrometer:micrometer-observation'
536+
api "io.micrometer:context-propagation:$micrometerPropagationVersion"
535537

536538
optionalApi 'com.fasterxml.jackson.core:jackson-databind'
537539
optionalApi 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
@@ -992,6 +994,12 @@ project('spring-integration-webflux') {
992994
exclude group: 'org.springframework'
993995
}
994996
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
997+
testImplementation 'io.micrometer:micrometer-observation-test'
998+
testImplementation ('io.micrometer:micrometer-tracing-integration-test') {
999+
exclude group: 'io.opentelemetry'
1000+
exclude group: 'com.wavefront'
1001+
exclude group: 'io.micrometer', module: 'micrometer-tracing-bridge-otel'
1002+
}
9951003

9961004
testRuntimeOnly "com.jayway.jsonpath:json-path:$jsonpathVersion"
9971005
}

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -335,6 +335,7 @@ private boolean sendWithObservation(Message<?> message, long timeout) {
335335
DefaultMessageSenderObservationConvention.INSTANCE,
336336
() -> new MessageSenderContext(messageToSend, getComponentName()),
337337
this.observationRegistry)
338+
.parentObservation(this.observationRegistry.getCurrentObservation()) // TODO until the fix in micrometer-observation
338339
.observe(() -> sendInternal(messageToSend, timeout));
339340
}
340341

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.TimeUnit;
2121
import java.util.concurrent.locks.LockSupport;
2222

23+
import io.micrometer.context.ContextSnapshot;
2324
import org.reactivestreams.Publisher;
2425
import org.reactivestreams.Subscriber;
2526
import reactor.core.Disposable;
@@ -111,17 +112,20 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
111112
Flux.from(publisher)
112113
.delaySubscription(this.subscribedSignal.asFlux().filter(Boolean::booleanValue).next())
113114
.publishOn(this.scheduler)
114-
.doOnNext((message) -> {
115-
try {
116-
if (!send(message)) {
117-
throw new MessageDeliveryException(message,
118-
"Failed to send message to channel '" + this);
119-
}
120-
}
121-
catch (Exception ex) {
122-
logger.warn(ex, () -> "Error during processing event: " + message);
123-
}
124-
})
115+
.transformDeferredContextual((flux, contextView) ->
116+
flux.doOnNext((message) -> {
117+
var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
118+
try (scope) {
119+
if (!send(message)) {
120+
throw new MessageDeliveryException(message,
121+
"Failed to send message to channel '" + this);
122+
}
123+
}
124+
catch (Exception ex) {
125+
logger.warn(ex, () -> "Error during processing event: " + message);
126+
}
127+
}))
128+
.contextCapture()
125129
.subscribe());
126130
}
127131

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Set;
2121
import java.util.concurrent.ConcurrentHashMap;
2222

23+
import io.micrometer.context.ContextSnapshot;
2324
import io.micrometer.observation.ObservationRegistry;
2425
import org.reactivestreams.Publisher;
2526
import org.reactivestreams.Subscriber;
@@ -720,7 +721,7 @@ private Mono<Message<?>> doSendAndReceiveMessageReactive(MessageChannel requestC
720721
throw new MessageMappingException("Cannot map to message: " + object, e);
721722
}
722723

723-
return Mono.defer(() -> {
724+
return Mono.deferContextual(contextView -> {
724725
Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
725726
Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
726727

@@ -738,7 +739,10 @@ private Mono<Message<?>> doSendAndReceiveMessageReactive(MessageChannel requestC
738739
.setErrorChannel(replyChan)
739740
.build();
740741

741-
sendMessageForReactiveFlow(requestChannel, messageToSend);
742+
var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
743+
try (scope) {
744+
sendMessageForReactiveFlow(requestChannel, messageToSend);
745+
}
742746

743747
return buildReplyMono(requestMessage, replyChan.replyMono.asMono(), error,
744748
originalReplyChannelHeader, originalErrorChannelHeader);

spring-integration-webflux/src/main/java/org/springframework/integration/webflux/inbound/WebFluxInboundEndpoint.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727
import java.util.function.Supplier;
2828
import java.util.stream.Collectors;
2929

30+
import io.micrometer.context.ContextSnapshot;
3031
import org.reactivestreams.Publisher;
3132
import reactor.core.publisher.Flux;
3233
import reactor.core.publisher.Mono;
@@ -151,16 +152,20 @@ private Mono<Void> doHandle(ServerWebExchange exchange) {
151152
new RequestEntity<>(body, exchange.getRequest().getHeaders(),
152153
exchange.getRequest().getMethod(), exchange.getRequest().getURI()))
153154
.flatMap(entity -> buildMessage(entity, exchange))
154-
.flatMap(requestTuple -> {
155-
if (isExpectReply()) {
156-
return sendAndReceiveMessageReactive(requestTuple.getT1())
157-
.flatMap(replyMessage -> populateResponse(exchange, replyMessage));
158-
}
159-
else {
160-
send(requestTuple.getT1());
161-
return setStatusCode(exchange, requestTuple.getT2());
162-
}
163-
})
155+
.flatMap(requestTuple ->
156+
Mono.deferContextual(contextView -> {
157+
if (isExpectReply()) {
158+
return sendAndReceiveMessageReactive(requestTuple.getT1())
159+
.flatMap(replyMessage -> populateResponse(exchange, replyMessage));
160+
}
161+
else {
162+
var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView);
163+
try (scope) {
164+
send(requestTuple.getT1());
165+
}
166+
return setStatusCode(exchange, requestTuple.getT2());
167+
}
168+
}))
164169
.doOnTerminate(this.activeCount::decrementAndGet);
165170

166171
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.webflux.observation;
18+
19+
import java.util.stream.Collectors;
20+
21+
import brave.Tracing;
22+
import brave.propagation.ThreadLocalCurrentTraceContext;
23+
import brave.test.TestSpanHandler;
24+
import io.micrometer.observation.ObservationHandler;
25+
import io.micrometer.observation.ObservationRegistry;
26+
import io.micrometer.observation.tck.TestObservationRegistryAssert;
27+
import io.micrometer.tracing.Tracer;
28+
import io.micrometer.tracing.brave.bridge.BraveBaggageManager;
29+
import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext;
30+
import io.micrometer.tracing.brave.bridge.BraveFinishedSpan;
31+
import io.micrometer.tracing.brave.bridge.BravePropagator;
32+
import io.micrometer.tracing.brave.bridge.BraveTracer;
33+
import io.micrometer.tracing.handler.DefaultTracingObservationHandler;
34+
import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler;
35+
import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler;
36+
import io.micrometer.tracing.propagation.Propagator;
37+
import io.micrometer.tracing.test.simple.SpansAssert;
38+
import org.junit.jupiter.api.BeforeEach;
39+
import org.junit.jupiter.api.Test;
40+
41+
import org.springframework.beans.factory.annotation.Autowired;
42+
import org.springframework.context.ApplicationContext;
43+
import org.springframework.context.annotation.Bean;
44+
import org.springframework.context.annotation.Configuration;
45+
import org.springframework.http.HttpMethod;
46+
import org.springframework.integration.channel.FluxMessageChannel;
47+
import org.springframework.integration.channel.interceptor.ObservationPropagationChannelInterceptor;
48+
import org.springframework.integration.config.EnableIntegration;
49+
import org.springframework.integration.config.EnableIntegrationManagement;
50+
import org.springframework.integration.config.GlobalChannelInterceptor;
51+
import org.springframework.integration.dsl.IntegrationFlow;
52+
import org.springframework.integration.webflux.dsl.WebFlux;
53+
import org.springframework.messaging.Message;
54+
import org.springframework.messaging.PollableChannel;
55+
import org.springframework.messaging.support.ChannelInterceptor;
56+
import org.springframework.test.annotation.DirtiesContext;
57+
import org.springframework.test.context.junit.jupiter.web.SpringJUnitWebConfig;
58+
import org.springframework.test.web.reactive.server.WebTestClient;
59+
import org.springframework.web.filter.reactive.ServerHttpObservationFilter;
60+
import org.springframework.web.reactive.config.EnableWebFlux;
61+
62+
import static org.assertj.core.api.Assertions.assertThat;
63+
64+
/**
65+
* @author Artem Bilan
66+
*
67+
* @since 6.0.3
68+
*/
69+
@SpringJUnitWebConfig
70+
@DirtiesContext
71+
public class WebFluxObservationPropagationTests {
72+
73+
private static final TestSpanHandler SPANS = new TestSpanHandler();
74+
75+
@Autowired
76+
ObservationRegistry observationRegistry;
77+
78+
@Autowired
79+
private WebTestClient webTestClient;
80+
81+
@Autowired
82+
private PollableChannel testChannel;
83+
84+
@BeforeEach
85+
void setup() {
86+
SPANS.clear();
87+
}
88+
89+
@Test
90+
void observationIsPropagatedFromWebFluxToServiceActivator() {
91+
String testData = "testData";
92+
93+
this.webTestClient.post().uri("/test")
94+
.bodyValue(testData)
95+
.exchange()
96+
.expectStatus().isOk();
97+
98+
Message<?> receive = this.testChannel.receive(10_000);
99+
assertThat(receive).isNotNull()
100+
.extracting(Message::getPayload)
101+
.isEqualTo("Received data: " + testData);
102+
103+
TestObservationRegistryAssert.assertThat(this.observationRegistry)
104+
.hasRemainingCurrentObservation();
105+
106+
this.observationRegistry.getCurrentObservation().stop();
107+
108+
assertThat(SPANS.spans()).hasSize(6);
109+
SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList()))
110+
.haveSameTraceId();
111+
}
112+
113+
@Test
114+
void observationIsPropagatedWebFluxRequestReply() {
115+
String testData = "TESTDATA";
116+
117+
this.webTestClient.get().uri("/testRequestReply?name=" + testData)
118+
.headers(headers -> headers.setBasicAuth("guest", "guest"))
119+
.exchange()
120+
.expectBody(String.class)
121+
.isEqualTo(testData.toLowerCase());
122+
123+
assertThat(SPANS.spans()).hasSize(3);
124+
SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList()))
125+
.haveSameTraceId();
126+
}
127+
128+
@Configuration
129+
@EnableWebFlux
130+
@EnableIntegration
131+
@EnableIntegrationManagement(observationPatterns = "*")
132+
public static class ContextConfiguration {
133+
134+
@Bean
135+
public Tracing braveTracing() {
136+
return Tracing.newBuilder().addSpanHandler(SPANS).build();
137+
}
138+
139+
@Bean
140+
Tracer simpleTracer(Tracing tracing) {
141+
return new BraveTracer(tracing.tracer(),
142+
new BraveCurrentTraceContext(ThreadLocalCurrentTraceContext.create()),
143+
new BraveBaggageManager());
144+
}
145+
146+
@Bean
147+
BravePropagator bravePropagator(Tracing tracing) {
148+
return new BravePropagator(tracing);
149+
}
150+
151+
152+
@Bean
153+
ObservationRegistry observationRegistry(Tracer tracer, Propagator propagator) {
154+
ObservationRegistry observationRegistry = ObservationRegistry.create();
155+
observationRegistry.observationConfig()
156+
.observationHandler(
157+
// Composite will pick the first matching handler
158+
new ObservationHandler.FirstMatchingCompositeObservationHandler(
159+
// This is responsible for creating a child span on the sender side
160+
new PropagatingSenderTracingObservationHandler<>(tracer, propagator),
161+
// This is responsible for creating a span on the receiver side
162+
new PropagatingReceiverTracingObservationHandler<>(tracer, propagator),
163+
// This is responsible for creating a default span
164+
new DefaultTracingObservationHandler(tracer)));
165+
return observationRegistry;
166+
}
167+
168+
@Bean
169+
WebTestClient webTestClient(ApplicationContext applicationContext) {
170+
return WebTestClient.bindToApplicationContext(applicationContext).build();
171+
}
172+
173+
@Bean
174+
ServerHttpObservationFilter webfluxObservationFilter(ObservationRegistry registry) {
175+
return new ServerHttpObservationFilter(registry);
176+
}
177+
178+
@Bean
179+
@GlobalChannelInterceptor
180+
public ChannelInterceptor observationPropagationInterceptor(ObservationRegistry observationRegistry) {
181+
return new ObservationPropagationChannelInterceptor(observationRegistry);
182+
}
183+
184+
@Bean
185+
IntegrationFlow webFluxFlow() {
186+
return IntegrationFlow
187+
.from(WebFlux.inboundChannelAdapter("/test")
188+
.requestMapping((mapping) -> mapping.methods(HttpMethod.POST))
189+
.requestPayloadType(String.class)
190+
.id("webFluxInbound"))
191+
.channel(c -> c.flux("requestChannel"))
192+
.<String>handle((p, h) -> "Received data: " + p, e -> e.id("testHandler"))
193+
.channel(c -> c.queue("testChannel"))
194+
.get();
195+
}
196+
197+
@Bean
198+
FluxMessageChannel webFluxRequestChannel() {
199+
return new FluxMessageChannel();
200+
}
201+
202+
@Bean
203+
IntegrationFlow webFluxRequestReplyFlow(FluxMessageChannel webFluxRequestChannel) {
204+
return IntegrationFlow.from(WebFlux.inboundGateway("/testRequestReply")
205+
.requestMapping(r -> r.params("name"))
206+
.payloadExpression("#requestParams.name[0]")
207+
.requestChannel(webFluxRequestChannel)
208+
.id("webFluxGateway"))
209+
.<String, String>transform(String::toLowerCase, e -> e.id("testTransformer"))
210+
.get();
211+
}
212+
213+
}
214+
215+
}

0 commit comments

Comments
 (0)