Skip to content

Commit 9ad2b8a

Browse files
authored
Fix JMS Inbound Endpoints for observation
The `JmsMessageDrivenEndpoint` delegates all the hard work to the `ChannelPublishingJmsMessageListener`, but missed to propagate an `ObservationRegistry` and other related options. The `JmsInboundGateway` is worse: it delegated to the `JmsMessageDrivenEndpoint` * Add `IntegrationObservation.HANDLER` observation to the `MessagingGatewaySupport.send()` operation: used by the delegate in the `ChannelPublishingJmsMessageListener` * Expose and propagate observation-related options from `JmsInboundGateway` and `JmsMessageDrivenEndpoint` * Expose `observationConvention()` option on the `MessagingGatewaySpec` and `MessageProducerSpec` * Remove unused imports * Do not start a new `RECEIVER` observation if there is already `SERVER` one * Fix `MessagingGatewaySupport` for `Observation.NOOP` check. The parent process may still use `ObservationRegistry.NOOP` which sets `Observation.NOOP` instance into the current context and thread local. **Cherry-pick to `6.1.x` & `6.0.x`**
1 parent 459de03 commit 9ad2b8a

File tree

8 files changed

+207
-34
lines changed

8 files changed

+207
-34
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,7 @@ project('spring-integration-jms') {
766766
testImplementation "org.apache.activemq:artemis-jakarta-client:$artemisVersion"
767767
testImplementation 'org.springframework:spring-oxm'
768768
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
769+
testImplementation 'io.micrometer:micrometer-observation-test'
769770
}
770771
}
771772

spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageProducerSpec.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.springframework.integration.endpoint.MessageProducerSupport;
2020
import org.springframework.integration.support.ErrorMessageStrategy;
21+
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
2122
import org.springframework.lang.Nullable;
2223
import org.springframework.messaging.MessageChannel;
2324

@@ -152,4 +153,16 @@ public S errorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
152153
return _this();
153154
}
154155

156+
/**
157+
* Provide a custom {@link MessageReceiverObservationConvention}.
158+
* @param observationConvention the observation convention to use.
159+
* @return the spec.
160+
* @since 6.0.8
161+
* @see MessageProducerSupport#setObservationConvention(MessageReceiverObservationConvention)
162+
*/
163+
public S observationConvention(MessageReceiverObservationConvention observationConvention) {
164+
this.target.setObservationConvention(observationConvention);
165+
return _this();
166+
}
167+
155168
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/MessagingGatewaySpec.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.springframework.integration.gateway.MessagingGatewaySupport;
2020
import org.springframework.integration.mapping.InboundMessageMapper;
2121
import org.springframework.integration.mapping.OutboundMessageMapper;
22+
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
2223
import org.springframework.lang.Nullable;
2324
import org.springframework.messaging.MessageChannel;
2425

@@ -205,4 +206,16 @@ public S shouldTrack(boolean shouldTrack) {
205206
return _this();
206207
}
207208

209+
/**
210+
* Provide a custom {@link MessageRequestReplyReceiverObservationConvention}.
211+
* @param observationConvention the observation convention to use.
212+
* @return the spec.
213+
* @since 6.0.8
214+
* @see MessagingGatewaySupport#setObservationConvention(MessageRequestReplyReceiverObservationConvention)
215+
*/
216+
public S observationConvention(MessageRequestReplyReceiverObservationConvention observationConvention) {
217+
this.target.setObservationConvention(observationConvention);
218+
return _this();
219+
}
220+
208221
}

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 68 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.locks.Lock;
2323
import java.util.concurrent.locks.ReentrantLock;
2424

