Skip to content

Commit 9a1d6aa

Browse files
committed
Polish support for new "requeued" count metric
Ensure backward compatibility, use "requeued" name, update copyright year, add test. (cherry picked from commit 28c7d51) Conflicts: src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java
1 parent 0bfd4a6 commit 9a1d6aa

File tree

7 files changed

+176
-31
lines changed

7 files changed

+176
-31
lines changed

src/main/java/com/rabbitmq/client/MetricsCollector.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -58,9 +58,17 @@ default void basicPublishUnrouted(Channel channel) {
5858

5959
void basicAck(Channel channel, long deliveryTag, boolean multiple);
6060

61-
void basicNack(Channel channel, long deliveryTag, boolean requeue);
61+
void basicNack(Channel channel, long deliveryTag);
6262

63-
void basicReject(Channel channel, long deliveryTag, boolean requeue);
63+
default void basicNack(Channel channel, long deliveryTag, boolean requeue) {
64+
this.basicNack(channel, deliveryTag);
65+
}
66+
67+
void basicReject(Channel channel, long deliveryTag);
68+
69+
default void basicReject(Channel channel, long deliveryTag, boolean requeue) {
70+
this.basicReject(channel, deliveryTag);
71+
}
6472

6573
void basicConsume(Channel channel, String consumerTag, boolean autoAck);
6674

src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -45,11 +45,21 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
4545

4646
}
4747

48+
@Override
49+
public void basicNack(Channel channel, long deliveryTag) {
50+
51+
}
52+
4853
@Override
4954
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
5055

5156
}
5257

58+
@Override
59+
public void basicReject(Channel channel, long deliveryTag) {
60+
61+
}
62+
5363
@Override
5464
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
5565

src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.ConcurrentMap;
2525
import java.util.concurrent.locks.Lock;
2626
import java.util.concurrent.locks.ReentrantLock;
27+
import java.util.function.Function;
2728

2829
/**
2930
* Base class for {@link MetricsCollector}.
@@ -42,7 +43,13 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {
4243

4344
private final Runnable markAcknowledgedMessageAction = () -> markAcknowledgedMessage();
4445

45-
private final Function<Boolean, Runnable> markRejectedMessageAction = requeue -> () -> markRejectedMessage(requeue);
46+
private final Function<Boolean, Runnable> markRejectedMessageAction;
47+
48+
public AbstractMetricsCollector() {
49+
Runnable rejectRequeue = () -> markRejectedMessage(true);
50+
Runnable rejectNoRequeue = () -> markRejectedMessage(false);
51+
this.markRejectedMessageAction = requeue -> requeue ? rejectRequeue : rejectNoRequeue;
52+
}
4653

4754
@Override
4855
public void newConnection(final Connection connection) {
@@ -225,6 +232,11 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
225232
}
226233
}
227234

235+
@Override
236+
public void basicNack(Channel channel, long deliveryTag) {
237+
// replaced by #basicNack(Channel, long, boolean)
238+
}
239+
228240
@Override
229241
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
230242
try {
@@ -234,6 +246,11 @@ public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
234246
}
235247
}
236248

249+
@Override
250+
public void basicReject(Channel channel, long deliveryTag) {
251+
// replaced by #basicReject(Channel, long, boolean)
252+
}
253+
237254
@Override
238255
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
239256
try {
@@ -392,10 +409,19 @@ private ChannelState(Channel channel) {
392409
*/
393410
protected abstract void markAcknowledgedMessage();
394411

412+
/**
413+
* Marks the event of a rejected message.
414+
*
415+
* @deprecated Use {@link #markRejectedMessage(boolean)} instead
416+
*/
417+
protected abstract void markRejectedMessage();
418+
395419
/**
396420
* Marks the event of a rejected message.
397421
*/
398-
protected abstract void markRejectedMessage(boolean requeue);
422+
protected void markRejectedMessage(boolean requeue) {
423+
this.markRejectedMessage();
424+
}
399425

