Skip to content

Add dedicated executor to close connection in NIO mode #392

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ public class NioLoop implements Runnable {

private final NioParams nioParams;

private final ExecutorService connectionShutdownExecutor;

public NioLoop(NioParams nioParams, NioLoopContext loopContext) {
this.nioParams = nioParams;
this.context = loopContext;
this.connectionShutdownExecutor = nioParams.getConnectionShutdownExecutor();
}

@Override
Expand Down Expand Up @@ -283,19 +286,15 @@ protected void dispatchIoErrorToConnection(final SocketChannelFrameHandlerState
}

protected void dispatchShutdownToConnection(final SocketChannelFrameHandlerState state) {
Runnable shutdown = new Runnable() {

@Override
public void run() {
state.getConnection().doFinalShutdown();
}
};
if (executorService() == null) {
Runnable shutdown = () -> state.getConnection().doFinalShutdown();
if (this.connectionShutdownExecutor != null) {
connectionShutdownExecutor.execute(shutdown);
} else if (executorService() != null) {
executorService().execute(shutdown);
} else {
String name = "rabbitmq-connection-shutdown-" + state.getConnection();
Thread shutdownThread = Environment.newThread(threadFactory(), shutdown, name);
shutdownThread.start();
} else {
executorService().submit(shutdown);
}
}

Expand Down
45 changes: 41 additions & 4 deletions src/main/java/com/rabbitmq/client/impl/nio/NioParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ public class NioParams {
private SocketChannelConfigurator socketChannelConfigurator = new DefaultSocketChannelConfigurator();

/** the hook to configure the SSL engine before the connection is open */
private SslEngineConfigurator sslEngineConfigurator = new SslEngineConfigurator() {
@Override
public void configure(SSLEngine sslEngine) throws IOException { }
};
private SslEngineConfigurator sslEngineConfigurator = sslEngine -> { };

/** the executor service used for connection shutdown */
private ExecutorService connectionShutdownExecutor;

public NioParams() {
}
Expand All @@ -72,6 +72,7 @@ public NioParams(NioParams nioParams) {
setNioExecutor(nioParams.getNioExecutor());
setThreadFactory(nioParams.getThreadFactory());
setSslEngineConfigurator(nioParams.getSslEngineConfigurator());
setConnectionShutdownExecutor(nioParams.getConnectionShutdownExecutor());
}

public int getReadByteBufferSize() {
Expand Down Expand Up @@ -186,6 +187,9 @@ public ExecutorService getNioExecutor() {
* number of requested IO threads, plus a few more, as it's also
* used to dispatch the shutdown of connections.
*
* Connection shutdown can also be handled by a dedicated {@link ExecutorService},
* see {@link #setConnectionShutdownExecutor(ExecutorService)}.
*
* It's developer's responsibility to shut down the executor
* when it is no longer needed.
*
Expand All @@ -195,6 +199,7 @@ public ExecutorService getNioExecutor() {
* @return this {@link NioParams} instance
* @see NioParams#setNbIoThreads(int)
* @see NioParams#setThreadFactory(ThreadFactory)
* @see NioParams#setConnectionShutdownExecutor(ExecutorService)
*/
public NioParams setNioExecutor(ExecutorService nioExecutor) {
this.nioExecutor = nioExecutor;
Expand Down Expand Up @@ -275,4 +280,36 @@ public void setSslEngineConfigurator(SslEngineConfigurator configurator) {
public SslEngineConfigurator getSslEngineConfigurator() {
return sslEngineConfigurator;
}

/**
* Set the {@link ExecutorService} used for connection shutdown.
* If not set, falls back to the NIO executor and then the thread factory.
* This executor service is useful when strict control of the number of threads
* is necessary, the application can experience the closing of several connections
* at once, and automatic recovery is enabled. In such cases, the connection recovery
* can take place in the same pool of threads as the NIO operations, which can
* create deadlocks (all the threads of the pool are busy recovering, and there's no
* thread left for NIO, so connections never recover).
* <p>
* Note it's developer's responsibility to shut down the executor
* when it is no longer needed.
* <p>
* Using the thread factory for such scenarios avoid the deadlocks, at the price
* of potentially creating many short-lived threads in case of massive connection lost.
* <p>
* With both the NIO and connection shutdown executor services set and configured
* accordingly, the application can control reliably the number of threads used.
*
* @param connectionShutdownExecutor the executor service to use
* @return this {@link NioParams} instance
* @see NioParams#setNioExecutor(ExecutorService)
*/
public NioParams setConnectionShutdownExecutor(ExecutorService connectionShutdownExecutor) {
this.connectionShutdownExecutor = connectionShutdownExecutor;
return this;
}

public ExecutorService getConnectionShutdownExecutor() {
return connectionShutdownExecutor;
}
}
3 changes: 2 additions & 1 deletion src/test/java/com/rabbitmq/client/test/ClientTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
NoAutoRecoveryWhenTcpWindowIsFullTest.class,
JsonRpcTest.class,
AddressTest.class,
DefaultRetryHandlerTest.class
DefaultRetryHandlerTest.class,
NioDeadlockOnConnectionClosing.class
})
public class ClientTests {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.

package com.rabbitmq.client.test;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.impl.nio.NioParams;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery;
import static org.junit.Assert.assertTrue;

/**
*
*/
public class NioDeadlockOnConnectionClosing {

static final Logger LOGGER = LoggerFactory.getLogger(NioDeadlockOnConnectionClosing.class);

ExecutorService nioExecutorService, connectionShutdownExecutorService;
ConnectionFactory cf;
List<Connection> connections;

@Before
public void setUp() {
nioExecutorService = Executors.newFixedThreadPool(2);
connectionShutdownExecutorService = Executors.newFixedThreadPool(2);
cf = TestUtils.connectionFactory();
cf.setAutomaticRecoveryEnabled(true);
cf.useNio();
cf.setNetworkRecoveryInterval(1000);
NioParams params = new NioParams()
.setNioExecutor(nioExecutorService)
.setConnectionShutdownExecutor(connectionShutdownExecutorService)
.setNbIoThreads(2);
cf.setNioParams(params);
connections = new ArrayList<>();
}

@After
public void tearDown() throws Exception {
for (Connection connection : connections) {
try {
connection.close(2000);
} catch (Exception e) {
LOGGER.warn("Error while closing test connection", e);
}
}

shutdownExecutorService(nioExecutorService);
shutdownExecutorService(connectionShutdownExecutorService);
}

private void shutdownExecutorService(ExecutorService executorService) throws InterruptedException {
if (executorService == null) {
return;
}
executorService.shutdown();
boolean terminated = executorService.awaitTermination(5, TimeUnit.SECONDS);
if (!terminated) {
LOGGER.warn("Couldn't terminate executor after 5 seconds");
}
}

@Test
public void connectionClosing() throws Exception {
for (int i = 0; i < 10; i++) {
connections.add(cf.newConnection());
}
closeAllConnectionsAndWaitForRecovery(connections);
for (Connection connection : connections) {
assertTrue(connection.isOpen());
}
}
}
40 changes: 26 additions & 14 deletions src/test/java/com/rabbitmq/client/test/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -156,26 +158,36 @@ public static void closeAndWaitForRecovery(RecoverableConnection connection) thr
wait(latch);
}

public static void closeAllConnectionsAndWaitForRecovery(Connection connection) throws IOException, InterruptedException {
CountDownLatch latch = prepareForRecovery(connection);
public static void closeAllConnectionsAndWaitForRecovery(Collection<Connection> connections) throws IOException, InterruptedException {
CountDownLatch latch = prepareForRecovery(connections);
Host.closeAllConnections();
wait(latch);
}

public static CountDownLatch prepareForRecovery(Connection conn) {
final CountDownLatch latch = new CountDownLatch(1);
((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() {
public static void closeAllConnectionsAndWaitForRecovery(Connection connection) throws IOException, InterruptedException {
closeAllConnectionsAndWaitForRecovery(Collections.singletonList(connection));
}

@Override
public void handleRecovery(Recoverable recoverable) {
latch.countDown();
}
public static CountDownLatch prepareForRecovery(Connection connection) {
return prepareForRecovery(Collections.singletonList(connection));
}

@Override
public void handleRecoveryStarted(Recoverable recoverable) {
// No-op
}
});
public static CountDownLatch prepareForRecovery(Collection<Connection> connections) {
final CountDownLatch latch = new CountDownLatch(connections.size());
for (Connection conn : connections) {
((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() {

@Override
public void handleRecovery(Recoverable recoverable) {
latch.countDown();
}

@Override
public void handleRecoveryStarted(Recoverable recoverable) {
// No-op
}
});
}
return latch;
}

Expand Down