Skip to content

Commit 41f3a86

Browse files
Merge pull request #209 from rabbitmq/rabbitmq-java-client-208
Clean AutorecoveringConnection consumers correctly
2 parents 28a7490 + aee3e10 commit 41f3a86

File tree

2 files changed

+44
-12
lines changed

2 files changed

+44
-12
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
import com.rabbitmq.client.*;
1919

2020
import java.io.IOException;
21-
import java.util.List;
22-
import java.util.Map;
21+
import java.util.*;
2322
import java.util.concurrent.CopyOnWriteArrayList;
2423
import java.util.concurrent.TimeoutException;
2524

@@ -37,6 +36,7 @@ public class AutorecoveringChannel implements Channel, Recoverable {
3736
private final List<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();
3837
private final List<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
3938
private final List<FlowListener> flowListeners = new CopyOnWriteArrayList<FlowListener>();
39+
private final Set<String> consumerTags = Collections.synchronizedSet(new HashSet<String>());
4040
private int prefetchCountConsumer;
4141
private int prefetchCountGlobal;
4242
private boolean usesPublisherConfirms;
@@ -64,9 +64,12 @@ public Channel getDelegate() {
6464
@Override
6565
public void close() throws IOException, TimeoutException {
6666
try {
67-
delegate.close();
67+
delegate.close();
6868
} finally {
69-
this.connection.unregisterChannel(this);
69+
for (String consumerTag : consumerTags) {
70+
this.connection.deleteRecordedConsumer(consumerTag);
71+
}
72+
this.connection.unregisterChannel(this);
7073
}
7174
}
7275

@@ -701,10 +704,12 @@ private void recordConsumer(String result,
701704
exclusive(exclusive).
702705
arguments(arguments).
703706
consumer(callback);
707+
this.consumerTags.add(result);
704708
this.connection.recordConsumer(result, consumer);
705709
}
706710

707711
private RecordedConsumer deleteRecordedConsumer(String consumerTag) {
712+
this.consumerTags.remove(consumerTag);
708713
return this.connection.deleteRecordedConsumer(consumerTag);
709714
}
710715

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,14 @@
1616
package com.rabbitmq.client.test.functional;
1717

1818
import com.rabbitmq.client.*;
19-
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
20-
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
21-
import com.rabbitmq.client.impl.recovery.ConsumerRecoveryListener;
22-
import com.rabbitmq.client.impl.recovery.QueueRecoveryListener;
19+
import com.rabbitmq.client.impl.recovery.*;
2320
import com.rabbitmq.client.test.BrokerTestCase;
2421
import com.rabbitmq.tools.Host;
2522
import org.junit.Test;
2623

2724
import java.io.IOException;
28-
import java.util.ArrayList;
29-
import java.util.Arrays;
30-
import java.util.List;
31-
import java.util.UUID;
25+
import java.lang.reflect.Field;
26+
import java.util.*;
3227
import java.util.concurrent.CountDownLatch;
3328
import java.util.concurrent.TimeUnit;
3429
import java.util.concurrent.TimeoutException;
@@ -637,6 +632,38 @@ public void handleDelivery(String consumerTag,
637632
publishingConnection.abort();
638633
}
639634

635+
@Test public void consumersAreRemovedFromConnectionWhenChannelIsClosed() throws Exception {
636+
AutorecoveringConnection connection = newRecoveringConnection(true);
637+
try {
638+
Field consumersField = AutorecoveringConnection.class.getDeclaredField("consumers");
639+
consumersField.setAccessible(true);
640+
Map<?, ?> connectionConsumers = (Map<?, ?>) consumersField.get(connection);
641+
642+
Channel channel1 = connection.createChannel();
643+
Channel channel2 = connection.createChannel();
644+
645+
assertEquals(0, connectionConsumers.size());
646+
647+
String queue = channel1.queueDeclare().getQueue();
648+
649+
channel1.basicConsume(queue, true, new HashMap<String, Object>(), new DefaultConsumer(channel1));
650+
assertEquals(1, connectionConsumers.size());
651+
channel1.basicConsume(queue, true, new HashMap<String, Object>(), new DefaultConsumer(channel1));
652+
assertEquals(2, connectionConsumers.size());
653+
654+
channel2.basicConsume(queue, true, new HashMap<String, Object>(), new DefaultConsumer(channel2));
655+
assertEquals(3, connectionConsumers.size());
656+
657+
channel1.close();
658+
assertEquals(3 - 2, connectionConsumers.size());
659+
660+
channel2.close();
661+
assertEquals(0, connectionConsumers.size());
662+
} finally {
663+
connection.abort();
664+
}
665+
}
666+
640667
private void assertConsumerCount(int exp, String q) throws IOException {
641668
assertEquals(exp, channel.queueDeclarePassive(q).getConsumerCount());
642669
}

0 commit comments

Comments
 (0)