Skip to content

Commit f3af73e

Browse files
vikinghawkmichaelklishin
authored andcommitted
Allow changing queue name during recovery
Author: Michael Dent <michael.dent@cerner.com>
1 parent 318b784 commit f3af73e

File tree

6 files changed

+68
-16
lines changed

6 files changed

+68
-16
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.rabbitmq.client.impl.nio.NioParams;
2020
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
2121
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
22+
import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier;
2223
import com.rabbitmq.client.impl.recovery.RetryHandler;
2324
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2425
import org.slf4j.Logger;
@@ -190,6 +191,8 @@ public class ConnectionFactory implements Cloneable {
190191
* @since 5.4.0
191192
*/
192193
private RetryHandler topologyRecoveryRetryHandler;
194+
195+
private RecoveredQueueNameSupplier recoveredQueueNameSupplier;
193196

194197
/**
195198
* Traffic listener notified of inbound and outbound {@link Command}s.
@@ -1267,6 +1270,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
12671270
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
12681271
result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition);
12691272
result.setTopologyRecoveryRetryHandler(topologyRecoveryRetryHandler);
1273+
result.setRecoveredQueueNameSupplier(recoveredQueueNameSupplier);
12701274
result.setTrafficListener(trafficListener);
12711275
result.setCredentialsRefreshService(credentialsRefreshService);
12721276
return result;
@@ -1648,6 +1652,15 @@ public void setConnectionRecoveryTriggeringCondition(Predicate<ShutdownSignalExc
16481652
public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHandler) {
16491653
this.topologyRecoveryRetryHandler = topologyRecoveryRetryHandler;
16501654
}
1655+
1656+
/**
1657+
* Set the recovered queue name supplier. Default is use the same queue name when recovering queues.
1658+
*
1659+
* @param recoveredQueueNameSupplier queue name supplier
1660+
*/
1661+
public void setRecoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
1662+
this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
1663+
}
16511664

