Skip to content

Commit 2b8d257

Browse files
committed
Polish recovery retry
Add log in default retry handler, add operation to recover all the bindings of a queue (useful when the recovery of a consumer fails because isn't found), make AutorecoveringConnection#recoverConsumer and AutorecoveringConnection#recoverQueue public as they contain useful logic that some client code should be able to use, and declared a pre-configured retry handler for the deleted queue case. References #387
1 parent 9711406 commit 2b8d257

File tree

4 files changed

+68
-16
lines changed

4 files changed

+68
-16
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,7 @@ private void recoverExchange(RecordedExchange x, boolean retry) {
709709
}
710710

711711

712-
void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
712+
public void recoverQueue(final String oldName, RecordedQueue q, boolean retry) {
713713
try {
714714
if (topologyRecoveryFilter.filterQueue(q)) {
715715
LOGGER.debug("Recovering {}", q);
@@ -774,7 +774,7 @@ private void recoverBinding(RecordedBinding b, boolean retry) {
774774
}
775775
}
776776

777-
private void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
777+
public void recoverConsumer(final String tag, RecordedConsumer consumer, boolean retry) {
778778
try {
779779
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
780780
LOGGER.debug("Recovering {}", consumer);
@@ -1087,6 +1087,10 @@ public Map<String, RecordedExchange> getRecordedExchanges() {
10871087
return recordedExchanges;
10881088
}
10891089

1090+
public List<RecordedBinding> getRecordedBindings() {
1091+
return recordedBindings;
1092+
}
1093+
10901094
@Override
10911095
public String toString() {
10921096
return this.delegate.toString();

src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616
package com.rabbitmq.client.impl.recovery;
1717

18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
1821
import java.util.Objects;
1922
import java.util.function.BiPredicate;
2023

@@ -35,6 +38,8 @@
3538
*/
3639
public class DefaultRetryHandler implements RetryHandler {
3740

41+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultRetryHandler.class);
42+
3843
private final BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition;
3944
private final BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition;
4045
private final BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition;
@@ -98,6 +103,7 @@ public RetryResult retryConsumerRecovery(RetryContext context) throws Exception
98103

99104
protected RetryResult doRetry(BiPredicate<RecordedEntity, Exception> condition, RetryOperation<?> operation, RecordedEntity entity, RetryContext context)
100105
throws Exception {
106+
log(entity, context.exception());
101107
int attempts = 0;
102108
Exception exception = context.exception();
103109
while (attempts < retryAttempts) {
@@ -119,6 +125,10 @@ protected RetryResult doRetry(BiPredicate<RecordedEntity, Exception> condition,
119125
throw context.exception();
120126
}
121127

128+
protected void log(RecordedEntity entity, Exception exception) {
129+
LOGGER.info("Error while recovering {}, retrying with {} attempt(s).", entity, retryAttempts, exception);
130+
}
131+
122132
public interface RetryOperation<T> {
123133

124134
T call(RetryContext context) throws Exception;

src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
import com.rabbitmq.client.AMQP;
1919
import com.rabbitmq.client.ShutdownSignalException;
2020

21+
import java.util.List;
2122
import java.util.function.BiPredicate;
2223
import java.util.function.Predicate;
2324

25+
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder;
26+
2427
/**
2528
* Useful ready-to-use conditions and operations for {@link DefaultRetryHandler}.
2629
* They're composed and used with the {@link TopologyRecoveryRetryHandlerBuilder}.
@@ -32,6 +35,9 @@
3235
*/
3336
public abstract class TopologyRecoveryRetryLogic {
3437

38+
/**
39+
* Channel has been closed because of a resource that doesn't exist.
40+
*/
3541
public static final BiPredicate<RecordedEntity, Exception> CHANNEL_CLOSED_NOT_FOUND = (entity, ex) -> {
3642
if (ex.getCause() instanceof ShutdownSignalException) {
3743
ShutdownSignalException cause = (ShutdownSignalException) ex.getCause();
@@ -42,13 +48,19 @@ public abstract class TopologyRecoveryRetryLogic {
4248
return false;
4349
};
4450

51+
/**
52+
* Recover a channel.
53+
*/
4554
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_CHANNEL = context -> {
4655
if (!context.entity().getChannel().isOpen()) {
4756
context.connection().recoverChannel(context.entity().getChannel());
4857
}
4958
return null;
5059
};
5160

61+
/**
62+
* Recover the destination queue of a binding.
63+
*/
5264
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_BINDING_QUEUE = context -> {
5365
if (context.entity() instanceof RecordedQueueBinding) {
5466
RecordedBinding binding = context.binding();
@@ -63,11 +75,17 @@ public abstract class TopologyRecoveryRetryLogic {
6375
return null;
6476
};
6577

78+
/**
79+
* Recover a binding.
80+
*/
6681
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_BINDING = context -> {
6782
context.binding().recover();
6883
return null;
6984
};
7085

86+
/**
87+
* Recover the queue of a consumer.
88+
*/
7189
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_CONSUMER_QUEUE = context -> {
7290
if (context.entity() instanceof RecordedConsumer) {
7391
RecordedConsumer consumer = context.consumer();
@@ -82,5 +100,37 @@ public abstract class TopologyRecoveryRetryLogic {
82100
return null;
83101
};
84102

103+
/**
104+
* Recover all the bindings of the queue of a consumer.
105+
*/
106+
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_CONSUMER_QUEUE_BINDINGS = context -> {
107+
if (context.entity() instanceof RecordedConsumer) {
108+
String queue = context.consumer().getQueue();
109+
for (RecordedBinding recordedBinding : context.connection().getRecordedBindings()) {
110+
if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) {
111+
recordedBinding.recover();
112+
}
113+
}
114+
}
115+
return null;
116+
};
117+
118+
/**
119+
* Recover a consumer.
120+
*/
85121
public static final DefaultRetryHandler.RetryOperation<String> RECOVER_CONSUMER = context -> context.consumer().recover();
122+
123+
/**
124+
* Pre-configured {@link DefaultRetryHandler} that retries recovery of bindings and consumers
125+
* when their respective queue is not found.
126+
* This retry handler can be useful for long recovery processes, whereby auto-delete queues
127+
* can be deleted between queue recovery and binding/consumer recovery.
128+
*/
129+
public static final RetryHandler RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
130+
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
131+
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
132+
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING))
133+
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)
134+
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)))
135+
.build();
86136
}

src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,7 @@
2323

2424
import java.util.HashMap;
2525

26-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder;
27-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.CHANNEL_CLOSED_NOT_FOUND;
28-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_BINDING;
29-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_BINDING_QUEUE;
30-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CHANNEL;
31-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CONSUMER;
32-
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RECOVER_CONSUMER_QUEUE;
26+
import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryLogic.RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER;
3327
import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery;
3428
import static org.junit.Assert.assertTrue;
3529

@@ -57,13 +51,7 @@ public void topologyRecoveryRetry() throws Exception {
5751
@Override
5852
protected ConnectionFactory newConnectionFactory() {
5953
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
60-
connectionFactory.setTopologyRecoveryRetryHandler(
61-
builder().bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
62-
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
63-
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING))
64-
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)))
65-
.build()
66-
);
54+
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER);
6755
connectionFactory.setNetworkRecoveryInterval(1000);
6856
return connectionFactory;
6957
}

0 commit comments

Comments
 (0)