Skip to content

Begin recovery after all shutdown listeners have been given a chance to run #143

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions src/main/java/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import java.net.InetAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.*;

import com.rabbitmq.client.AMQP;
Expand All @@ -45,6 +42,7 @@
import com.rabbitmq.client.SaslMechanism;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
import com.rabbitmq.client.impl.recovery.RecoveryCanBeginListener;
import com.rabbitmq.utility.BlockingCell;

final class Copyright {
Expand All @@ -65,6 +63,9 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
private Thread mainLoopThread;
private ThreadFactory threadFactory = Executors.defaultThreadFactory();

// Listeners told that recovery may begin; they are invoked by
// notifyRecoveryCanBeginListeners(), which run() calls after notifyListeners().
private final List<RecoveryCanBeginListener> recoveryCanBeginListeners =
new ArrayList<RecoveryCanBeginListener>();

/**
* Retrieve a copy of the default table of client properties that
* will be sent to the server during connection startup. This
Expand Down Expand Up @@ -576,10 +577,31 @@ public void run() {
_frameHandler.close();
_appContinuation.set(null);
notifyListeners();
// assuming that shutdown listeners do not do anything
// asynchronously, e.g. start new threads, this effectively
// guarantees that we only begin recovery when all shutdown
// listeners have executed
notifyRecoveryCanBeginListeners();
}
}
}

/**
 * Tells every registered {@link RecoveryCanBeginListener}, in registration
 * order, that recovery may start, passing each one this connection's
 * close reason.
 */
private void notifyRecoveryCanBeginListeners() {
    final ShutdownSignalException closeReason = this.getCloseReason();
    final Iterator<RecoveryCanBeginListener> it = this.recoveryCanBeginListeners.iterator();
    while (it.hasNext()) {
        it.next().recoveryCanBegin(closeReason);
    }
}

/**
 * Registers a listener to be called when recovery is allowed to begin.
 *
 * @param fn the listener to append to the list of registered listeners
 */
public void addRecoveryCanBeginListener(RecoveryCanBeginListener fn) {
    recoveryCanBeginListeners.add(fn);
}

/**
 * Unregisters a previously added listener; does nothing if it was never
 * registered.
 *
 * @param fn the listener to remove
 */
@SuppressWarnings("unused")
public void removeRecoveryCanBeginListener(RecoveryCanBeginListener fn) {
    recoveryCanBeginListeners.remove(fn);
}

/**
* Called when a frame-read operation times out
* @throws MissedHeartbeatException if heart-beats have been missed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.TopologyRecoveryException;
import com.rabbitmq.client.impl.AMQConnection;
import com.rabbitmq.client.impl.ConnectionParams;
import com.rabbitmq.client.ExceptionHandler;
import com.rabbitmq.client.impl.FrameHandlerFactory;
Expand Down Expand Up @@ -254,6 +255,13 @@ public void abort(int timeout) {
delegate.abort(timeout);
}

/**
 * Exposes the underlying {@link AMQConnection} this connection delegates to.
 * Intended for automated tests only; do not use it anywhere else.
 *
 * @return the delegate connection
 */
public AMQConnection getDelegate() {
    return this.delegate;
}

/**
* @see com.rabbitmq.client.Connection#getCloseReason()
*/
Expand Down Expand Up @@ -376,8 +384,10 @@ public int getLocalPort() {

private void addAutomaticRecoveryListener() {
final AutorecoveringConnection c = this;
ShutdownListener automaticRecoveryListener = new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause) {
// this listener will run after shutdown listeners,
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
RecoveryCanBeginListener starter = new RecoveryCanBeginListener() {
public void recoveryCanBegin(ShutdownSignalException cause) {
try {
if (shouldTriggerConnectionRecovery(cause)) {
c.beginAutomaticRecovery();
Expand All @@ -388,10 +398,7 @@ public void shutdownCompleted(ShutdownSignalException cause) {
}
};
synchronized (this) {
if(!this.shutdownHooks.contains(automaticRecoveryListener)) {
this.shutdownHooks.add(automaticRecoveryListener);
}
this.delegate.addShutdownListener(automaticRecoveryListener);
this.delegate.addRecoveryCanBeginListener(starter);
}
}

Expand Down Expand Up @@ -441,18 +448,20 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {

synchronized private void beginAutomaticRecovery() throws InterruptedException, IOException, TopologyRecoveryException {
Thread.sleep(this.params.getNetworkRecoveryInterval());
if (!this.recoverConnection())
return;

this.recoverShutdownListeners();
this.recoverBlockedListeners();
this.recoverChannels();
if(this.params.isTopologyRecoveryEnabled()) {
this.recoverEntities();
this.recoverConsumers();
}
if (!this.recoverConnection()) {
return;
}

this.notifyRecoveryListeners();
this.addAutomaticRecoveryListener();
this.recoverShutdownListeners();
this.recoverBlockedListeners();
this.recoverChannels();
if(this.params.isTopologyRecoveryEnabled()) {
this.recoverEntities();
this.recoverConsumers();
}

this.notifyRecoveryListeners();
}

private void recoverShutdownListeners() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.rabbitmq.client.impl.recovery;

import com.rabbitmq.client.ShutdownSignalException;

/**
 * Used internally to indicate when connection recovery can
 * begin. See
 * <a href="https://github.com/rabbitmq/rabbitmq-java-client/issues/135">issue #135</a>.
 * NOTE(review): the interface is declared public because it is referenced
 * from outside this package (e.g. by AMQConnection), but it is not meant
 * to be used by applications.
 */
public interface RecoveryCanBeginListener {
/**
 * Called when connection recovery can begin.
 *
 * @param cause the shutdown signal supplied by the notifying connection
 */
void recoveryCanBegin(ShutdownSignalException cause);
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package com.rabbitmq.client.test.functional;

import com.rabbitmq.client.*;
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import com.rabbitmq.client.impl.AMQConnection;
import com.rabbitmq.client.impl.recovery.*;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.impl.recovery.ConsumerRecoveryListener;
import com.rabbitmq.client.impl.recovery.QueueRecoveryListener;
import com.rabbitmq.client.test.BrokerTestCase;
import com.rabbitmq.tools.Host;

Expand All @@ -21,8 +19,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

@SuppressWarnings("ThrowFromFinallyBlock")
public class ConnectionRecovery extends BrokerTestCase {
public static final long RECOVERY_INTERVAL = 2000;
private static final long RECOVERY_INTERVAL = 2000;

public void testConnectionRecovery() throws IOException, InterruptedException {
assertTrue(connection.isOpen());
Expand Down Expand Up @@ -89,6 +88,44 @@ public void testConnectionRecoveryWithDisabledTopologyRecovery()
}
}

// Verifies that shutdown listeners registered on the connection run before
// the "recovery can begin" hook does.
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
public void testThatShutdownHooksOnConnectionFireBeforeRecoveryStarts() throws IOException, InterruptedException {
    // The callbacks below may run on a thread other than the test thread,
    // so the event log must be safe for concurrent append and read.
    final List<String> events = new java.util.concurrent.CopyOnWriteArrayList<String>();
    final CountDownLatch latch = new CountDownLatch(1);
    connection.addShutdownListener(new ShutdownListener() {
        @Override
        public void shutdownCompleted(ShutdownSignalException cause) {
            events.add("shutdown hook 1");
        }
    });
    connection.addShutdownListener(new ShutdownListener() {
        @Override
        public void shutdownCompleted(ShutdownSignalException cause) {
            events.add("shutdown hook 2");
        }
    });
    // note: RecoveryCanBeginListener is not exposed on the public connection
    // API, so the test registers it directly on the underlying AMQConnection
    // obtained through getDelegate()
    ((AutorecoveringConnection)connection).getDelegate().addRecoveryCanBeginListener(new RecoveryCanBeginListener() {
        @Override
        public void recoveryCanBegin(ShutdownSignalException cause) {
            events.add("recovery start hook 1");
        }
    });
    ((AutorecoveringConnection)connection).addRecoveryListener(new RecoveryListener() {
        @Override
        public void handleRecovery(Recoverable recoverable) {
            latch.countDown();
        }
    });
    assertTrue(connection.isOpen());
    closeAndWaitForRecovery();
    assertTrue(connection.isOpen());
    // fail with a readable message instead of an IndexOutOfBoundsException
    // if one of the hooks never fired
    assertTrue("expected at least 3 events but got " + events, events.size() >= 3);
    assertEquals("shutdown hook 1", events.get(0));
    assertEquals("shutdown hook 2", events.get(1));
    assertEquals("recovery start hook 1", events.get(2));
    connection.close();
    wait(latch);
}

public void testShutdownHooksRecoveryOnConnection() throws IOException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(2);
connection.addShutdownListener(new ShutdownListener() {
Expand Down Expand Up @@ -211,7 +248,7 @@ public void testClientNamedQueueRecoveryWithNoWait() throws IOException, Interru
testClientNamedQueueRecoveryWith("java-client.test.recovery.q1-nowait", true);
}

protected void testClientNamedQueueRecoveryWith(String q, boolean noWait) throws IOException, InterruptedException, TimeoutException {
private void testClientNamedQueueRecoveryWith(String q, boolean noWait) throws IOException, InterruptedException, TimeoutException {
Channel ch = connection.createChannel();
if(noWait) {
declareClientNamedQueueNoWait(ch, q);
Expand Down Expand Up @@ -750,20 +787,20 @@ private ConnectionFactory buildConnectionFactoryWithRecoveryEnabled(boolean disa
}

private static void wait(CountDownLatch latch) throws InterruptedException {
// Very very generous amount of time to wait, just make sure we never
// hang forever
assertTrue(latch.await(1800, TimeUnit.SECONDS));
// we want to wait for recovery to complete for a reasonable amount of time
// but still make recovery failures easy to notice in development environments
assertTrue(latch.await(90, TimeUnit.SECONDS));
}

/**
 * Delegates to {@code ch.waitForConfirms} with a timeout argument of
 * 30 * 60 * 1000 (30 minutes, assuming the unit is milliseconds).
 *
 * @param ch the channel to wait on
 * @throws InterruptedException propagated from the channel call
 * @throws TimeoutException propagated from the channel call
 */
private void waitForConfirms(Channel ch) throws InterruptedException, TimeoutException {
    final long timeout = 30 * 60 * 1000;
    ch.waitForConfirms(timeout);
}

protected void assertRecordedQueues(Connection conn, int size) {
private void assertRecordedQueues(Connection conn, int size) {
assertEquals(size, ((AutorecoveringConnection)conn).getRecordedQueues().size());
}

protected void assertRecordedExchanges(Connection conn, int size) {
private void assertRecordedExchanges(Connection conn, int size) {
assertEquals(size, ((AutorecoveringConnection)conn).getRecordedExchanges().size());
}
}