25+
import io.micrometer.observation.Observation;
2526
import io.micrometer.observation.ObservationRegistry;
2627
import org.reactivestreams.Publisher;
2728
import org.reactivestreams.Subscriber;
@@ -56,12 +57,16 @@
5657
import org.springframework.integration.support.converter.SimpleMessageConverter;
5758
import org.springframework.integration.support.management.IntegrationInboundManagement;
5859
import org.springframework.integration.support.management.IntegrationManagedResource;
60+
import org.springframework.integration.support.management.TrackableComponent;
5961
import org.springframework.integration.support.management.metrics.MeterFacade;
6062
import org.springframework.integration.support.management.metrics.MetricsCaptor;
6163
import org.springframework.integration.support.management.metrics.SampleFacade;
6264
import org.springframework.integration.support.management.metrics.TimerFacade;
65+
import org.springframework.integration.support.management.observation.DefaultMessageReceiverObservationConvention;
6366
import org.springframework.integration.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention;
6467
import org.springframework.integration.support.management.observation.IntegrationObservation;
68+
import org.springframework.integration.support.management.observation.MessageReceiverContext;
69+
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
6570
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverContext;
6671
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
6772
import org.springframework.lang.Nullable;
@@ -91,7 +96,7 @@
9196
*/
9297
@IntegrationManagedResource
9398
public abstract class MessagingGatewaySupport extends AbstractEndpoint
94-
implements org.springframework.integration.support.management.TrackableComponent,
99+
implements TrackableComponent,
95100
IntegrationInboundManagement, IntegrationPattern {
96101

97102
protected final ConvertingMessagingTemplate messagingTemplate; // NOSONAR
@@ -144,6 +149,8 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint
144149
@Nullable
145150
private MessageRequestReplyReceiverObservationConvention observationConvention;
146151

152+
private MessageReceiverObservationConvention receiverObservationConvention;
153+
147154
private volatile AbstractEndpoint replyMessageCorrelator;
148155

149156
private volatile boolean initialized;
@@ -384,6 +391,10 @@ public void setObservationConvention(
384391
this.observationConvention = observationConvention;
385392
}
386393

394+
public void setReceiverObservationConvention(MessageReceiverObservationConvention receiverObservationConvention) {
395+
this.receiverObservationConvention = receiverObservationConvention;
396+
}
397+
387398
@Override
388399
protected void onInit() {
389400
Assert.state(!(this.requestChannelName != null && this.requestChannel != null),
@@ -468,27 +479,65 @@ protected void send(Object object) {
468479
MessageChannel channel = getRequestChannel();
469480
Assert.state(channel != null,
470481
"send is not supported, because no request channel has been configured");
471-
SampleFacade sample = null;
472-
if (this.metricsCaptor != null) {
473-
sample = this.metricsCaptor.start();
482+
483+
Message<?> requestMessage = this.messagingTemplate.doConvert(object, null, this.historyWritingPostProcessor);
484+
485+
if (!ObservationRegistry.NOOP.equals(this.observationRegistry)
486+
&& (this.observationRegistry.getCurrentObservation() == null
487+
|| Observation.NOOP.equals(this.observationRegistry.getCurrentObservation()))) {
488+
489+
sendWithObservation(channel, requestMessage);
490+
}
491+
else if (this.metricsCaptor != null) {
492+
sendWithMetrics(channel, requestMessage);
474493
}
494+
else {
495+
doSend(channel, requestMessage);
496+
}
497+
}
498+
499+
private void sendWithObservation(MessageChannel channel, Message<?> message) {
475500
try {
476-
this.messagingTemplate.convertAndSend(channel, object, this.historyWritingPostProcessor);
477-
if (sample != null) {
478-
sample.stop(sendTimer());
479-
}
501+
IntegrationObservation.HANDLER.observation(
502+
this.receiverObservationConvention,
503+
DefaultMessageReceiverObservationConvention.INSTANCE,
504+
() -> new MessageReceiverContext(message, getComponentName()),
505+
this.observationRegistry)
506+
.observe(() -> this.messagingTemplate.send(channel, message));
480507
}
481-
catch (Exception e) {
482-
if (sample != null) {
483-
sample.stop(buildSendTimer(false, e.getClass().getSimpleName()));
484-
}
485-
MessageChannel errorChan = getErrorChannel();
486-
if (errorChan != null) {
487-
this.messagingTemplate.send(errorChan, new ErrorMessage(e));
488-
}
489-
else {
490-
rethrow(e, "failed to send message");
491-
}
508+
catch (Exception ex) {
509+
sendErrorMessage(ex, message);
510+
}
511+
}
512+
513+
private void sendWithMetrics(MessageChannel channel, Message<?> message) {
514+
SampleFacade sample = this.metricsCaptor.start();
515+
try {
516+
this.messagingTemplate.send(channel, message);
517+
sample.stop(sendTimer());
518+
}
519+
catch (Exception ex) {
520+
sample.stop(buildSendTimer(false, ex.getClass().getSimpleName()));
521+
sendErrorMessage(ex, message);
522+
}
523+
}
524+
525+
private void doSend(MessageChannel channel, Message<?> message) {
526+
try {
527+
this.messagingTemplate.send(channel, message);
528+
}
529+
catch (Exception ex) {
530+
sendErrorMessage(ex, message);
531+
}
532+
}
533+
534+
private void sendErrorMessage(Exception exception, Message<?> failedMessage) {
535+
MessageChannel errorChan = getErrorChannel();
536+
if (errorChan != null) {
537+
this.messagingTemplate.send(errorChan, buildErrorMessage(failedMessage, exception));
538+
}
539+
else {
540+
rethrow(exception, "failed to send message");
492541
}
493542
}
494543

spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Map;
2020

21+
import io.micrometer.observation.ObservationRegistry;
2122
import jakarta.jms.DeliveryMode;
2223
import jakarta.jms.Destination;
2324
import jakarta.jms.InvalidDestinationException;
@@ -38,6 +39,9 @@
3839
import org.springframework.integration.support.DefaultMessageBuilderFactory;
3940
import org.springframework.integration.support.MessageBuilderFactory;
4041
import org.springframework.integration.support.management.TrackableComponent;
42+
import org.springframework.integration.support.management.metrics.MetricsCaptor;
43+
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
44+
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
4145
import org.springframework.integration.support.utils.IntegrationUtils;
4246
import org.springframework.jms.listener.SessionAwareMessageListener;
4347
import org.springframework.jms.support.JmsUtils;
@@ -323,6 +327,26 @@ public void setExtractReplyPayload(boolean extractReplyPayload) {
323327
this.extractReplyPayload = extractReplyPayload;
324328
}
325329

330+
public void setMetricsCaptor(MetricsCaptor captor) {
331+
this.gatewayDelegate.registerMetricsCaptor(captor);
332+
}
333+
334+
public void setObservationRegistry(ObservationRegistry observationRegistry) {
335+
this.gatewayDelegate.registerObservationRegistry(observationRegistry);
336+
}
337+
338+
public void setRequestReplyObservationConvention(
339+
@Nullable MessageRequestReplyReceiverObservationConvention observationConvention) {
340+
341+
this.gatewayDelegate.setObservationConvention(observationConvention);
342+
}
343+
344+
public void setReceiverObservationConvention(
345+
@Nullable MessageReceiverObservationConvention observationConvention) {
346+
347+
this.gatewayDelegate.setReceiverObservationConvention(observationConvention);
348+
}
349+
326350
@Override
327351
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
328352
this.beanFactory = beanFactory;

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,10 +16,14 @@
1616

1717
package org.springframework.integration.jms;
1818

19+
import io.micrometer.observation.ObservationRegistry;
20+
1921
import org.springframework.beans.BeansException;
2022
import org.springframework.context.ApplicationContext;
2123
import org.springframework.integration.context.OrderlyShutdownCapable;
2224
import org.springframework.integration.gateway.MessagingGatewaySupport;
25+
import org.springframework.integration.support.management.metrics.MetricsCaptor;
26+
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
2327
import org.springframework.jms.listener.AbstractMessageListenerContainer;
2428
import org.springframework.messaging.MessageChannel;
2529

@@ -114,28 +118,38 @@ public void setShutdownContainerOnStop(boolean shutdownContainerOnStop) {
114118
this.endpoint.setShutdownContainerOnStop(shutdownContainerOnStop);
115119
}
116120

121+
@Override
122+
public void registerMetricsCaptor(MetricsCaptor metricsCaptorToRegister) {
123+
super.registerMetricsCaptor(metricsCaptorToRegister);
124+
this.endpoint.registerMetricsCaptor(metricsCaptorToRegister);
125+
}
117126

118127
@Override
119-
public String getComponentType() {
120-
return this.endpoint.getComponentType();
128+
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
129+
super.registerObservationRegistry(observationRegistry);
130+
this.endpoint.registerObservationRegistry(observationRegistry);
121131
}
122132

123133
@Override
124-
public void setComponentName(String componentName) {
125-
super.setComponentName(componentName);
126-
this.endpoint.setComponentName(getComponentName());
134+
public void setObservationConvention(MessageRequestReplyReceiverObservationConvention observationConvention) {
135+
super.setObservationConvention(observationConvention);
136+
this.endpoint.getListener().setRequestReplyObservationConvention(observationConvention);
137+
}
138+
139+
@Override
140+
public String getComponentType() {
141+
return this.endpoint.getComponentType();
127142
}
128143

129144
@Override
130145
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
131146
super.setApplicationContext(applicationContext);
132147
this.endpoint.setApplicationContext(applicationContext);
133-
this.endpoint.setBeanFactory(applicationContext);
134-
this.endpoint.getListener().setBeanFactory(applicationContext);
135148
}
136149

137150
@Override
138151
protected void onInit() {
152+
this.endpoint.setComponentName(getComponentName());
139153
this.endpoint.afterPropertiesSet();
140154
}
141155

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsMessageDrivenEndpoint.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,11 +16,15 @@
1616

1717
package org.springframework.integration.jms;
1818

19+
import io.micrometer.observation.ObservationRegistry;
20+
1921
import org.springframework.beans.BeansException;
2022
import org.springframework.context.ApplicationContext;
2123
import org.springframework.integration.context.OrderlyShutdownCapable;
2224
import org.springframework.integration.endpoint.MessageProducerSupport;
2325
import org.springframework.integration.jms.util.JmsAdapterUtils;
26+
import org.springframework.integration.support.management.metrics.MetricsCaptor;
27+
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
2428
import org.springframework.jms.listener.AbstractMessageListenerContainer;
2529
import org.springframework.jms.listener.DefaultMessageListenerContainer;
2630
import org.springframework.messaging.MessageChannel;
@@ -91,7 +95,7 @@ private JmsMessageDrivenEndpoint(AbstractMessageListenerContainer listenerContai
9195
* container setting even if an external container is provided. Defaults to null
9296
* (won't change container) if an external container is provided or `transacted` when
9397
* the framework creates an implicit {@link DefaultMessageListenerContainer}.
94-
* @param sessionAcknowledgeMode the acknowledge mode.
98+
* @param sessionAcknowledgeMode the acknowledgement mode.
9599
*/
96100
public void setSessionAcknowledgeMode(String sessionAcknowledgeMode) {
97101
this.sessionAcknowledgeMode = sessionAcknowledgeMode;
@@ -134,9 +138,9 @@ public void setShouldTrack(boolean shouldTrack) {
134138
}
135139

136140
/**
137-
* Set to false to prevent listener container shutdown when the endpoint is stopped.
141+
* Set to {@code false} to prevent listener container shutdown when the endpoint is stopped.
138142
* Then, if so configured, any cached consumer(s) in the container will remain.
139-
* Otherwise the shared connection and will be closed and the listener invokers shut
143+
* Otherwise, the shared connection and will be closed and the listener invokers shut
140144
* down; this behavior is new starting with version 5.1. Default: true.
141145
* @param shutdownContainerOnStop false to not shutdown.
142146
* @since 5.1
@@ -149,6 +153,24 @@ public ChannelPublishingJmsMessageListener getListener() {
149153
return this.listener;
150154
}
151155

156+
@Override
157+
public void registerMetricsCaptor(MetricsCaptor captor) {
158+
super.registerMetricsCaptor(captor);
159+
this.listener.setMetricsCaptor(captor);
160+
}
161+
162+
@Override
163+
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
164+
super.registerObservationRegistry(observationRegistry);
165+
this.listener.setObservationRegistry(observationRegistry);
166+
}
167+
168+
@Override
169+
public void setObservationConvention(MessageReceiverObservationConvention observationConvention) {
170+
super.setObservationConvention(observationConvention);
171+
this.listener.setReceiverObservationConvention(observationConvention);
172+
}
173+
152174
@Override
153175
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
154176
super.setApplicationContext(applicationContext);

0 commit comments

Comments
 (0)