Skip to content

Commit 0bfd4a6

Browse files
ohksj77acogoluegnes
authored andcommitted
Distinguish nack metrics by requeue flag
(cherry picked from commit 92b944d) Conflicts: src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java
1 parent ca9035b commit 0bfd4a6

8 files changed

+41
-17
lines changed

src/main/java/com/rabbitmq/client/MetricsCollector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ default void basicPublishUnrouted(Channel channel) {
5858

5959
void basicAck(Channel channel, long deliveryTag, boolean multiple);
6060

61-
void basicNack(Channel channel, long deliveryTag);
61+
void basicNack(Channel channel, long deliveryTag, boolean requeue);
6262

63-
void basicReject(Channel channel, long deliveryTag);
63+
void basicReject(Channel channel, long deliveryTag, boolean requeue);
6464

6565
void basicConsume(Channel channel, String consumerTag, boolean autoAck);
6666

src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
4646
}
4747

4848
@Override
49-
public void basicNack(Channel channel, long deliveryTag) {
49+
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
5050

5151
}
5252

5353
@Override
54-
public void basicReject(Channel channel, long deliveryTag) {
54+
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
5555

5656
}
5757

src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {
4242

4343
private final Runnable markAcknowledgedMessageAction = () -> markAcknowledgedMessage();
4444

45-
private final Runnable markRejectedMessageAction = () -> markRejectedMessage();
45+
private final Function<Boolean, Runnable> markRejectedMessageAction = requeue -> () -> markRejectedMessage(requeue);
4646

4747
@Override
4848
public void newConnection(final Connection connection) {
@@ -226,18 +226,18 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
226226
}
227227

228228
@Override
229-
public void basicNack(Channel channel, long deliveryTag) {
229+
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
230230
try {
231-
updateChannelStateAfterAckReject(channel, deliveryTag, true, markRejectedMessageAction);
231+
updateChannelStateAfterAckReject(channel, deliveryTag, true, markRejectedMessageAction.apply(requeue));
232232
} catch(Exception e) {
233233
LOGGER.info("Error while computing metrics in basicNack: " + e.getMessage());
234234
}
235235
}
236236

237237
@Override
238-
public void basicReject(Channel channel, long deliveryTag) {
238+
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
239239
try {
240-
updateChannelStateAfterAckReject(channel, deliveryTag, false, markRejectedMessageAction);
240+
updateChannelStateAfterAckReject(channel, deliveryTag, false, markRejectedMessageAction.apply(requeue));
241241
} catch(Exception e) {
242242
LOGGER.info("Error while computing metrics in basicReject: " + e.getMessage());
243243
}
@@ -395,7 +395,7 @@ private ChannelState(Channel channel) {
395395
/**
396396
* Marks the event of a rejected message.
397397
*/
398-
protected abstract void markRejectedMessage();
398+
protected abstract void markRejectedMessage(boolean requeue);
399399

400400
/**
401401
* Marks the event of a message publishing acknowledgement.

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1215,7 +1215,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
12151215
throws IOException
12161216
{
12171217
transmit(new Basic.Nack(deliveryTag, multiple, requeue));
1218-
metricsCollector.basicNack(this, deliveryTag);
1218+
metricsCollector.basicNack(this, deliveryTag, requeue);
12191219
}
12201220

12211221
/** Public API - {@inheritDoc} */
@@ -1224,7 +1224,7 @@ public void basicReject(long deliveryTag, boolean requeue)
12241224
throws IOException
12251225
{
12261226
transmit(new Basic.Reject(deliveryTag, requeue));
1227-
metricsCollector.basicReject(this, deliveryTag);
1227+
metricsCollector.basicReject(this, deliveryTag, requeue);
12281228
}
12291229

12301230
/** Public API - {@inheritDoc} */

src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector {
6363

6464
private final Counter rejectedMessages;
6565

66+
private final Counter requeuedPublishedMessages;
67+
6668
public MicrometerMetricsCollector(MeterRegistry registry) {
6769
this(registry, "rabbitmq");
6870
}
@@ -90,6 +92,7 @@ public MicrometerMetricsCollector(Function<Metrics, Object> metricsCreator) {
9092
this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES);
9193
this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES);
9294
this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES);
95+
this.requeuedPublishedMessages = (Counter) metricsCreator.apply(REQUEUED_PUBLISHED_MESSAGES);
9396
}
9497

9598
@Override
@@ -133,7 +136,10 @@ protected void markAcknowledgedMessage() {
133136
}
134137

135138
@Override
136-
protected void markRejectedMessage() {
139+
protected void markRejectedMessage(boolean requeue) {
140+
if (requeue) {
141+
requeuedPublishedMessages.increment();
142+
}
137143
rejectedMessages.increment();
138144
}
139145

@@ -252,6 +258,12 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
252258
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
253259
return registry.counter(prefix + ".unrouted_published", tags);
254260
}
261+
},
262+
REQUEUED_PUBLISHED_MESSAGES {
263+
@Override
264+
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
265+
return registry.counter(prefix + ".requeued_published", tags);
266+
}
255267
};
256268

