From 593a6340f1045d395656ce27aec10fa863b821be Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 13 Apr 2016 17:16:35 +0300 Subject: [PATCH 1/4] Use a more reasonable timeout --- .../rabbitmq/client/test/functional/ConnectionRecovery.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index efb944a347..84b1f521b0 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -750,9 +750,9 @@ private ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disa } private static void wait(CountDownLatch latch) throws InterruptedException { - // Very very generous amount of time to wait, just make sure we never - // hang forever - assertTrue(latch.await(1800, TimeUnit.SECONDS)); + // we want to wait for recovery to complete for a reasonable amount of time + // but still make recovery failures easy to notice in development environments + assertTrue(latch.await(90, TimeUnit.SECONDS)); } private void waitForConfirms(Channel ch) throws InterruptedException, TimeoutException { From 066ce86e747b2ea6781bfc93515a41a4a8e6e076 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 13 Apr 2016 17:16:50 +0300 Subject: [PATCH 2/4] Introduce a way for AMQConnection to notify its recovery aware wrapper Fixes #135. --- .../rabbitmq/client/impl/AMQConnection.java | 30 +++++++++++++--- .../recovery/AutorecoveringConnection.java | 35 ++++++++++--------- .../recovery/RecoveryCanBeginListener.java | 12 +++++++ 3 files changed, 56 insertions(+), 21 deletions(-) create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/RecoveryCanBeginListener.java diff --git a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java index 54e17c0918..d0f598e236 100644 --- a/src/main/java/com/rabbitmq/client/impl/AMQConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/AMQConnection.java @@ -21,10 +21,7 @@ import java.net.InetAddress; import java.net.SocketException; import java.net.SocketTimeoutException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import java.util.concurrent.*; import com.rabbitmq.client.AMQP; @@ -45,6 +42,7 @@ import com.rabbitmq.client.SaslMechanism; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation; +import com.rabbitmq.client.impl.recovery.RecoveryCanBeginListener; import com.rabbitmq.utility.BlockingCell; final class Copyright { @@ -65,6 +63,9 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti private Thread mainLoopThread; private ThreadFactory threadFactory = Executors.defaultThreadFactory(); + private final List recoveryCanBeginListeners = + new ArrayList(); + /** * Retrieve a copy of the default table of client properties that * will be sent to the server during connection startup. This @@ -576,10 +577,31 @@ public void run() { _frameHandler.close(); _appContinuation.set(null); notifyListeners(); + // assuming that shutdown listeners do not do anything + // asynchronously, e.g. start new threads, this effectively + // guarantees that we only begin recovery when all shutdown + // listeners have executed + notifyRecoveryCanBeginListeners(); } } } + private void notifyRecoveryCanBeginListeners() { + ShutdownSignalException sse = this.getCloseReason(); + for(RecoveryCanBeginListener fn : this.recoveryCanBeginListeners) { + fn.recoveryCanBegin(sse); + } + } + + public void addRecoveryCanBeginListener(RecoveryCanBeginListener fn) { + this.recoveryCanBeginListeners.add(fn); + } + + @SuppressWarnings(value = "unused") + public void removeRecoveryCanBeginListener(RecoveryCanBeginListener fn) { + this.recoveryCanBeginListeners.remove(fn); + } + /** * Called when a frame-read operation times out * @throws MissedHeartbeatException if heart-beats have been missed diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index b559625fb5..24ecf9d522 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -376,8 +376,10 @@ public int getLocalPort() { private void addAutomaticRecoveryListener() { final AutorecoveringConnection c = this; - ShutdownListener automaticRecoveryListener = new ShutdownListener() { - public void shutdownCompleted(ShutdownSignalException cause) { + // this listener will run after shutdown listeners, + // see https://github.com/rabbitmq/rabbitmq-java-client/issues/135 + RecoveryCanBeginListener starter = new RecoveryCanBeginListener() { + public void recoveryCanBegin(ShutdownSignalException cause) { try { if (shouldTriggerConnectionRecovery(cause)) { c.beginAutomaticRecovery(); @@ -388,10 +390,7 @@ public void shutdownCompleted(ShutdownSignalException cause) { } }; synchronized (this) { - if(!this.shutdownHooks.contains(automaticRecoveryListener)) { - this.shutdownHooks.add(automaticRecoveryListener); - } - this.delegate.addShutdownListener(automaticRecoveryListener); + this.delegate.addRecoveryCanBeginListener(starter); } } @@ -441,18 +440,20 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) { synchronized private void beginAutomaticRecovery() throws InterruptedException, IOException, TopologyRecoveryException { Thread.sleep(this.params.getNetworkRecoveryInterval()); - if (!this.recoverConnection()) - return; - - this.recoverShutdownListeners(); - this.recoverBlockedListeners(); - this.recoverChannels(); - if(this.params.isTopologyRecoveryEnabled()) { - this.recoverEntities(); - this.recoverConsumers(); - } + if (!this.recoverConnection()) { + return; + } - this.notifyRecoveryListeners(); + this.addAutomaticRecoveryListener(); + this.recoverShutdownListeners(); + this.recoverBlockedListeners(); + this.recoverChannels(); + if(this.params.isTopologyRecoveryEnabled()) { + this.recoverEntities(); + this.recoverConsumers(); + } + + this.notifyRecoveryListeners(); } private void recoverShutdownListeners() { diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryCanBeginListener.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryCanBeginListener.java new file mode 100644 index 0000000000..6326f3d288 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryCanBeginListener.java @@ -0,0 +1,12 @@ +package com.rabbitmq.client.impl.recovery; + +import com.rabbitmq.client.ShutdownSignalException; + +/** + * Used internally to indicate when connection recovery can + * begin. See {@link https://github.com/rabbitmq/rabbitmq-java-client/issues/135}. + * This is package-local by design. + */ +public interface RecoveryCanBeginListener { + void recoveryCanBegin(ShutdownSignalException cause); +} From 62898ad354c9a250394c8668b7d46331a4fae332 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 13 Apr 2016 17:22:03 +0300 Subject: [PATCH 3/4] Squash a few warnings from IDEA --- .../client/test/functional/ConnectionRecovery.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index 84b1f521b0..4354d63c82 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -21,8 +21,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +@SuppressWarnings("ThrowFromFinallyBlock") public class ConnectionRecovery extends BrokerTestCase { - public static final long RECOVERY_INTERVAL = 2000; + private static final long RECOVERY_INTERVAL = 2000; public void testConnectionRecovery() throws IOException, InterruptedException { assertTrue(connection.isOpen()); @@ -211,7 +212,7 @@ public void testClientNamedQueueRecoveryWithNoWait() throws IOException, Interru testClientNamedQueueRecoveryWith("java-client.test.recovery.q1-nowait", true); } - protected void testClientNamedQueueRecoveryWith(String q, boolean noWait) throws IOException, InterruptedException, TimeoutException { + private void testClientNamedQueueRecoveryWith(String q, boolean noWait) throws IOException, InterruptedException, TimeoutException { Channel ch = connection.createChannel(); if(noWait) { declareClientNamedQueueNoWait(ch, q); @@ -759,11 +760,11 @@ private void waitForConfirms(Channel ch) throws InterruptedException, TimeoutExc ch.waitForConfirms(30 * 60 * 1000); } - protected void assertRecordedQueues(Connection conn, int size) { + private void assertRecordedQueues(Connection conn, int size) { assertEquals(size, ((AutorecoveringConnection)conn).getRecordedQueues().size()); } - protected void assertRecordedExchanges(Connection conn, int size) { + private void assertRecordedExchanges(Connection conn, int size) { assertEquals(size, ((AutorecoveringConnection)conn).getRecordedExchanges().size()); } } From bb252ff2a5ef66464c6b04e18cacf423f5ffb5cb Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 13 Apr 2016 18:03:28 +0300 Subject: [PATCH 4/4] Test that recovery starts after all connection shutdown hooks have executed Note: we cannot guarantee that they'd have *finished* by the time recovery starts. Provided that ShutdownListeners are typically basic sequential functions, that'll be the case most of the time. Fixes #135 --- .../recovery/AutorecoveringConnection.java | 8 ++++ .../test/functional/ConnectionRecovery.java | 44 +++++++++++++++++-- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 24ecf9d522..49b65cc28a 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -11,6 +11,7 @@ import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.TopologyRecoveryException; +import com.rabbitmq.client.impl.AMQConnection; import com.rabbitmq.client.impl.ConnectionParams; import com.rabbitmq.client.ExceptionHandler; import com.rabbitmq.client.impl.FrameHandlerFactory; @@ -254,6 +255,13 @@ public void abort(int timeout) { delegate.abort(timeout); } + /** + * Not supposed to be used outside of automated tests. + */ + public AMQConnection getDelegate() { + return delegate; + } + /** * @see com.rabbitmq.client.Connection#getCloseReason() */ diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index 4354d63c82..bcd3df2b8d 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -1,12 +1,10 @@ package com.rabbitmq.client.test.functional; import com.rabbitmq.client.*; -import com.rabbitmq.client.impl.recovery.AutorecoveringChannel; -import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import com.rabbitmq.client.impl.AMQConnection; +import com.rabbitmq.client.impl.recovery.*; import com.rabbitmq.client.Recoverable; import com.rabbitmq.client.RecoveryListener; -import com.rabbitmq.client.impl.recovery.ConsumerRecoveryListener; -import com.rabbitmq.client.impl.recovery.QueueRecoveryListener; import com.rabbitmq.client.test.BrokerTestCase; import com.rabbitmq.tools.Host; @@ -90,6 +88,44 @@ public void testConnectionRecoveryWithDisabledTopologyRecovery() } } + // see https://github.com/rabbitmq/rabbitmq-java-client/issues/135 + public void testThatShutdownHooksOnConnectionFireBeforeRecoveryStarts() throws IOException, InterruptedException { + final List events = new ArrayList(); + final CountDownLatch latch = new CountDownLatch(1); + connection.addShutdownListener(new ShutdownListener() { + public void shutdownCompleted(ShutdownSignalException cause) { + events.add("shutdown hook 1"); + } + }); + connection.addShutdownListener(new ShutdownListener() { + public void shutdownCompleted(ShutdownSignalException cause) { + events.add("shutdown hook 2"); + } + }); + // note: we do not want to expose RecoveryCanBeginListener so this + // test does not use it + ((AutorecoveringConnection)connection).getDelegate().addRecoveryCanBeginListener(new RecoveryCanBeginListener() { + @Override + public void recoveryCanBegin(ShutdownSignalException cause) { + events.add("recovery start hook 1"); + } + }); + ((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener() { + @Override + public void handleRecovery(Recoverable recoverable) { + latch.countDown(); + } + }); + assertTrue(connection.isOpen()); + closeAndWaitForRecovery(); + assertTrue(connection.isOpen()); + assertEquals("shutdown hook 1", events.get(0)); + assertEquals("shutdown hook 2", events.get(1)); + assertEquals("recovery start hook 1", events.get(2)); + connection.close(); + wait(latch); + } + public void testShutdownHooksRecoveryOnConnection() throws IOException, InterruptedException { final CountDownLatch latch = new CountDownLatch(2); connection.addShutdownListener(new ShutdownListener() {