Skip to content

Commit 0cfbbca

Browse files
committed
Update consumer tags on connection recovery
References #208
1 parent 41f3a86 commit 0cfbbca

File tree

2 files changed

+14
-3
lines changed

2 files changed

+14
-3
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -720,4 +720,11 @@ private void maybeDeleteRecordedAutoDeleteQueue(String queue) {
720720
private void maybeDeleteRecordedAutoDeleteExchange(String exchange) {
721721
this.connection.maybeDeleteRecordedAutoDeleteExchange(exchange);
722722
}
723+
724+
public void updateConsumerTag(String tag, String newTag) {
725+
synchronized (this.consumerTags) {
726+
consumerTags.remove(tag);
727+
consumerTags.add(newTag);
728+
}
729+
}
723730
}

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -645,10 +645,14 @@ private void recoverConsumers() {
645645
try {
646646
String newTag = consumer.recover();
647647
// make sure server-generated tags are re-added. MK.
648-
synchronized (this.consumers) {
649-
this.consumers.remove(tag);
650-
this.consumers.put(newTag, consumer);
648+
if(tag != null && !tag.equals(newTag)) {
649+
synchronized (this.consumers) {
650+
this.consumers.remove(tag);
651+
this.consumers.put(newTag, consumer);
652+
}
653+
consumer.getChannel().updateConsumerTag(tag, newTag);
651654
}
655+
652656
for(ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
653657
crl.consumerRecovered(tag, newTag);
654658
}

0 commit comments

Comments
 (0)