Skip to content

Commit d11e365

Browse files
Merge pull request #392 from rabbitmq/rabbitmq-java-client-380-nio-connection-closing
Add dedicated executor to close connection in NIO mode
2 parents 2b8d257 + 0a7e7e5 commit d11e365

File tree

5 files changed

+176
-29
lines changed

5 files changed

+176
-29
lines changed

src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,12 @@ public class NioLoop implements Runnable {
4141

4242
private final NioParams nioParams;
4343

44+
private final ExecutorService connectionShutdownExecutor;
45+
4446
public NioLoop(NioParams nioParams, NioLoopContext loopContext) {
4547
this.nioParams = nioParams;
4648
this.context = loopContext;
49+
this.connectionShutdownExecutor = nioParams.getConnectionShutdownExecutor();
4750
}
4851

4952
@Override
@@ -283,19 +286,15 @@ protected void dispatchIoErrorToConnection(final SocketChannelFrameHandlerState
283286
}
284287

285288
protected void dispatchShutdownToConnection(final SocketChannelFrameHandlerState state) {
286-
Runnable shutdown = new Runnable() {
287-
288-
@Override
289-
public void run() {
290-
state.getConnection().doFinalShutdown();
291-
}
292-
};
293-
if (executorService() == null) {
289+
Runnable shutdown = () -> state.getConnection().doFinalShutdown();
290+
if (this.connectionShutdownExecutor != null) {
291+
connectionShutdownExecutor.execute(shutdown);
292+
} else if (executorService() != null) {
293+
executorService().execute(shutdown);
294+
} else {
294295
String name = "rabbitmq-connection-shutdown-" + state.getConnection();
295296
Thread shutdownThread = Environment.newThread(threadFactory(), shutdown, name);
296297
shutdownThread.start();
297-
} else {
298-
executorService().submit(shutdown);
299298
}
300299
}
301300

src/main/java/com/rabbitmq/client/impl/nio/NioParams.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ public class NioParams {
5555
private SocketChannelConfigurator socketChannelConfigurator = new DefaultSocketChannelConfigurator();
5656

5757
/** the hook to configure the SSL engine before the connection is open */
58-
private SslEngineConfigurator sslEngineConfigurator = new SslEngineConfigurator() {
59-
@Override
60-
public void configure(SSLEngine sslEngine) throws IOException { }
61-
};
58+
private SslEngineConfigurator sslEngineConfigurator = sslEngine -> { };
59+
60+
/** the executor service used for connection shutdown */
61+
private ExecutorService connectionShutdownExecutor;
6262

6363
public NioParams() {
6464
}
@@ -72,6 +72,7 @@ public NioParams(NioParams nioParams) {
7272
setNioExecutor(nioParams.getNioExecutor());
7373
setThreadFactory(nioParams.getThreadFactory());
7474
setSslEngineConfigurator(nioParams.getSslEngineConfigurator());
75+
setConnectionShutdownExecutor(nioParams.getConnectionShutdownExecutor());
7576
}
7677

7778
public int getReadByteBufferSize() {
@@ -186,6 +187,9 @@ public ExecutorService getNioExecutor() {
186187
* number of requested IO threads, plus a few more, as it's also
187188
* used to dispatch the shutdown of connections.
188189
*
190+
* Connection shutdown can also be handled by a dedicated {@link ExecutorService},
191+
* see {@link #setConnectionShutdownExecutor(ExecutorService)}.
192+
*
189193
* It's developer's responsibility to shut down the executor
190194
* when it is no longer needed.
191195
*
@@ -195,6 +199,7 @@ public ExecutorService getNioExecutor() {
195199
* @return this {@link NioParams} instance
196200
* @see NioParams#setNbIoThreads(int)
197201
* @see NioParams#setThreadFactory(ThreadFactory)
202+
* @see NioParams#setConnectionShutdownExecutor(ExecutorService)
198203
*/
199204
public NioParams setNioExecutor(ExecutorService nioExecutor) {
200205
this.nioExecutor = nioExecutor;
@@ -275,4 +280,36 @@ public void setSslEngineConfigurator(SslEngineConfigurator configurator) {
275280
public SslEngineConfigurator getSslEngineConfigurator() {
276281
return sslEngineConfigurator;
277282
}
283+
284+
/**
285+
* Set the {@link ExecutorService} used for connection shutdown.
286+
* If not set, falls back to the NIO executor and then the thread factory.
287+
* This executor service is useful when strict control of the number of threads
288+
* is necessary, the application can experience the closing of several connections
289+
* at once, and automatic recovery is enabled. In such cases, the connection recovery
290+
* can take place in the same pool of threads as the NIO operations, which can
291+
* create deadlocks (all the threads of the pool are busy recovering, and there's no
292+
* thread left for NIO, so connections never recover).
293+
* <p>
294+
* Note it's developer's responsibility to shut down the executor
295+
* when it is no longer needed.
296+
* <p>
297+
* Using the thread factory for such scenarios avoid the deadlocks, at the price
298+
* of potentially creating many short-lived threads in case of massive connection lost.
299+
* <p>
300+
* With both the NIO and connection shutdown executor services set and configured
301+
* accordingly, the application can control reliably the number of threads used.
302+
*
303+
* @param connectionShutdownExecutor the executor service to use
304+
* @return this {@link NioParams} instance
305+
* @see NioParams#setNioExecutor(ExecutorService)
306+
*/
307+
public NioParams setConnectionShutdownExecutor(ExecutorService connectionShutdownExecutor) {
308+
this.connectionShutdownExecutor = connectionShutdownExecutor;
309+
return this;
310+
}
311+
312+
public ExecutorService getConnectionShutdownExecutor() {
313+
return connectionShutdownExecutor;
314+
}
278315
}

src/test/java/com/rabbitmq/client/test/ClientTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363
NoAutoRecoveryWhenTcpWindowIsFullTest.class,
6464
JsonRpcTest.class,
6565
AddressTest.class,
66-
DefaultRetryHandlerTest.class
66+
DefaultRetryHandlerTest.class,
67+
NioDeadlockOnConnectionClosing.class
6768
})
6869
public class ClientTests {
6970

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client.test;
17+
18+
import com.rabbitmq.client.Connection;
19+
import com.rabbitmq.client.ConnectionFactory;
20+
import com.rabbitmq.client.impl.nio.NioParams;
21+
import org.junit.After;
22+
import org.junit.Before;
23+
import org.junit.Test;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.TimeUnit;
32+
33+
import static com.rabbitmq.client.test.TestUtils.closeAllConnectionsAndWaitForRecovery;
34+
import static org.junit.Assert.assertTrue;
35+
36+
/**
37+
*
38+
*/
39+
public class NioDeadlockOnConnectionClosing {
40+
41+
static final Logger LOGGER = LoggerFactory.getLogger(NioDeadlockOnConnectionClosing.class);
42+
43+
ExecutorService nioExecutorService, connectionShutdownExecutorService;
44+
ConnectionFactory cf;
45+
List<Connection> connections;
46+
47+
@Before
48+
public void setUp() {
49+
nioExecutorService = Executors.newFixedThreadPool(2);
50+
connectionShutdownExecutorService = Executors.newFixedThreadPool(2);
51+
cf = TestUtils.connectionFactory();
52+
cf.setAutomaticRecoveryEnabled(true);
53+
cf.useNio();
54+
cf.setNetworkRecoveryInterval(1000);
55+
NioParams params = new NioParams()
56+
.setNioExecutor(nioExecutorService)
57+
.setConnectionShutdownExecutor(connectionShutdownExecutorService)
58+
.setNbIoThreads(2);
59+
cf.setNioParams(params);
60+
connections = new ArrayList<>();
61+
}
62+
63+
@After
64+
public void tearDown() throws Exception {
65+
for (Connection connection : connections) {
66+
try {
67+
connection.close(2000);
68+
} catch (Exception e) {
69+
LOGGER.warn("Error while closing test connection", e);
70+
}
71+
}
72+
73+
shutdownExecutorService(nioExecutorService);
74+
shutdownExecutorService(connectionShutdownExecutorService);
75+
}
76+
77+
private void shutdownExecutorService(ExecutorService executorService) throws InterruptedException {
78+
if (executorService == null) {
79+
return;
80+
}
81+
executorService.shutdown();
82+
boolean terminated = executorService.awaitTermination(5, TimeUnit.SECONDS);
83+
if (!terminated) {
84+
LOGGER.warn("Couldn't terminate executor after 5 seconds");
85+
}
86+
}
87+
88+
@Test
89+
public void connectionClosing() throws Exception {
90+
for (int i = 0; i < 10; i++) {
91+
connections.add(cf.newConnection());
92+
}
93+
closeAllConnectionsAndWaitForRecovery(connections);
94+
for (Connection connection : connections) {
95+
assertTrue(connection.isOpen());
96+
}
97+
}
98+
}

src/test/java/com/rabbitmq/client/test/TestUtils.java

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.slf4j.LoggerFactory;
3232

3333
import java.io.IOException;
34+
import java.util.Collection;
35+
import java.util.Collections;
3436
import java.util.concurrent.Callable;
3537
import java.util.concurrent.CountDownLatch;
3638
import java.util.concurrent.TimeUnit;
@@ -156,26 +158,36 @@ public static void closeAndWaitForRecovery(RecoverableConnection connection) thr
156158
wait(latch);
157159
}
158160

159-
public static void closeAllConnectionsAndWaitForRecovery(Connection connection) throws IOException, InterruptedException {
160-
CountDownLatch latch = prepareForRecovery(connection);
161+
public static void closeAllConnectionsAndWaitForRecovery(Collection<Connection> connections) throws IOException, InterruptedException {
162+
CountDownLatch latch = prepareForRecovery(connections);
161163
Host.closeAllConnections();
162164
wait(latch);
163165
}
164166

165-
public static CountDownLatch prepareForRecovery(Connection conn) {
166-
final CountDownLatch latch = new CountDownLatch(1);
167-
((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() {
167+
public static void closeAllConnectionsAndWaitForRecovery(Connection connection) throws IOException, InterruptedException {
168+
closeAllConnectionsAndWaitForRecovery(Collections.singletonList(connection));
169+
}
168170

169-
@Override
170-
public void handleRecovery(Recoverable recoverable) {
171-
latch.countDown();
172-
}
171+
public static CountDownLatch prepareForRecovery(Connection connection) {
172+
return prepareForRecovery(Collections.singletonList(connection));
173+
}
173174

174-
@Override
175-
public void handleRecoveryStarted(Recoverable recoverable) {
176-
// No-op
177-
}
178-
});
175+
public static CountDownLatch prepareForRecovery(Collection<Connection> connections) {
176+
final CountDownLatch latch = new CountDownLatch(connections.size());
177+
for (Connection conn : connections) {
178+
((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() {
179+
180+
@Override
181+
public void handleRecovery(Recoverable recoverable) {
182+
latch.countDown();
183+
}
184+
185+
@Override
186+
public void handleRecoveryStarted(Recoverable recoverable) {
187+
// No-op
188+
}
189+
});
190+
}
179191
return latch;
180192
}
181193

0 commit comments

Comments
 (0)