16521665
/**
16531666
* Traffic listener notified of inbound and outbound {@link Command}s.

src/main/java/com/rabbitmq/client/impl/ConnectionParams.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.rabbitmq.client.SaslConfig;
2222
import com.rabbitmq.client.ShutdownSignalException;
2323
import com.rabbitmq.client.TrafficListener;
24+
import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier;
2425
import com.rabbitmq.client.impl.recovery.RetryHandler;
2526
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;
2627

@@ -54,7 +55,8 @@ public class ConnectionParams {
5455
private TopologyRecoveryFilter topologyRecoveryFilter;
5556
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
5657
private RetryHandler topologyRecoveryRetryHandler;
57-
58+
private RecoveredQueueNameSupplier recoveredQueueNameSupplier;
59+
5860
private ExceptionHandler exceptionHandler;
5961
private ThreadFactory threadFactory;
6062

@@ -271,6 +273,14 @@ public void setTopologyRecoveryRetryHandler(RetryHandler topologyRecoveryRetryHa
271273
public RetryHandler getTopologyRecoveryRetryHandler() {
272274
return topologyRecoveryRetryHandler;
273275
}
276+
277+
public void setRecoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
278+
this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
279+
}
280+
281+
public RecoveredQueueNameSupplier getRecoveredQueueNameSupplier() {
282+
return recoveredQueueNameSupplier;
283+
}
274284

275285
public void setTrafficListener(TrafficListener trafficListener) {
276286
this.trafficListener = trafficListener;

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,8 @@ public void queueDeclareNoWait(String queue,
355355
durable(durable).
356356
exclusive(exclusive).
357357
autoDelete(autoDelete).
358-
arguments(arguments);
358+
arguments(arguments).
359+
recoveredQueueNameSupplier(connection.getRecoveredQueueNameSupplier());
359360
delegate.queueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments);
360361
recordQueue(queue, meta);
361362

@@ -848,7 +849,8 @@ private void recordQueue(AMQP.Queue.DeclareOk ok, String queue, boolean durable,
848849
durable(durable).
849850
exclusive(exclusive).
850851
autoDelete(autoDelete).
851-
arguments(arguments);
852+
arguments(arguments).
853+
recoveredQueueNameSupplier(connection.getRecoveredQueueNameSupplier());
852854
if (queue.equals(RecordedQueue.EMPTY_STRING)) {
853855
q.serverNamed(true);
854856
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
9696
private final Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;
9797

9898
private final RetryHandler retryHandler;
99+
100+
private final RecoveredQueueNameSupplier recoveredQueueNameSupplier;
99101

100102
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List<Address> addrs) {
101103
this(params, f, new ListAddressResolver(addrs));
@@ -119,6 +121,8 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
119121
letAllPassFilter() : params.getTopologyRecoveryFilter();
120122

121123
this.retryHandler = params.getTopologyRecoveryRetryHandler();
124+
this.recoveredQueueNameSupplier = params.getRecoveredQueueNameSupplier() == null ?
125+
RecordedQueue.DEFAULT_QUEUE_NAME_SUPPLIER : params.getRecoveredQueueNameSupplier();
122126
}
123127

124128
private void setupErrorOnWriteListenerForPotentialRecovery() {
@@ -564,6 +568,10 @@ public void addConsumerRecoveryListener(ConsumerRecoveryListener listener) {
564568
public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
565569
this.consumerRecoveryListeners.remove(listener);
566570
}
571+
572+
RecoveredQueueNameSupplier getRecoveredQueueNameSupplier() {
573+
return this.recoveredQueueNameSupplier;
574+
}
567575

568576
private synchronized void beginAutomaticRecovery() throws InterruptedException {
569577
final long delay = this.params.getRecoveryDelayHandler().getDelay(0);
@@ -800,18 +808,14 @@ private void internalRecoverQueue(final String oldName, RecordedQueue q, boolean
800808
}
801809
String newName = q.getName();
802810
if (!oldName.equals(newName)) {
803-
// make sure server-named queues are re-added with
804-
// their new names. MK.
811+
// make sure queues are re-added with
812+
// their new names, if applicable. MK.
805813
synchronized (this.recordedQueues) {
806814
this.propagateQueueNameChangeToBindings(oldName, newName);
807815
this.propagateQueueNameChangeToConsumers(oldName, newName);
808816
// bug26552:
809817
// remove old name after we've updated the bindings and consumers,
810-
// plus only for server-named queues, both to make sure we don't lose
811-
// anything to recover. MK.
812-
if(q.isServerNamed()) {
813-
deleteRecordedQueue(oldName);
814-
}
818+
deleteRecordedQueue(oldName);
815819
this.recordedQueues.put(newName, q);
816820
}
817821
}

src/main/java/com/rabbitmq/client/impl/recovery/RecordedQueue.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323
*/
2424
public class RecordedQueue extends RecordedNamedEntity {
2525
public static final String EMPTY_STRING = "";
26+
27+
static final RecoveredQueueNameSupplier DEFAULT_QUEUE_NAME_SUPPLIER = q -> q.isServerNamed() ? EMPTY_STRING : q.name;
28+
29+
private RecoveredQueueNameSupplier recoveredQueueNameSupplier = DEFAULT_QUEUE_NAME_SUPPLIER;
2630
private boolean durable;
2731
private boolean autoDelete;
2832
private Map<String, Object> arguments;
@@ -60,11 +64,7 @@ public void recover() throws IOException {
6064
}
6165

6266
public String getNameToUseForRecovery() {
63-
if(isServerNamed()) {
64-
return EMPTY_STRING;
65-
} else {
66-
return this.name;
67-
}
67+
return recoveredQueueNameSupplier.getNameToUseForRecovery(this);
6868
}
6969

7070
public RecordedQueue durable(boolean value) {
@@ -89,10 +89,15 @@ public RecordedQueue arguments(Map<String, Object> value) {
8989
this.arguments = value;
9090
return this;
9191
}
92-
92+
9393
public Map<String, Object> getArguments() {
9494
return arguments;
9595
}
96+
97+
public RecordedQueue recoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier) {
98+
this.recoveredQueueNameSupplier = recoveredQueueNameSupplier;
99+
return this;
100+
}
96101

97102
@Override
98103
public String toString() {
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.rabbitmq.client.impl.recovery;
2+
3+
/**
4+
* Functional callback interface that can be used to rename a queue during topology recovery.
5+
* Can use along with {@link QueueRecoveryListener} to know when such a queue has been recovered successfully.
6+
*
7+
* @see QueueRecoveryListener
8+
*/
9+
@FunctionalInterface
10+
public interface RecoveredQueueNameSupplier {
11+
12+
/**
13+
* Get the queue name to use when recovering this RecordedQueue entity
14+
* @param recordedQueue the queue to be recovered
15+
* @return new queue name
16+
*/
17+
String getNameToUseForRecovery(final RecordedQueue recordedQueue);
18+
}

0 commit comments

Comments
 (0)