257269
/**

src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector {
4646
private final LongCounter ackedPublishedMessagesCounter;
4747
private final LongCounter nackedPublishedMessagesCounter;
4848
private final LongCounter unroutedPublishedMessagesCounter;
49+
private final LongCounter requeuedPublishedMessagesCounter;
4950

5051
public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) {
5152
this(openTelemetry, "rabbitmq");
@@ -123,6 +124,12 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St
123124
.setUnit("{messages}")
124125
.setDescription("The number of un-routed published messages to the RabbitMQ server")
125126
.build();
127+
128+
// requeuedPublishedMessages
129+
this.requeuedPublishedMessagesCounter = meter.counterBuilder(prefix + ".requeued_published")
130+
.setUnit("{messages}")
131+
.setDescription("The number of re-queued published messages to the RabbitMQ server")
132+
.build();
126133
}
127134

128135
@Override
@@ -166,7 +173,10 @@ protected void markAcknowledgedMessage() {
166173
}
167174

168175
@Override
169-
protected void markRejectedMessage() {
176+
protected void markRejectedMessage(boolean requeue) {
177+
if (requeue) {
178+
requeuedPublishedMessagesCounter.add(1L, attributes);
179+
}
170180
rejectedMessagesCounter.add(1L, attributes);
171181
}
172182

src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class StandardMetricsCollector extends AbstractMetricsCollector {
4545
private final Meter publishAcknowledgedMessages;
4646
private final Meter publishNacknowledgedMessages;
4747
private final Meter publishUnroutedMessages;
48+
private final Meter requeuedPublishedMessages;
4849

4950

5051
public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
@@ -59,6 +60,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
5960
this.consumedMessages = registry.meter(metricsPrefix+".consumed");
6061
this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged");
6162
this.rejectedMessages = registry.meter(metricsPrefix+".rejected");
63+
this.requeuedPublishedMessages = registry.meter(metricsPrefix+".requeued_published");
6264
}
6365

6466
public StandardMetricsCollector() {
@@ -110,7 +112,7 @@ protected void markAcknowledgedMessage() {
110112
}
111113

112114
@Override
113-
protected void markRejectedMessage() {
115+
protected void markRejectedMessage(boolean requeue) {
114116
rejectedMessages.mark();
115117
}
116118

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,15 +108,15 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throw
108108
return;
109109
}
110110
transmit(new Basic.Nack(realTag, multiple, requeue));
111-
metricsCollector.basicNack(this, deliveryTag);
111+
metricsCollector.basicNack(this, deliveryTag, requeue);
112112
}
113113

114114
@Override
115115
public void basicReject(long deliveryTag, boolean requeue) throws IOException {
116116
long realTag = deliveryTag - activeDeliveryTagOffset;
117117
if (realTag > 0) {
118118
transmit(new Basic.Reject(realTag, requeue));
119-
metricsCollector.basicReject(this, deliveryTag);
119+
metricsCollector.basicReject(this, deliveryTag, requeue);
120120
}
121121
}
122122

0 commit comments

Comments
 (0)