Skip to content

Commit 9711406

Browse files
committed
Make retry condition more tolerant with wildcards
References #387
1 parent 9176062 commit 9711406

File tree

4 files changed

+31
-27
lines changed

4 files changed

+31
-27
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/DefaultRetryHandler.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@
3535
*/
3636
public class DefaultRetryHandler implements RetryHandler {
3737

38-
private final BiPredicate<RecordedQueue, Exception> queueRecoveryRetryCondition;
39-
private final BiPredicate<RecordedExchange, Exception> exchangeRecoveryRetryCondition;
40-
private final BiPredicate<RecordedBinding, Exception> bindingRecoveryRetryCondition;
41-
private final BiPredicate<RecordedConsumer, Exception> consumerRecoveryRetryCondition;
38+
private final BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition;
39+
private final BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition;
40+
private final BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition;
41+
private final BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition;
4242

4343
private final RetryOperation<?> queueRecoveryRetryOperation;
4444
private final RetryOperation<?> exchangeRecoveryRetryOperation;
@@ -49,10 +49,10 @@ public class DefaultRetryHandler implements RetryHandler {
4949

5050
private final BackoffPolicy backoffPolicy;
5151

52-
public DefaultRetryHandler(BiPredicate<RecordedQueue, Exception> queueRecoveryRetryCondition,
53-
BiPredicate<RecordedExchange, Exception> exchangeRecoveryRetryCondition,
54-
BiPredicate<RecordedBinding, Exception> bindingRecoveryRetryCondition,
55-
BiPredicate<RecordedConsumer, Exception> consumerRecoveryRetryCondition,
52+
public DefaultRetryHandler(BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition,
53+
BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition,
54+
BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition,
55+
BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition,
5656
RetryOperation<?> queueRecoveryRetryOperation,
5757
RetryOperation<?> exchangeRecoveryRetryOperation,
5858
RetryOperation<?> bindingRecoveryRetryOperation,
@@ -72,27 +72,31 @@ public DefaultRetryHandler(BiPredicate<RecordedQueue, Exception> queueRecoveryRe
7272
this.retryAttempts = retryAttempts;
7373
}
7474

75+
@SuppressWarnings("unchecked")
7576
@Override
7677
public RetryResult retryQueueRecovery(RetryContext context) throws Exception {
77-
return doRetry(queueRecoveryRetryCondition, queueRecoveryRetryOperation, context.queue(), context);
78+
return doRetry((BiPredicate<RecordedEntity, Exception>) queueRecoveryRetryCondition, queueRecoveryRetryOperation, context.queue(), context);
7879
}
7980

81+
@SuppressWarnings("unchecked")
8082
@Override
8183
public RetryResult retryExchangeRecovery(RetryContext context) throws Exception {
82-
return doRetry(exchangeRecoveryRetryCondition, exchangeRecoveryRetryOperation, context.exchange(), context);
84+
return doRetry((BiPredicate<RecordedEntity, Exception>) exchangeRecoveryRetryCondition, exchangeRecoveryRetryOperation, context.exchange(), context);
8385
}
8486

87+
@SuppressWarnings("unchecked")
8588
@Override
8689
public RetryResult retryBindingRecovery(RetryContext context) throws Exception {
87-
return doRetry(bindingRecoveryRetryCondition, bindingRecoveryRetryOperation, context.binding(), context);
90+
return doRetry((BiPredicate<RecordedEntity, Exception>) bindingRecoveryRetryCondition, bindingRecoveryRetryOperation, context.binding(), context);
8891
}
8992

93+
@SuppressWarnings("unchecked")
9094
@Override
9195
public RetryResult retryConsumerRecovery(RetryContext context) throws Exception {
92-
return doRetry(consumerRecoveryRetryCondition, consumerRecoveryRetryOperation, context.consumer(), context);
96+
return doRetry((BiPredicate<RecordedEntity, Exception>) consumerRecoveryRetryCondition, consumerRecoveryRetryOperation, context.consumer(), context);
9397
}
9498

95-
protected <T extends RecordedEntity> RetryResult doRetry(BiPredicate<T, Exception> condition, RetryOperation<?> operation, T entity, RetryContext context)
99+
protected RetryResult doRetry(BiPredicate<RecordedEntity, Exception> condition, RetryOperation<?> operation, RecordedEntity entity, RetryContext context)
96100
throws Exception {
97101
int attempts = 0;
98102
Exception exception = context.exception();
@@ -107,7 +111,6 @@ protected <T extends RecordedEntity> RetryResult doRetry(BiPredicate<T, Exceptio
107111
} catch (Exception e) {
108112
exception = e;
109113
attempts++;
110-
continue;
111114
}
112115
} else {
113116
throw exception;

src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryHandlerBuilder.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@
3030
*/
3131
public class TopologyRecoveryRetryHandlerBuilder {
3232

33-
private BiPredicate<RecordedQueue, Exception> queueRecoveryRetryCondition = (q, e) -> false;
34-
private BiPredicate<RecordedExchange, Exception> exchangeRecoveryRetryCondition = (ex, e) -> false;
35-
private BiPredicate<RecordedBinding, Exception> bindingRecoveryRetryCondition = (b, e) -> false;
36-
private BiPredicate<RecordedConsumer, Exception> consumerRecoveryRetryCondition = (c, e) -> false;
33+
private BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition = (q, e) -> false;
34+
private BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition = (ex, e) -> false;
35+
private BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition = (b, e) -> false;
36+
private BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition = (c, e) -> false;
3737

3838
private DefaultRetryHandler.RetryOperation<?> queueRecoveryRetryOperation = context -> null;
3939
private DefaultRetryHandler.RetryOperation<?> exchangeRecoveryRetryOperation = context -> null;
@@ -50,25 +50,25 @@ public static TopologyRecoveryRetryHandlerBuilder builder() {
5050
}
5151

5252
public TopologyRecoveryRetryHandlerBuilder queueRecoveryRetryCondition(
53-
BiPredicate<RecordedQueue, Exception> queueRecoveryRetryCondition) {
53+
BiPredicate<? super RecordedQueue, Exception> queueRecoveryRetryCondition) {
5454
this.queueRecoveryRetryCondition = queueRecoveryRetryCondition;
5555
return this;
5656
}
5757

5858
public TopologyRecoveryRetryHandlerBuilder exchangeRecoveryRetryCondition(
59-
BiPredicate<RecordedExchange, Exception> exchangeRecoveryRetryCondition) {
59+
BiPredicate<? super RecordedExchange, Exception> exchangeRecoveryRetryCondition) {
6060
this.exchangeRecoveryRetryCondition = exchangeRecoveryRetryCondition;
6161
return this;
6262
}
6363

6464
public TopologyRecoveryRetryHandlerBuilder bindingRecoveryRetryCondition(
65-
BiPredicate<RecordedBinding, Exception> bindingRecoveryRetryCondition) {
65+
BiPredicate<? super RecordedBinding, Exception> bindingRecoveryRetryCondition) {
6666
this.bindingRecoveryRetryCondition = bindingRecoveryRetryCondition;
6767
return this;
6868
}
6969

7070
public TopologyRecoveryRetryHandlerBuilder consumerRecoveryRetryCondition(
71-
BiPredicate<RecordedConsumer, Exception> consumerRecoveryRetryCondition) {
71+
BiPredicate<? super RecordedConsumer, Exception> consumerRecoveryRetryCondition) {
7272
this.consumerRecoveryRetryCondition = consumerRecoveryRetryCondition;
7373
return this;
7474
}

src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryRetryLogic.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.rabbitmq.client.AMQP;
1919
import com.rabbitmq.client.ShutdownSignalException;
2020

21+
import java.util.function.BiPredicate;
2122
import java.util.function.Predicate;
2223

2324
/**
@@ -31,9 +32,9 @@
3132
*/
3233
public abstract class TopologyRecoveryRetryLogic {
3334

34-
public static final Predicate<Exception> CHANNEL_CLOSED_NOT_FOUND = e -> {
35-
if (e.getCause() instanceof ShutdownSignalException) {
36-
ShutdownSignalException cause = (ShutdownSignalException) e.getCause();
35+
public static final BiPredicate<RecordedEntity, Exception> CHANNEL_CLOSED_NOT_FOUND = (entity, ex) -> {
36+
if (ex.getCause() instanceof ShutdownSignalException) {
37+
ShutdownSignalException cause = (ShutdownSignalException) ex.getCause();
3738
if (cause.getReason() instanceof AMQP.Channel.Close) {
3839
return ((AMQP.Channel.Close) cause.getReason()).getReplyCode() == 404;
3940
}

src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryRetry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ public void topologyRecoveryRetry() throws Exception {
5858
protected ConnectionFactory newConnectionFactory() {
5959
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
6060
connectionFactory.setTopologyRecoveryRetryHandler(
61-
builder().bindingRecoveryRetryCondition((b, e) -> CHANNEL_CLOSED_NOT_FOUND.test(e))
62-
.consumerRecoveryRetryCondition((b, e) -> CHANNEL_CLOSED_NOT_FOUND.test(e))
61+
builder().bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
62+
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
6363
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING))
6464
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)))
6565
.build()

0 commit comments

Comments
 (0)