400426
/**
401427
* Marks the event of a message publishing acknowledgement.

src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector {
6363

6464
private final Counter rejectedMessages;
6565

66-
private final Counter requeuedPublishedMessages;
66+
private final Counter requeuedMessages;
6767

6868
public MicrometerMetricsCollector(MeterRegistry registry) {
6969
this(registry, "rabbitmq");
@@ -92,7 +92,7 @@ public MicrometerMetricsCollector(Function<Metrics, Object> metricsCreator) {
9292
this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES);
9393
this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES);
9494
this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES);
95-
this.requeuedPublishedMessages = (Counter) metricsCreator.apply(REQUEUED_PUBLISHED_MESSAGES);
95+
this.requeuedMessages = (Counter) metricsCreator.apply(REQUEUED_MESSAGES);
9696
}
9797

9898
@Override
@@ -135,10 +135,16 @@ protected void markAcknowledgedMessage() {
135135
acknowledgedMessages.increment();
136136
}
137137

138+
@Override
139+
@SuppressWarnings("deprecation")
140+
protected void markRejectedMessage() {
141+
142+
}
143+
138144
@Override
139145
protected void markRejectedMessage(boolean requeue) {
140146
if (requeue) {
141-
requeuedPublishedMessages.increment();
147+
requeuedMessages.increment();
142148
}
143149
rejectedMessages.increment();
144150
}
@@ -198,6 +204,10 @@ public Counter getRejectedMessages() {
198204
return rejectedMessages;
199205
}
200206

207+
public Counter getRequeuedMessages() {
208+
return requeuedMessages;
209+
}
210+
201211
public enum Metrics {
202212
CONNECTIONS {
203213
@Override
@@ -235,6 +245,12 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
235245
return registry.counter(prefix + ".rejected", tags);
236246
}
237247
},
248+
REQUEUED_MESSAGES {
249+
@Override
250+
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
251+
return registry.counter(prefix + ".requeued", tags);
252+
}
253+
},
238254
FAILED_TO_PUBLISH_MESSAGES {
239255
@Override
240256
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
@@ -258,12 +274,6 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
258274
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
259275
return registry.counter(prefix + ".unrouted_published", tags);
260276
}
261-
},
262-
REQUEUED_PUBLISHED_MESSAGES {
263-
@Override
264-
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
265-
return registry.counter(prefix + ".requeued_published", tags);
266-
}
267277
};
268278

269279
/**

src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -46,7 +46,7 @@ public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector {
4646
private final LongCounter ackedPublishedMessagesCounter;
4747
private final LongCounter nackedPublishedMessagesCounter;
4848
private final LongCounter unroutedPublishedMessagesCounter;
49-
private final LongCounter requeuedPublishedMessagesCounter;
49+
private final LongCounter requeuedMessagesCounter;
5050

5151
public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) {
5252
this(openTelemetry, "rabbitmq");
@@ -101,6 +101,12 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St
101101
.setDescription("The number of messages rejected from the RabbitMQ server")
102102
.build();
103103

104+
// requeuedPublishedMessages
105+
this.requeuedMessagesCounter = meter.counterBuilder(prefix + ".requeued")
106+
.setUnit("{messages}")
107+
.setDescription("The number of re-queued messages to the RabbitMQ server")
108+
.build();
109+
104110
// failedToPublishMessages
105111
this.failedToPublishMessagesCounter = meter.counterBuilder(prefix + ".failed_to_publish")
106112
.setUnit("{messages}")
@@ -124,12 +130,6 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St
124130
.setUnit("{messages}")
125131
.setDescription("The number of un-routed published messages to the RabbitMQ server")
126132
.build();
127-
128-
// requeuedPublishedMessages
129-
this.requeuedPublishedMessagesCounter = meter.counterBuilder(prefix + ".requeued_published")
130-
.setUnit("{messages}")
131-
.setDescription("The number of re-queued published messages to the RabbitMQ server")
132-
.build();
133133
}
134134

135135
@Override
@@ -172,10 +172,16 @@ protected void markAcknowledgedMessage() {
172172
acknowledgedMessagesCounter.add(1L, attributes);
173173
}
174174

175+
@Override
176+
@SuppressWarnings("deprecation")
177+
protected void markRejectedMessage() {
178+
179+
}
180+
175181
@Override
176182
protected void markRejectedMessage(boolean requeue) {
177183
if (requeue) {
178-
requeuedPublishedMessagesCounter.add(1L, attributes);
184+
requeuedMessagesCounter.add(1L, attributes);
179185
}
180186
rejectedMessagesCounter.add(1L, attributes);
181187
}

src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -41,12 +41,11 @@ public class StandardMetricsCollector extends AbstractMetricsCollector {
4141
private final Meter consumedMessages;
4242
private final Meter acknowledgedMessages;
4343
private final Meter rejectedMessages;
44+
private final Meter requeuedMessages;
4445
private final Meter failedToPublishMessages;
4546
private final Meter publishAcknowledgedMessages;
4647
private final Meter publishNacknowledgedMessages;
4748
private final Meter publishUnroutedMessages;
48-
private final Meter requeuedPublishedMessages;
49-
5049

5150
public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
5251
this.registry = registry;
@@ -60,7 +59,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
6059
this.consumedMessages = registry.meter(metricsPrefix+".consumed");
6160
this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged");
6261
this.rejectedMessages = registry.meter(metricsPrefix+".rejected");
63-
this.requeuedPublishedMessages = registry.meter(metricsPrefix+".requeued_published");
62+
this.requeuedMessages = registry.meter(metricsPrefix+".requeued");
6463
}
6564

6665
public StandardMetricsCollector() {
@@ -111,8 +110,17 @@ protected void markAcknowledgedMessage() {
111110
acknowledgedMessages.mark();
112111
}
113112

113+
@Override
114+
@SuppressWarnings("deprecation")
115+
protected void markRejectedMessage() {
116+
117+
}
118+
114119
@Override
115120
protected void markRejectedMessage(boolean requeue) {
121+
if (requeue) {
122+
requeuedMessages.mark();
123+
}
116124
rejectedMessages.mark();
117125
}
118126

@@ -159,6 +167,10 @@ public Meter getRejectedMessages() {
159167
return rejectedMessages;
160168
}
161169

170+
public Meter getRequeuedMessages() {
171+
return this.requeuedMessages;
172+
}
173+
162174
public Meter getFailedToPublishMessages() {
163175
return failedToPublishMessages;
164176
}

0 commit comments

Comments
 (0)