Skip to content

Commit 0a7e7e5

Browse files
committed
Add dedicated executor to close connection in NIO mode
When sharing the same executor for NIO and connection closing, all the threads of the pool can be busy recovering connections, leaving no thread for IO. This commit adds a new executor service to the NIO mode, to which all connection closing tasks are submitted. This is useful when an application maintains dozens or hundreds of connections and suffers massive connection loss. Hundreds of connection closing tasks can be submitted very quickly, so controlling the number of threads and leaving some threads available for IO is critical. If an application maintains just a few connections and can deal with the creation of a few threads, using the new executor isn't necessary. Fixes #380
1 parent 2b8d257 commit 0a7e7e5

File tree

5 files changed

+176
-29
lines changed

5 files changed

+176
-29
lines changed

src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,12 @@ public class NioLoop implements Runnable {
4141

4242
private final NioParams nioParams;
4343

44+
private final ExecutorService connectionShutdownExecutor;
45+
4446
public NioLoop(NioParams nioParams, NioLoopContext loopContext) {
4547
this.nioParams = nioParams;
4648
this.context = loopContext;
49+
this.connectionShutdownExecutor = nioParams.getConnectionShutdownExecutor();
4750
}
4851

4952
@Override
@@ -283,19 +286,15 @@ protected void dispatchIoErrorToConnection(final SocketChannelFrameHandlerState
283286
}
284287

285288
protected void dispatchShutdownToConnection(final SocketChannelFrameHandlerState state) {
286-
Runnable shutdown = new Runnable() {
287-
288-
@Override
289-
public void run() {
290-
state.getConnection().doFinalShutdown();
291-
}
292-
};
293-
if (executorService() == null) {
289+
Runnable shutdown = () -> state.getConnection().doFinalShutdown();
290+
if (this.connectionShutdownExecutor != null) {
291+
connectionShutdownExecutor.execute(shutdown);
292+
} else if (executorService() != null) {
293+
executorService().execute(shutdown);
294+
} else {
294295
String name = "rabbitmq-connection-shutdown-" + state.getConnection();
295296
Thread shutdownThread = Environment.newThread(threadFactory(), shutdown, name);
296297
shutdownThread.start();
297-
} else {
298-
executorService().submit(shutdown);
299298
}
300299
}
301300

src/main/java/com/rabbitmq/client/impl/nio/NioParams.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ public class NioParams {
5555
private SocketChannelConfigurator socketChannelConfigurator = new DefaultSocketChannelConfigurator();
5656

5757
/** the hook to configure the SSL engine before the connection is open */
58-
private SslEngineConfigurator sslEngineConfigurator = new SslEngineConfigurator() {
59-
@Override
60-
public void configure(SSLEngine sslEngine) throws IOException { }
61-
};
58+
private SslEngineConfigurator sslEngineConfigurator = sslEngine -> { };
59+
60+
/** the executor service used for connection shutdown */
61+
private ExecutorService connectionShutdownExecutor;
6262

6363
public NioParams() {
6464
}
@@ -72,6 +72,7 @@ public NioParams(NioParams nioParams) {
7272
setNioExecutor(nioParams.getNioExecutor());
7373
setThreadFactory(nioParams.getThreadFactory());
7474
setSslEngineConfigurator(nioParams.getSslEngineConfigurator());
75+
setConnectionShutdownExecutor(nioParams.getConnectionShutdownExecutor());
7576
}
7677

7778
public int getReadByteBufferSize() {
@@ -186,6 +187,9 @@ public ExecutorService getNioExecutor() {
186187
* number of requested IO threads, plus a few more, as it's also
187188
* used to dispatch the shutdown of connections.
188189
*
190+
* Connection shutdown can also be handled by a dedicated {@link ExecutorService},
191+
* see {@link #setConnectionShutdownExecutor(ExecutorService)}.
192+
*
189193
* It's developer's responsibility to shut down the executor
190194
* when it is no longer needed.
191195
*
@@ -195,6 +199,7 @@ public ExecutorService getNioExecutor() {
195199
* @return this {@link NioParams} instance
196200
* @see NioParams#setNbIoThreads(int)
197201
* @see NioParams#setThreadFactory(ThreadFactory)
202+
* @see NioParams#setConnectionShutdownExecutor(ExecutorService)
198203
*/
199204
public NioParams setNioExecutor(ExecutorService nioExecutor) {
200205
this.nioExecutor = nioExecutor;
@@ -275,4 +280,36 @@ public void setSslEngineConfigurator(SslEngineConfigurator configurator) {
275280
public SslEngineConfigurator getSslEngineConfigurator() {
276281
return sslEngineConfigurator;
277282
}
283+
284+
/**
285+
* Set the {@link ExecutorService} used for connection shutdown.
286+
* If not set, falls back to the NIO executor and then the thread factory.
287+
* This executor service is useful when strict control of the number of threads
288+
* is necessary, the application can experience the closing of several connections
289+
* at once, and automatic recovery is enabled. In such cases, the connection recovery
290+
* can take place in the same pool of threads as the NIO operations, which can
291+
* create deadlocks (all the threads of the pool are busy recovering, and there's no
292+
* thread left for NIO, so connections never recover).
293+
* <p>
294+
* Note it's developer's responsibility to shut down the executor
295+
* when it is no longer needed.
296+
* <p>
297+
* Using the thread factory for such scenarios avoid the deadlocks, at the price
298+
* of potentially creating many short-lived threads in case of massive connection lost.
299+
* <p>
300+
* With both the NIO and connection shutdown executor services set and configured
301+
* accordingly, the application can control reliably the number of threads used.
302+
*
303+
* @param connectionShutdownExecutor the executor service to use
304+
* @return this {@link NioParams} instance
305+
* @see NioParams#setNioExecutor(ExecutorService)
306+
*/
307+
public NioParams setConnectionShutdownExecutor(ExecutorService connectionShutdownExecutor) {
308+
this.connectionShutdownExecutor = connectionShutdownExecutor;
309+
return this;
310+
}
311+
312+
public ExecutorService getConnectionShutdownExecutor() {
313+
return connectionShutdownExecutor;
314+
}
278315
}

src/test/java/com/rabbitmq/client/test/ClientTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363
NoAutoRecoveryWhenTcpWindowIsFullTest.class,
6464
JsonRpcTest.class,
6565
AddressTest.class,
66-
DefaultRetryHandlerTest.class
66+
DefaultRetryHandlerTest.class,
67+
NioDeadlockOnConnectionClosing.class
6768
})
6869
public class ClientTests {
6970

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.test;
17+
18+
import com.rabbitmq.client.Connection;
19+
import com.rabbitmq.client.ConnectionFactory;
20+
import com.rabbitmq.client.impl.nio.NioParams;
21+
import org.junit.After;
22+
import org.junit.Before;
23+
import org.junit.Test;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.TimeUnit;
32+
33+
import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery;
34+
import static org.junit.Assert.assertTrue;
35+
36+
/**
37+
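* Checks that several NIO connections sharing small, fixed-size NIO and connection
* shutdown executors all recover after being closed at the same time.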
38+
*/
39+
public class NioDeadlockOnConnectionClosing {
40+
41+
static final Logger LOGGER = LoggerFactory.getLogger(NioDeadlockOnConnectionClosing.class);
42+
43+
ExecutorService nioExecutorService, connectionShutdownExecutorService;
44+
ConnectionFactory cf;
45+
List<Connection> connections;
46+
47+
@Before
48+
public void setUp() {
49+
nioExecutorService = Executors.newFixedThreadPool(2);
50+
connectionShutdownExecutorService = Executors.newFixedThreadPool(2);
51+
cf = TestUtils.connectionFactory();
52+
cf.setAutomaticRecoveryEnabled(true);
53+
cf.useNio();
54+
cf.setNetworkRecoveryInterval(1000);
55+
NioParams params = new NioParams()
56+
.setNioExecutor(nioExecutorService)
57+
.setConnectionShutdownExecutor(connectionShutdownExecutorService)
58+
.setNbIoThreads(2);
59+
cf.setNioParams(params);
60+
connections = new ArrayList<>();
61+
}
62+
63+
@After
64+
public void tearDown() throws Exception {
65+
for (Connection connection : connections) {
66+
try {
67+
connection.close(2000);
68+
} catch (Exception e) {
69+
LOGGER.warn("Error while closing test connection", e);
70+
}
71+
}
72+
73+
shutdownExecutorService(nioExecutorService);
74+
shutdownExecutorService(connectionShutdownExecutorService);
75+
}
76+
77+
private void shutdownExecutorService(ExecutorService executorService) throws InterruptedException {
78+
if (executorService == null) {
79+
return;
80+
}
81+
executorService.shutdown();
82+
boolean terminated = executorService.awaitTermination(5, TimeUnit.SECONDS);
83+
if (!terminated) {
84+
LOGGER.warn("Couldn't terminate executor after 5 seconds");
85+
}
86+
}
87+
88+
@Test
89+
public void connectionClosing() throws Exception {
90+
for (int i = 0; i < 10; i++) {
91+
connections.add(cf.newConnection());
92+
}
93+
closeAllConnectionsAndWaitForRecovery(connections);
94+
for (Connection connection : connections) {
95+
assertTrue(connection.isOpen());
96+
}
97+
}
98+
}

src/test/java/com/rabbitmq/client/test/TestUtils.java

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.slf4j.LoggerFactory;
3232

3333
import java.io.IOException;
34+
import java.util.Collection;
35+
import java.util.Collections;
3436
import java.util.concurrent.Callable;
3537
import java.util.concurrent.CountDownLatch;
3638
import java.util.concurrent.TimeUnit;
@@ -156,26 +158,36 @@ public static void closeAndWaitForRecovery(RecoverableConnection connection) thr
156158
wait(latch);
157159
}
158160

159-
public static void closeAllConnectionsAndWaitForRecovery(Connection connection) throws IOException, InterruptedException {
160-
CountDownLatch latch = prepareForRecovery(connection);
161+
public static void closeAllConnectionsAndWaitForRecovery(Collection<Connection> connections) throws IOException, InterruptedException {
162+
CountDownLatch latch = prepareForRecovery(connections);
161163
Host.closeAllConnections();
162164
wait(latch);
163165
}
164166

165-
public static CountDownLatch prepareForRecovery(Connection conn) {
166-
final CountDownLatch latch = new CountDownLatch(1);
167-
((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() {
167+
public static void closeAllConnectionsAndWaitForRecovery(Connection connection) throws IOException, InterruptedException {
168+
closeAllConnectionsAndWaitForRecovery(Collections.singletonList(connection));
169+
}
168170

169-
@Override
170-
public void handleRecovery(Recoverable recoverable) {
171-
latch.countDown();
172-
}
171+
public static CountDownLatch prepareForRecovery(Connection connection) {
172+
return prepareForRecovery(Collections.singletonList(connection));
173+
}
173174

174-
@Override
175-
public void handleRecoveryStarted(Recoverable recoverable) {
176-
// No-op
177-
}
178-
});
175+
public static CountDownLatch prepareForRecovery(Collection<Connection> connections) {
176+
final CountDownLatch latch = new CountDownLatch(connections.size());
177+
for (Connection conn : connections) {
178+
((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() {
179+
180+
@Override
181+
public void handleRecovery(Recoverable recoverable) {
182+
latch.countDown();
183+
}
184+
185+
@Override
186+
public void handleRecoveryStarted(Recoverable recoverable) {
187+
// No-op
188+
}
189+
});
190+
}
179191
return latch;
180192
}
181193

0 commit comments

Comments
 (0)