diff --git a/build.gradle b/build.gradle index 5aed71873c6..e37a7dc9fc3 100644 --- a/build.gradle +++ b/build.gradle @@ -87,8 +87,9 @@ ext { lettuceVersion = '6.2.2.RELEASE' log4jVersion = '2.19.0' mailVersion = '1.0.0' - micrometerVersion = '1.10.3' - micrometerTracingVersion = '1.0.1' + micrometerPropagationVersion = '1.0.1-SNAPSHOT' + micrometerTracingVersion = '1.0.2-SNAPSHOT' + micrometerVersion = '1.10.4-SNAPSHOT' mockitoVersion = '4.10.0' mongoDriverVersion = '4.8.2' mysqlVersion = '8.0.31' @@ -532,6 +533,7 @@ project('spring-integration-core') { } api 'io.projectreactor:reactor-core' api 'io.micrometer:micrometer-observation' + api "io.micrometer:context-propagation:$micrometerPropagationVersion" optionalApi 'com.fasterxml.jackson.core:jackson-databind' optionalApi 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8' @@ -992,6 +994,12 @@ project('spring-integration-webflux') { exclude group: 'org.springframework' } testImplementation 'com.fasterxml.jackson.core:jackson-databind' + testImplementation 'io.micrometer:micrometer-observation-test' + testImplementation ('io.micrometer:micrometer-tracing-integration-test') { + exclude group: 'io.opentelemetry' + exclude group: 'com.wavefront' + exclude group: 'io.micrometer', module: 'micrometer-tracing-bridge-otel' + } testRuntimeOnly "com.jayway.jsonpath:json-path:$jsonpathVersion" } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java index 0d8a6ab5ef0..9b797a37583 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -335,6 +335,7 @@ private boolean sendWithObservation(Message message, long timeout) { DefaultMessageSenderObservationConvention.INSTANCE, () -> new MessageSenderContext(messageToSend, getComponentName()), this.observationRegistry) + .parentObservation(this.observationRegistry.getCurrentObservation()) // TODO until the fix in micrometer-observation .observe(() -> sendInternal(messageToSend, timeout)); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java index 4bb527d47d3..4f1fa5e146d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java @@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; +import io.micrometer.context.ContextSnapshot; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import reactor.core.Disposable; @@ -111,17 +112,20 @@ public void subscribeTo(Publisher> publisher) { Flux.from(publisher) .delaySubscription(this.subscribedSignal.asFlux().filter(Boolean::booleanValue).next()) .publishOn(this.scheduler) - .doOnNext((message) -> { - try { - if (!send(message)) { - throw new MessageDeliveryException(message, - "Failed to send message to channel '" + this); - } - } - catch (Exception ex) { - logger.warn(ex, () -> "Error during processing event: " + message); - } - }) + .transformDeferredContextual((flux, contextView) -> + flux.doOnNext((message) -> { + var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView); + try (scope) { + if (!send(message)) { + throw new MessageDeliveryException(message, + "Failed to send message to channel '" + this); + } + } + catch (Exception ex) { + logger.warn(ex, () -> "Error during processing event: " + message); + } + })) + .contextCapture() .subscribe()); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java index 9bd68a71e6e..419334f5c9d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java @@ -20,6 +20,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import io.micrometer.context.ContextSnapshot; import io.micrometer.observation.ObservationRegistry; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -720,7 +721,7 @@ private Mono> doSendAndReceiveMessageReactive(MessageChannel requestC throw new MessageMappingException("Cannot map to message: " + object, e); } - return Mono.defer(() -> { + return Mono.deferContextual(contextView -> { Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel(); Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel(); @@ -738,7 +739,10 @@ private Mono> doSendAndReceiveMessageReactive(MessageChannel requestC .setErrorChannel(replyChan) .build(); - sendMessageForReactiveFlow(requestChannel, messageToSend); + var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView); + try (scope) { + sendMessageForReactiveFlow(requestChannel, messageToSend); + } return buildReplyMono(requestMessage, replyChan.replyMono.asMono(), error, originalReplyChannelHeader, originalErrorChannelHeader); diff --git a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/inbound/WebFluxInboundEndpoint.java b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/inbound/WebFluxInboundEndpoint.java index c9d738729a1..f11368ca22c 100644 --- a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/inbound/WebFluxInboundEndpoint.java +++ b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/inbound/WebFluxInboundEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import io.micrometer.context.ContextSnapshot; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -151,16 +152,20 @@ private Mono doHandle(ServerWebExchange exchange) { new RequestEntity<>(body, exchange.getRequest().getHeaders(), exchange.getRequest().getMethod(), exchange.getRequest().getURI())) .flatMap(entity -> buildMessage(entity, exchange)) - .flatMap(requestTuple -> { - if (isExpectReply()) { - return sendAndReceiveMessageReactive(requestTuple.getT1()) - .flatMap(replyMessage -> populateResponse(exchange, replyMessage)); - } - else { - send(requestTuple.getT1()); - return setStatusCode(exchange, requestTuple.getT2()); - } - }) + .flatMap(requestTuple -> + Mono.deferContextual(contextView -> { + if (isExpectReply()) { + return sendAndReceiveMessageReactive(requestTuple.getT1()) + .flatMap(replyMessage -> populateResponse(exchange, replyMessage)); + } + else { + var scope = ContextSnapshot.setAllThreadLocalsFrom(contextView); + try (scope) { + send(requestTuple.getT1()); + } + return setStatusCode(exchange, requestTuple.getT2()); + } + })) .doOnTerminate(this.activeCount::decrementAndGet); } diff --git a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/observation/WebFluxObservationPropagationTests.java b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/observation/WebFluxObservationPropagationTests.java new file mode 100644 index 00000000000..b757bb600e6 --- /dev/null +++ b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/observation/WebFluxObservationPropagationTests.java @@ -0,0 +1,215 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.webflux.observation; + +import java.util.stream.Collectors; + +import brave.Tracing; +import brave.propagation.ThreadLocalCurrentTraceContext; +import brave.test.TestSpanHandler; +import io.micrometer.observation.ObservationHandler; +import io.micrometer.observation.ObservationRegistry; +import io.micrometer.observation.tck.TestObservationRegistryAssert; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.brave.bridge.BraveBaggageManager; +import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext; +import io.micrometer.tracing.brave.bridge.BraveFinishedSpan; +import io.micrometer.tracing.brave.bridge.BravePropagator; +import io.micrometer.tracing.brave.bridge.BraveTracer; +import io.micrometer.tracing.handler.DefaultTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingReceiverTracingObservationHandler; +import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler; +import io.micrometer.tracing.propagation.Propagator; +import io.micrometer.tracing.test.simple.SpansAssert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.HttpMethod; +import org.springframework.integration.channel.FluxMessageChannel; +import org.springframework.integration.channel.interceptor.ObservationPropagationChannelInterceptor; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.config.EnableIntegrationManagement; +import org.springframework.integration.config.GlobalChannelInterceptor; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.webflux.dsl.WebFlux; +import org.springframework.messaging.Message; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.support.ChannelInterceptor; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.web.SpringJUnitWebConfig; +import org.springframework.test.web.reactive.server.WebTestClient; +import org.springframework.web.filter.reactive.ServerHttpObservationFilter; +import org.springframework.web.reactive.config.EnableWebFlux; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Artem Bilan + * + * @since 6.0.3 + */ +@SpringJUnitWebConfig +@DirtiesContext +public class WebFluxObservationPropagationTests { + + private static final TestSpanHandler SPANS = new TestSpanHandler(); + + @Autowired + ObservationRegistry observationRegistry; + + @Autowired + private WebTestClient webTestClient; + + @Autowired + private PollableChannel testChannel; + + @BeforeEach + void setup() { + SPANS.clear(); + } + + @Test + void observationIsPropagatedFromWebFluxToServiceActivator() { + String testData = "testData"; + + this.webTestClient.post().uri("/test") + .bodyValue(testData) + .exchange() + .expectStatus().isOk(); + + Message receive = this.testChannel.receive(10_000); + assertThat(receive).isNotNull() + .extracting(Message::getPayload) + .isEqualTo("Received data: " + testData); + + TestObservationRegistryAssert.assertThat(this.observationRegistry) + .hasRemainingCurrentObservation(); + + this.observationRegistry.getCurrentObservation().stop(); + + assertThat(SPANS.spans()).hasSize(6); + SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList())) + .haveSameTraceId(); + } + + @Test + void observationIsPropagatedWebFluxRequestReply() { + String testData = "TESTDATA"; + + this.webTestClient.get().uri("/testRequestReply?name=" + testData) + .headers(headers -> headers.setBasicAuth("guest", "guest")) + .exchange() + .expectBody(String.class) + .isEqualTo(testData.toLowerCase()); + + assertThat(SPANS.spans()).hasSize(3); + SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList())) + .haveSameTraceId(); + } + + @Configuration + @EnableWebFlux + @EnableIntegration + @EnableIntegrationManagement(observationPatterns = "*") + public static class ContextConfiguration { + + @Bean + public Tracing braveTracing() { + return Tracing.newBuilder().addSpanHandler(SPANS).build(); + } + + @Bean + Tracer simpleTracer(Tracing tracing) { + return new BraveTracer(tracing.tracer(), + new BraveCurrentTraceContext(ThreadLocalCurrentTraceContext.create()), + new BraveBaggageManager()); + } + + @Bean + BravePropagator bravePropagator(Tracing tracing) { + return new BravePropagator(tracing); + } + + + @Bean + ObservationRegistry observationRegistry(Tracer tracer, Propagator propagator) { + ObservationRegistry observationRegistry = ObservationRegistry.create(); + observationRegistry.observationConfig() + .observationHandler( + // Composite will pick the first matching handler + new ObservationHandler.FirstMatchingCompositeObservationHandler( + // This is responsible for creating a child span on the sender side + new PropagatingSenderTracingObservationHandler<>(tracer, propagator), + // This is responsible for creating a span on the receiver side + new PropagatingReceiverTracingObservationHandler<>(tracer, propagator), + // This is responsible for creating a default span + new DefaultTracingObservationHandler(tracer))); + return observationRegistry; + } + + @Bean + WebTestClient webTestClient(ApplicationContext applicationContext) { + return WebTestClient.bindToApplicationContext(applicationContext).build(); + } + + @Bean + ServerHttpObservationFilter webfluxObservationFilter(ObservationRegistry registry) { + return new ServerHttpObservationFilter(registry); + } + + @Bean + @GlobalChannelInterceptor + public ChannelInterceptor observationPropagationInterceptor(ObservationRegistry observationRegistry) { + return new ObservationPropagationChannelInterceptor(observationRegistry); + } + + @Bean + IntegrationFlow webFluxFlow() { + return IntegrationFlow + .from(WebFlux.inboundChannelAdapter("/test") + .requestMapping((mapping) -> mapping.methods(HttpMethod.POST)) + .requestPayloadType(String.class) + .id("webFluxInbound")) + .channel(c -> c.flux("requestChannel")) + .handle((p, h) -> "Received data: " + p, e -> e.id("testHandler")) + .channel(c -> c.queue("testChannel")) + .get(); + } + + @Bean + FluxMessageChannel webFluxRequestChannel() { + return new FluxMessageChannel(); + } + + @Bean + IntegrationFlow webFluxRequestReplyFlow(FluxMessageChannel webFluxRequestChannel) { + return IntegrationFlow.from(WebFlux.inboundGateway("/testRequestReply") + .requestMapping(r -> r.params("name")) + .payloadExpression("#requestParams.name[0]") + .requestChannel(webFluxRequestChannel) + .id("webFluxGateway")) + .transform(String::toLowerCase, e -> e.id("testTransformer")) + .get(); + } + + } + +}