From aee3e10a399c99315a8c1b7ae478de1414cb485f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 21 Oct 2016 10:47:48 +0200 Subject: [PATCH] Clean AutorecoveringConnection consumers correctly Fixes #208 --- .../impl/recovery/AutorecoveringChannel.java | 13 ++++-- .../test/functional/ConnectionRecovery.java | 43 +++++++++++++++---- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java index 9de2663ae3..0a95c168c1 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java @@ -18,8 +18,7 @@ import com.rabbitmq.client.*; import java.io.IOException; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeoutException; @@ -37,6 +36,7 @@ public class AutorecoveringChannel implements Channel, Recoverable { private final List returnListeners = new CopyOnWriteArrayList(); private final List confirmListeners = new CopyOnWriteArrayList(); private final List flowListeners = new CopyOnWriteArrayList(); + private final Set consumerTags = Collections.synchronizedSet(new HashSet()); private int prefetchCountConsumer; private int prefetchCountGlobal; private boolean usesPublisherConfirms; @@ -64,9 +64,12 @@ public Channel getDelegate() { @Override public void close() throws IOException, TimeoutException { try { - delegate.close(); + delegate.close(); } finally { - this.connection.unregisterChannel(this); + for (String consumerTag : consumerTags) { + this.connection.deleteRecordedConsumer(consumerTag); + } + this.connection.unregisterChannel(this); } } @@ -701,10 +704,12 @@ private void recordConsumer(String result, exclusive(exclusive). arguments(arguments). consumer(callback); + this.consumerTags.add(result); this.connection.recordConsumer(result, consumer); } private RecordedConsumer deleteRecordedConsumer(String consumerTag) { + this.consumerTags.remove(consumerTag); return this.connection.deleteRecordedConsumer(consumerTag); } diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index be11fba332..9a46d2a5dc 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -16,19 +16,14 @@ package com.rabbitmq.client.test.functional; import com.rabbitmq.client.*; -import com.rabbitmq.client.impl.recovery.AutorecoveringChannel; -import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; -import com.rabbitmq.client.impl.recovery.ConsumerRecoveryListener; -import com.rabbitmq.client.impl.recovery.QueueRecoveryListener; +import com.rabbitmq.client.impl.recovery.*; import com.rabbitmq.client.test.BrokerTestCase; import com.rabbitmq.tools.Host; import org.junit.Test; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; +import java.lang.reflect.Field; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -637,6 +632,38 @@ public void handleDelivery(String consumerTag, publishingConnection.abort(); } + @Test public void consumersAreRemovedFromConnectionWhenChannelIsClosed() throws Exception { + AutorecoveringConnection connection = newRecoveringConnection(true); + try { + Field consumersField = AutorecoveringConnection.class.getDeclaredField("consumers"); + consumersField.setAccessible(true); + Map connectionConsumers = (Map) consumersField.get(connection); + + Channel channel1 = connection.createChannel(); + Channel channel2 = connection.createChannel(); + + assertEquals(0, connectionConsumers.size()); + + String queue = channel1.queueDeclare().getQueue(); + + channel1.basicConsume(queue, true, new HashMap(), new DefaultConsumer(channel1)); + assertEquals(1, connectionConsumers.size()); + channel1.basicConsume(queue, true, new HashMap(), new DefaultConsumer(channel1)); + assertEquals(2, connectionConsumers.size()); + + channel2.basicConsume(queue, true, new HashMap(), new DefaultConsumer(channel2)); + assertEquals(3, connectionConsumers.size()); + + channel1.close(); + assertEquals(3 - 2, connectionConsumers.size()); + + channel2.close(); + assertEquals(0, connectionConsumers.size()); + } finally { + connection.abort(); + } + } + private void assertConsumerCount(int exp, String q) throws IOException { assertEquals(exp, channel.queueDeclarePassive(q).getConsumerCount()); }