From 0a7e7e563db93e9e886a3bfd3c0e3cf7840e6a74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 14 Aug 2018 15:25:10 +0200 Subject: [PATCH] Add dedicated executor to close connection in NIO mode When sharing the same executor for NIO and connection closing, all the threads of the pool can be busy recovering connections, leaving no thread left for IO. This commit add a new executor service to the NIO mode to submit all the connection closing to. This is useful when an application maintains dozens or hundreds of connections and suffers massive connection lost. Hundreds of connection closing tasks can be submitted very quickly, so controlling the number of threads and leaving some threads available for IO is critical. If an application maintain just a few connections and can deal with the creation of a few threads, using the new executor isn't necessary. Fixes #380 --- .../com/rabbitmq/client/impl/nio/NioLoop.java | 19 ++-- .../rabbitmq/client/impl/nio/NioParams.java | 45 ++++++++- .../com/rabbitmq/client/test/ClientTests.java | 3 +- .../test/NioDeadlockOnConnectionClosing.java | 98 +++++++++++++++++++ .../com/rabbitmq/client/test/TestUtils.java | 40 +++++--- 5 files changed, 176 insertions(+), 29 deletions(-) create mode 100644 src/test/java/com/rabbitmq/client/test/NioDeadlockOnConnectionClosing.java diff --git a/src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java b/src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java index d4ce97dde6..6e1c1f6352 100644 --- a/src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java +++ b/src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java @@ -41,9 +41,12 @@ public class NioLoop implements Runnable { private final NioParams nioParams; + private final ExecutorService connectionShutdownExecutor; + public NioLoop(NioParams nioParams, NioLoopContext loopContext) { this.nioParams = nioParams; this.context = loopContext; + this.connectionShutdownExecutor = nioParams.getConnectionShutdownExecutor(); } @Override @@ -283,19 +286,15 @@ protected void dispatchIoErrorToConnection(final SocketChannelFrameHandlerState } protected void dispatchShutdownToConnection(final SocketChannelFrameHandlerState state) { - Runnable shutdown = new Runnable() { - - @Override - public void run() { - state.getConnection().doFinalShutdown(); - } - }; - if (executorService() == null) { + Runnable shutdown = () -> state.getConnection().doFinalShutdown(); + if (this.connectionShutdownExecutor != null) { + connectionShutdownExecutor.execute(shutdown); + } else if (executorService() != null) { + executorService().execute(shutdown); + } else { String name = "rabbitmq-connection-shutdown-" + state.getConnection(); Thread shutdownThread = Environment.newThread(threadFactory(), shutdown, name); shutdownThread.start(); - } else { - executorService().submit(shutdown); } } diff --git a/src/main/java/com/rabbitmq/client/impl/nio/NioParams.java b/src/main/java/com/rabbitmq/client/impl/nio/NioParams.java index 0dbe627808..6652a7843f 100644 --- a/src/main/java/com/rabbitmq/client/impl/nio/NioParams.java +++ b/src/main/java/com/rabbitmq/client/impl/nio/NioParams.java @@ -55,10 +55,10 @@ public class NioParams { private SocketChannelConfigurator socketChannelConfigurator = new DefaultSocketChannelConfigurator(); /** the hook to configure the SSL engine before the connection is open */ - private SslEngineConfigurator sslEngineConfigurator = new SslEngineConfigurator() { - @Override - public void configure(SSLEngine sslEngine) throws IOException { } - }; + private SslEngineConfigurator sslEngineConfigurator = sslEngine -> { }; + + /** the executor service used for connection shutdown */ + private ExecutorService connectionShutdownExecutor; public NioParams() { } @@ -72,6 +72,7 @@ public NioParams(NioParams nioParams) { setNioExecutor(nioParams.getNioExecutor()); setThreadFactory(nioParams.getThreadFactory()); setSslEngineConfigurator(nioParams.getSslEngineConfigurator()); + setConnectionShutdownExecutor(nioParams.getConnectionShutdownExecutor()); } public int getReadByteBufferSize() { @@ -186,6 +187,9 @@ public ExecutorService getNioExecutor() { * number of requested IO threads, plus a few more, as it's also * used to dispatch the shutdown of connections. * + * Connection shutdown can also be handled by a dedicated {@link ExecutorService}, + * see {@link #setConnectionShutdownExecutor(ExecutorService)}. + * * It's developer's responsibility to shut down the executor * when it is no longer needed. * @@ -195,6 +199,7 @@ public ExecutorService getNioExecutor() { * @return this {@link NioParams} instance * @see NioParams#setNbIoThreads(int) * @see NioParams#setThreadFactory(ThreadFactory) + * @see NioParams#setConnectionShutdownExecutor(ExecutorService) */ public NioParams setNioExecutor(ExecutorService nioExecutor) { this.nioExecutor = nioExecutor; @@ -275,4 +280,36 @@ public void setSslEngineConfigurator(SslEngineConfigurator configurator) { public SslEngineConfigurator getSslEngineConfigurator() { return sslEngineConfigurator; } + + /** + * Set the {@link ExecutorService} used for connection shutdown. + * If not set, falls back to the NIO executor and then the thread factory. + * This executor service is useful when strict control of the number of threads + * is necessary, the application can experience the closing of several connections + * at once, and automatic recovery is enabled. In such cases, the connection recovery + * can take place in the same pool of threads as the NIO operations, which can + * create deadlocks (all the threads of the pool are busy recovering, and there's no + * thread left for NIO, so connections never recover). + *

+ * Note it's developer's responsibility to shut down the executor + * when it is no longer needed. + *

+ * Using the thread factory for such scenarios avoid the deadlocks, at the price + * of potentially creating many short-lived threads in case of massive connection lost. + *

+ * With both the NIO and connection shutdown executor services set and configured + * accordingly, the application can control reliably the number of threads used. + * + * @param connectionShutdownExecutor the executor service to use + * @return this {@link NioParams} instance + * @see NioParams#setNioExecutor(ExecutorService) + */ + public NioParams setConnectionShutdownExecutor(ExecutorService connectionShutdownExecutor) { + this.connectionShutdownExecutor = connectionShutdownExecutor; + return this; + } + + public ExecutorService getConnectionShutdownExecutor() { + return connectionShutdownExecutor; + } } diff --git a/src/test/java/com/rabbitmq/client/test/ClientTests.java b/src/test/java/com/rabbitmq/client/test/ClientTests.java index 9c2994b671..2b4c8ea04a 100644 --- a/src/test/java/com/rabbitmq/client/test/ClientTests.java +++ b/src/test/java/com/rabbitmq/client/test/ClientTests.java @@ -63,7 +63,8 @@ NoAutoRecoveryWhenTcpWindowIsFullTest.class, JsonRpcTest.class, AddressTest.class, - DefaultRetryHandlerTest.class + DefaultRetryHandlerTest.class, + NioDeadlockOnConnectionClosing.class }) public class ClientTests { diff --git a/src/test/java/com/rabbitmq/client/test/NioDeadlockOnConnectionClosing.java b/src/test/java/com/rabbitmq/client/test/NioDeadlockOnConnectionClosing.java new file mode 100644 index 0000000000..78082344b1 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/NioDeadlockOnConnectionClosing.java @@ -0,0 +1,98 @@ +// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.impl.nio.NioParams; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery; +import static org.junit.Assert.assertTrue; + +/** + * + */ +public class NioDeadlockOnConnectionClosing { + + static final Logger LOGGER = LoggerFactory.getLogger(NioDeadlockOnConnectionClosing.class); + + ExecutorService nioExecutorService, connectionShutdownExecutorService; + ConnectionFactory cf; + List connections; + + @Before + public void setUp() { + nioExecutorService = Executors.newFixedThreadPool(2); + connectionShutdownExecutorService = Executors.newFixedThreadPool(2); + cf = TestUtils.connectionFactory(); + cf.setAutomaticRecoveryEnabled(true); + cf.useNio(); + cf.setNetworkRecoveryInterval(1000); + NioParams params = new NioParams() + .setNioExecutor(nioExecutorService) + .setConnectionShutdownExecutor(connectionShutdownExecutorService) + .setNbIoThreads(2); + cf.setNioParams(params); + connections = new ArrayList<>(); + } + + @After + public void tearDown() throws Exception { + for (Connection connection : connections) { + try { + connection.close(2000); + } catch (Exception e) { + LOGGER.warn("Error while closing test connection", e); + } + } + + shutdownExecutorService(nioExecutorService); + shutdownExecutorService(connectionShutdownExecutorService); + } + + private void shutdownExecutorService(ExecutorService executorService) throws InterruptedException { + if (executorService == null) { + return; + } + executorService.shutdown(); + boolean terminated = executorService.awaitTermination(5, TimeUnit.SECONDS); + if (!terminated) { + LOGGER.warn("Couldn't terminate executor after 5 seconds"); + } + } + + @Test + public void connectionClosing() throws Exception { + for (int i = 0; i < 10; i++) { + connections.add(cf.newConnection()); + } + closeAllConnectionsAndWaitForRecovery(connections); + for (Connection connection : connections) { + assertTrue(connection.isOpen()); + } + } +} diff --git a/src/test/java/com/rabbitmq/client/test/TestUtils.java b/src/test/java/com/rabbitmq/client/test/TestUtils.java index c44e8b26a4..5b0b7d0d7e 100644 --- a/src/test/java/com/rabbitmq/client/test/TestUtils.java +++ b/src/test/java/com/rabbitmq/client/test/TestUtils.java @@ -31,6 +31,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -156,26 +158,36 @@ public static void closeAndWaitForRecovery(RecoverableConnection connection) thr wait(latch); } - public static void closeAllConnectionsAndWaitForRecovery(Connection connection) throws IOException, InterruptedException { - CountDownLatch latch = prepareForRecovery(connection); + public static void closeAllConnectionsAndWaitForRecovery(Collection connections) throws IOException, InterruptedException { + CountDownLatch latch = prepareForRecovery(connections); Host.closeAllConnections(); wait(latch); } - public static CountDownLatch prepareForRecovery(Connection conn) { - final CountDownLatch latch = new CountDownLatch(1); - ((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() { + public static void closeAllConnectionsAndWaitForRecovery(Connection connection) throws IOException, InterruptedException { + closeAllConnectionsAndWaitForRecovery(Collections.singletonList(connection)); + } - @Override - public void handleRecovery(Recoverable recoverable) { - latch.countDown(); - } + public static CountDownLatch prepareForRecovery(Connection connection) { + return prepareForRecovery(Collections.singletonList(connection)); + } - @Override - public void handleRecoveryStarted(Recoverable recoverable) { - // No-op - } - }); + public static CountDownLatch prepareForRecovery(Collection connections) { + final CountDownLatch latch = new CountDownLatch(connections.size()); + for (Connection conn : connections) { + ((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() { + + @Override + public void handleRecovery(Recoverable recoverable) { + latch.countDown(); + } + + @Override + public void handleRecoveryStarted(Recoverable recoverable) { + // No-op + } + }); + } return latch; }