From 38294a7f9a6dfef5fc8e61b5efd4dee7f724c61a Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 25 Apr 2025 11:03:00 -0700 Subject: [PATCH 01/13] Add connection timeout handler to TlsChannelImpl. JAVA-5856 --- .../TlsChannelStreamFactoryFactory.java | 134 ++++++++++++------ .../async/AsynchronousTlsChannelGroup.java | 9 ++ 2 files changed, 103 insertions(+), 40 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java index daf0d8cecdd..48aa1355175 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java @@ -40,6 +40,7 @@ import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.channels.CompletionHandler; +import java.nio.channels.InterruptedByTimeoutException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; @@ -49,7 +50,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import static com.mongodb.assertions.Assertions.assertFalse; import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.internal.connection.ServerAddressHelper.getSocketAddresses; @@ -99,19 +102,39 @@ public void close() { private static class SelectorMonitor implements Closeable { - private static final class Pair { + static final class SocketRegistration { private final SocketChannel socketChannel; private final Runnable attachment; + private final AtomicReference connectionRegistrationState; - private Pair(final SocketChannel socketChannel, final Runnable attachment) { + enum ConnectionRegistrationState { + CONNECTING, + CONNECTED, + TIMEOUT_OUT + } + + private SocketRegistration(final SocketChannel socketChannel, final Runnable attachment) { this.socketChannel = socketChannel; this.attachment = attachment; + this.connectionRegistrationState = new AtomicReference<>(ConnectionRegistrationState.CONNECTING); + } + + public boolean markConnectionEstablishmentTimedOut() { + return connectionRegistrationState.compareAndSet( + ConnectionRegistrationState.CONNECTING, + ConnectionRegistrationState.TIMEOUT_OUT); + } + + public boolean markConnectionEstablishmentCompleted() { + return connectionRegistrationState.compareAndSet( + ConnectionRegistrationState.CONNECTING, + ConnectionRegistrationState.CONNECTED); } } private final Selector selector; private volatile boolean isClosed; - private final ConcurrentLinkedDeque pendingRegistrations = new ConcurrentLinkedDeque<>(); + private final ConcurrentLinkedDeque pendingRegistrations = new ConcurrentLinkedDeque<>(); SelectorMonitor() { try { @@ -121,23 +144,29 @@ private Pair(final SocketChannel socketChannel, final Runnable attachment) { } } + // Monitors OP_CONNECT events. void start() { Thread selectorThread = new Thread(() -> { try { while (!isClosed) { try { selector.select(); - for (SelectionKey selectionKey : selector.selectedKeys()) { selectionKey.cancel(); - Runnable runnable = (Runnable) selectionKey.attachment(); - runnable.run(); + SocketRegistration socketRegistration = (SocketRegistration) selectionKey.attachment(); + + boolean markedCompleted = socketRegistration.markConnectionEstablishmentCompleted(); + if (markedCompleted) { + Runnable runnable = socketRegistration.attachment; + runnable.run(); + } else { + assertFalse(socketRegistration.socketChannel.isOpen()); + } } - for (Iterator iter = pendingRegistrations.iterator(); iter.hasNext();) { - Pair pendingRegistration = iter.next(); - pendingRegistration.socketChannel.register(selector, SelectionKey.OP_CONNECT, - pendingRegistration.attachment); + for (Iterator iter = pendingRegistrations.iterator(); iter.hasNext();) { + SocketRegistration pendingRegistration = iter.next(); + pendingRegistration.socketChannel.register(selector, SelectionKey.OP_CONNECT, pendingRegistration); iter.remove(); } } catch (Exception e) { @@ -156,8 +185,9 @@ void start() { selectorThread.start(); } - void register(final SocketChannel channel, final Runnable attachment) { - pendingRegistrations.add(new Pair(channel, attachment)); + + void register(final SocketRegistration registration) { + pendingRegistrations.add(registration); selector.wakeup(); } @@ -203,41 +233,65 @@ public void openAsync(final OperationContext operationContext, final AsyncComple socketChannel.connect(getSocketAddresses(getServerAddress(), inetAddressResolver).get(0)); - selectorMonitor.register(socketChannel, () -> { - try { - if (!socketChannel.finishConnect()) { - throw new MongoSocketOpenException("Failed to finish connect", getServerAddress()); - } + SelectorMonitor.SocketRegistration socketRegistration = new SelectorMonitor.SocketRegistration( + socketChannel, () -> initializeTslChannel(handler, socketChannel)); - SSLEngine sslEngine = getSslContext().createSSLEngine(getServerAddress().getHost(), - getServerAddress().getPort()); - sslEngine.setUseClientMode(true); + int connectTimeoutMs = getSettings().getConnectTimeout(TimeUnit.MILLISECONDS); - SSLParameters sslParameters = sslEngine.getSSLParameters(); - enableSni(getServerAddress().getHost(), sslParameters); + group.getTimeoutExecutor().schedule(() -> { + boolean markedTimedOut = socketRegistration.markConnectionEstablishmentTimedOut(); + if (markedTimedOut) { + closeAndTimeout(handler, socketChannel); + } + }, connectTimeoutMs, TimeUnit.MILLISECONDS); - if (!sslSettings.isInvalidHostNameAllowed()) { - enableHostNameVerification(sslParameters); - } - sslEngine.setSSLParameters(sslParameters); + selectorMonitor.register(socketRegistration); + } catch (IOException e) { + handler.failed(new MongoSocketOpenException("Exception opening socket", getServerAddress(), e)); + } catch (Throwable t) { + handler.failed(t); + } + } - BufferAllocator bufferAllocator = new BufferProviderAllocator(); + private void closeAndTimeout(final AsyncCompletionHandler handler, final SocketChannel socketChannel) { + InterruptedByTimeoutException interruptedByTimeoutException = new InterruptedByTimeoutException(); + try { + socketChannel.close(); + } catch (Exception e) { + interruptedByTimeoutException.addSuppressed(e); + } + handler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), new InterruptedByTimeoutException())); + } - TlsChannel tlsChannel = ClientTlsChannel.newBuilder(socketChannel, sslEngine) - .withEncryptedBufferAllocator(bufferAllocator) - .withPlainBufferAllocator(bufferAllocator) - .build(); + private void initializeTslChannel(final AsyncCompletionHandler handler, final SocketChannel socketChannel) { + try { + if (!socketChannel.finishConnect()) { + throw new MongoSocketOpenException("Failed to finish connect", getServerAddress()); + } - // build asynchronous channel, based in the TLS channel and associated with the global group. - setChannel(new AsynchronousTlsChannelAdapter(new AsynchronousTlsChannel(group, tlsChannel, socketChannel))); + SSLEngine sslEngine = getSslContext().createSSLEngine(getServerAddress().getHost(), + getServerAddress().getPort()); + sslEngine.setUseClientMode(true); - handler.completed(null); - } catch (IOException e) { - handler.failed(new MongoSocketOpenException("Exception opening socket", getServerAddress(), e)); - } catch (Throwable t) { - handler.failed(t); - } - }); + SSLParameters sslParameters = sslEngine.getSSLParameters(); + enableSni(getServerAddress().getHost(), sslParameters); + + if (!sslSettings.isInvalidHostNameAllowed()) { + enableHostNameVerification(sslParameters); + } + sslEngine.setSSLParameters(sslParameters); + + BufferAllocator bufferAllocator = new BufferProviderAllocator(); + + TlsChannel tlsChannel = ClientTlsChannel.newBuilder(socketChannel, sslEngine) + .withEncryptedBufferAllocator(bufferAllocator) + .withPlainBufferAllocator(bufferAllocator) + .build(); + + // build asynchronous channel, based in the TLS channel and associated with the global group. + setChannel(new AsynchronousTlsChannelAdapter(new AsynchronousTlsChannel(group, tlsChannel, socketChannel))); + + handler.completed(null); } catch (IOException e) { handler.failed(new MongoSocketOpenException("Exception opening socket", getServerAddress(), e)); } catch (Throwable t) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java index 57db0df66e8..d9b1420a6e3 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java +++ b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java @@ -823,4 +823,13 @@ public long getCurrentWriteCount() { public long getCurrentRegistrationCount() { return registrations.mappingCount(); } + + /** + * Returns the timeout executor used by this channel group. + * + * @return the timeout executor + */ + public ScheduledThreadPoolExecutor getTimeoutExecutor() { + return timeoutExecutor; + } } From 5eef0fc161c11b6902e28a5a2eeb05a5ef6f1c94 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 25 Apr 2025 11:37:19 -0700 Subject: [PATCH 02/13] Add close method. JAVA-5856 --- .../TlsChannelStreamFactoryFactory.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java index 48aa1355175..7e666504762 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java @@ -254,13 +254,23 @@ public void openAsync(final OperationContext operationContext, final AsyncComple } private void closeAndTimeout(final AsyncCompletionHandler handler, final SocketChannel socketChannel) { - InterruptedByTimeoutException interruptedByTimeoutException = new InterruptedByTimeoutException(); + // We check if this stream was closed before timeout exception. + boolean streamClosed = isClosed(); + + //TODO refactor ths draft + InterruptedByTimeoutException timeoutException = new InterruptedByTimeoutException(); try { socketChannel.close(); } catch (Exception e) { - interruptedByTimeoutException.addSuppressed(e); + //TODO should ignore this exception? We seem to do so in other places + timeoutException.addSuppressed(e); + } + + if (streamClosed) { + handler.completed(null); + } else { + handler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), new InterruptedByTimeoutException())); } - handler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), new InterruptedByTimeoutException())); } private void initializeTslChannel(final AsyncCompletionHandler handler, final SocketChannel socketChannel) { From 6b838af9b23e52624bb77802209f5b0dd0be245c Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 25 Apr 2025 12:27:41 -0700 Subject: [PATCH 03/13] Fix an issue with infinite timeout setting. JAVA-5856 --- .../TlsChannelStreamFactoryFactory.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java index 7e666504762..cf55bcb3b8f 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java @@ -237,14 +237,9 @@ public void openAsync(final OperationContext operationContext, final AsyncComple socketChannel, () -> initializeTslChannel(handler, socketChannel)); int connectTimeoutMs = getSettings().getConnectTimeout(TimeUnit.MILLISECONDS); - - group.getTimeoutExecutor().schedule(() -> { - boolean markedTimedOut = socketRegistration.markConnectionEstablishmentTimedOut(); - if (markedTimedOut) { - closeAndTimeout(handler, socketChannel); - } - }, connectTimeoutMs, TimeUnit.MILLISECONDS); - + if (connectTimeoutMs > 0) { + scheduleTimeoutInterruption(handler, socketRegistration, socketChannel, connectTimeoutMs); + } selectorMonitor.register(socketRegistration); } catch (IOException e) { handler.failed(new MongoSocketOpenException("Exception opening socket", getServerAddress(), e)); @@ -253,6 +248,18 @@ public void openAsync(final OperationContext operationContext, final AsyncComple } } + private void scheduleTimeoutInterruption(final AsyncCompletionHandler handler, + final SelectorMonitor.SocketRegistration socketRegistration, + final SocketChannel socketChannel, + final int connectTimeoutMs) { + group.getTimeoutExecutor().schedule(() -> { + boolean markedTimedOut = socketRegistration.markConnectionEstablishmentTimedOut(); + if (markedTimedOut) { + closeAndTimeout(handler, socketChannel); + } + }, connectTimeoutMs, TimeUnit.MILLISECONDS); + } + private void closeAndTimeout(final AsyncCompletionHandler handler, final SocketChannel socketChannel) { // We check if this stream was closed before timeout exception. boolean streamClosed = isClosed(); From 238268c2857cade2267de8d376bf879bf2c13178 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 25 Apr 2025 15:34:29 -0700 Subject: [PATCH 04/13] Removed racy assert. JAVA-5856 --- .../internal/connection/TlsChannelStreamFactoryFactory.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java index cf55bcb3b8f..58d0f6b7690 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java @@ -159,8 +159,6 @@ void start() { if (markedCompleted) { Runnable runnable = socketRegistration.attachment; runnable.run(); - } else { - assertFalse(socketRegistration.socketChannel.isOpen()); } } From 4a932eb72a3719c15405c2c5e157160ba26c39e6 Mon Sep 17 00:00:00 2001 From: Viacheslav Babanin Date: Fri, 25 Apr 2025 19:00:06 -0700 Subject: [PATCH 05/13] Update driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java Co-authored-by: Valentin Kovalenko --- .../internal/connection/TlsChannelStreamFactoryFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java index 58d0f6b7690..bb58d00826c 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java @@ -274,7 +274,7 @@ private void closeAndTimeout(final AsyncCompletionHandler handler, final S if (streamClosed) { handler.completed(null); } else { - handler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), new InterruptedByTimeoutException())); + handler.failed(new MongoSocketOpenException("Exception opening socket", getAddress(), timeoutException)); } } From 839e51d4af58e3b589d3205af48e52be6bc37588 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 25 Apr 2025 19:04:26 -0700 Subject: [PATCH 06/13] Remove ConnectionRegistrationState. JAVA-5856 --- .../TlsChannelStreamFactoryFactory.java | 58 ++++++++----------- 1 file changed, 23 insertions(+), 35 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java index bb58d00826c..3d4b0641490 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java @@ -52,7 +52,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import static com.mongodb.assertions.Assertions.assertFalse; import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.internal.connection.ServerAddressHelper.getSocketAddresses; @@ -100,35 +99,34 @@ public void close() { group.shutdown(); } + /** + * Monitors `OP_CONNECT` events for socket connections. + */ private static class SelectorMonitor implements Closeable { static final class SocketRegistration { private final SocketChannel socketChannel; - private final Runnable attachment; - private final AtomicReference connectionRegistrationState; + private final AtomicReference afterConnectAction; - enum ConnectionRegistrationState { - CONNECTING, - CONNECTED, - TIMEOUT_OUT + SocketRegistration(final SocketChannel socketChannel, final Runnable afterConnectAction) { + this.socketChannel = socketChannel; + this.afterConnectAction = new AtomicReference<>(afterConnectAction); } - private SocketRegistration(final SocketChannel socketChannel, final Runnable attachment) { - this.socketChannel = socketChannel; - this.attachment = attachment; - this.connectionRegistrationState = new AtomicReference<>(ConnectionRegistrationState.CONNECTING); + boolean tryCancelPendingConnection() { + return tryTakeAction() != null; } - public boolean markConnectionEstablishmentTimedOut() { - return connectionRegistrationState.compareAndSet( - ConnectionRegistrationState.CONNECTING, - ConnectionRegistrationState.TIMEOUT_OUT); + void runAfterConnectActionIfNotCanceled() { + Runnable afterConnectActionToExecute = tryTakeAction(); + if (afterConnectActionToExecute != null) { + afterConnectActionToExecute.run(); + } } - public boolean markConnectionEstablishmentCompleted() { - return connectionRegistrationState.compareAndSet( - ConnectionRegistrationState.CONNECTING, - ConnectionRegistrationState.CONNECTED); + @Nullable + private Runnable tryTakeAction() { + return afterConnectAction.getAndSet(null); } } @@ -144,7 +142,6 @@ public boolean markConnectionEstablishmentCompleted() { } } - // Monitors OP_CONNECT events. void start() { Thread selectorThread = new Thread(() -> { try { @@ -153,13 +150,7 @@ void start() { selector.select(); for (SelectionKey selectionKey : selector.selectedKeys()) { selectionKey.cancel(); - SocketRegistration socketRegistration = (SocketRegistration) selectionKey.attachment(); - - boolean markedCompleted = socketRegistration.markConnectionEstablishmentCompleted(); - if (markedCompleted) { - Runnable runnable = socketRegistration.attachment; - runnable.run(); - } + ((SocketRegistration) selectionKey.attachment()).runAfterConnectActionIfNotCanceled(); } for (Iterator iter = pendingRegistrations.iterator(); iter.hasNext();) { @@ -228,15 +219,14 @@ public void openAsync(final OperationContext operationContext, final AsyncComple if (getSettings().getSendBufferSize() > 0) { socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, getSettings().getSendBufferSize()); } - + //getConnectTimeoutMs MUST be called before connection attempt, as it might throw MongoOperationTimeout exception. + int connectTimeoutMs = operationContext.getTimeoutContext().getConnectTimeoutMs(); socketChannel.connect(getSocketAddresses(getServerAddress(), inetAddressResolver).get(0)); - SelectorMonitor.SocketRegistration socketRegistration = new SelectorMonitor.SocketRegistration( socketChannel, () -> initializeTslChannel(handler, socketChannel)); - int connectTimeoutMs = getSettings().getConnectTimeout(TimeUnit.MILLISECONDS); if (connectTimeoutMs > 0) { - scheduleTimeoutInterruption(handler, socketRegistration, socketChannel, connectTimeoutMs); + scheduleTimeoutInterruption(handler, socketRegistration, connectTimeoutMs); } selectorMonitor.register(socketRegistration); } catch (IOException e) { @@ -248,12 +238,10 @@ public void openAsync(final OperationContext operationContext, final AsyncComple private void scheduleTimeoutInterruption(final AsyncCompletionHandler handler, final SelectorMonitor.SocketRegistration socketRegistration, - final SocketChannel socketChannel, final int connectTimeoutMs) { group.getTimeoutExecutor().schedule(() -> { - boolean markedTimedOut = socketRegistration.markConnectionEstablishmentTimedOut(); - if (markedTimedOut) { - closeAndTimeout(handler, socketChannel); + if (socketRegistration.tryCancelPendingConnection()) { + closeAndTimeout(handler, socketRegistration.socketChannel); } }, connectTimeoutMs, TimeUnit.MILLISECONDS); } From b8ac9f67bdd4797ca6144aae887026e922fddc28 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Mon, 28 Apr 2025 23:23:45 -0700 Subject: [PATCH 07/13] Add Functional tests and remove TODOs. JAVA-5856 --- .../TlsChannelStreamFactoryFactory.java | 4 - .../TlsChannelStreamFunctionalTest.java | 113 ++++++++++++++++++ 2 files changed, 113 insertions(+), 4 deletions(-) create mode 100644 driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java diff --git a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java index 3d4b0641490..df8b3c2fe42 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java @@ -174,7 +174,6 @@ void start() { selectorThread.start(); } - void register(final SocketRegistration registration) { pendingRegistrations.add(registration); selector.wakeup(); @@ -249,13 +248,10 @@ private void scheduleTimeoutInterruption(final AsyncCompletionHandler hand private void closeAndTimeout(final AsyncCompletionHandler handler, final SocketChannel socketChannel) { // We check if this stream was closed before timeout exception. boolean streamClosed = isClosed(); - - //TODO refactor ths draft InterruptedByTimeoutException timeoutException = new InterruptedByTimeoutException(); try { socketChannel.close(); } catch (Exception e) { - //TODO should ignore this exception? We seem to do so in other places timeoutException.addSuppressed(e); } diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java new file mode 100644 index 00000000000..0948c32617a --- /dev/null +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java @@ -0,0 +1,113 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.connection; + +import com.mongodb.MongoSocketOpenException; +import com.mongodb.ServerAddress; +import com.mongodb.connection.SocketSettings; +import com.mongodb.connection.SslSettings; +import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.TimeoutSettings; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.net.ServerSocket; +import java.nio.channels.InterruptedByTimeoutException; +import java.util.concurrent.TimeUnit; + +import static com.mongodb.assertions.Assertions.assertTrue; +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +class TlsChannelStreamFunctionalTest { + private static final SslSettings SSL_SETTINGS = SslSettings.builder().enabled(true).build(); + private static final String UNREACHABLE_PRIVATE_IP_ADDRESS = "10.255.255.1"; + private ServerSocket serverSocket; + private int port; + + @BeforeEach + void setUp() throws IOException { + serverSocket = new ServerSocket(0, 1); + port = serverSocket.getLocalPort(); + } + + @AfterEach + void cleanUp() throws IOException { + try (ServerSocket ignore = serverSocket) { + } + } + + @ParameterizedTest + @ValueSource(ints = {500, 1000, 2000}) + void shouldInterruptConnectionEstablishmentWhenConnectionTimeoutExpires(final int connectTimeout) { + //given + try (TlsChannelStreamFactoryFactory factory = new TlsChannelStreamFactoryFactory(new DefaultInetAddressResolver())) { + StreamFactory streamFactory = factory.create(SocketSettings.builder() + .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) + .build(), SSL_SETTINGS); + + Stream stream = streamFactory.create(new ServerAddress(UNREACHABLE_PRIVATE_IP_ADDRESS, port)); + long connectOpenStart = System.nanoTime(); + + //when + MongoSocketOpenException mongoSocketOpenException = assertThrows(MongoSocketOpenException.class, () -> + stream.open(OperationContext + .simpleOperationContext(new TimeoutContext(TimeoutSettings.DEFAULT + .withConnectTimeoutMS(connectTimeout))))); + + //then + long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - connectOpenStart); + assertInstanceOf(InterruptedByTimeoutException.class, mongoSocketOpenException.getCause(), + "Actual cause: " + mongoSocketOpenException.getCause()); + long diff = elapsedMs - connectTimeout; + assertFalse(diff < 0, + String.format("Connection timed-out sooner than expected. Difference: %d ms", diff)); + // Allowed difference, with test overhead setup is 300MS. + int epsilon = 300; + assertTrue(diff < epsilon, + String.format("Elapsed time %d ms should be within %d ms of the connect timeout", elapsedMs, epsilon)); + } + } + + @ParameterizedTest + @ValueSource(ints = {0, 500, 1000, 2000}) + void shouldEstablishConnection(final int connectTimeout) throws IOException { + try (TlsChannelStreamFactoryFactory factory = new TlsChannelStreamFactoryFactory(new DefaultInetAddressResolver())) { + //given + StreamFactory streamFactory = factory.create(SocketSettings.builder() + .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) + .build(), SSL_SETTINGS); + + Stream stream = streamFactory.create(new ServerAddress("localhost", port)); + + try { + //when + stream.open(OperationContext.simpleOperationContext( + new TimeoutContext(TimeoutSettings.DEFAULT.withConnectTimeoutMS(connectTimeout)))); + + //then + assertFalse(stream.isClosed()); + } finally { + stream.close(); + } + } + } +} From 0bf25c02c1f9c99846b1dc94b7f1d585e5a0f2ad Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Mon, 28 Apr 2025 23:31:25 -0700 Subject: [PATCH 08/13] Fix tests. JAVA-5856 --- .../main/com/mongodb/internal/TimeoutSettings.java | 5 +++++ .../connection/TlsChannelStreamFunctionalTest.java | 14 +++++++------- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutSettings.java b/driver-core/src/main/com/mongodb/internal/TimeoutSettings.java index 486a893d74c..e1f0bc0b795 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutSettings.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutSettings.java @@ -165,6 +165,11 @@ public TimeoutSettings withReadTimeoutMS(final long readTimeoutMS) { maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS); } + public TimeoutSettings withConnectTimeoutMS(final long connectTimeoutMS) { + return new TimeoutSettings(generationId, timeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS, + maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS); + } + public TimeoutSettings withServerSelectionTimeoutMS(final long serverSelectionTimeoutMS) { return new TimeoutSettings(timeoutMS, serverSelectionTimeoutMS, connectTimeoutMS, readTimeoutMS, maxAwaitTimeMS, maxTimeMS, maxCommitTimeMS, wTimeoutMS, maxWaitTimeMS); diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java index 0948c32617a..b406975bf8e 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java @@ -75,29 +75,29 @@ void shouldInterruptConnectionEstablishmentWhenConnectionTimeoutExpires(final in //then long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - connectOpenStart); + long diff = elapsedMs - connectTimeout; + // Allowed difference, with test overhead setup is 300MS. + int epsilonMs = 300; + assertInstanceOf(InterruptedByTimeoutException.class, mongoSocketOpenException.getCause(), "Actual cause: " + mongoSocketOpenException.getCause()); - long diff = elapsedMs - connectTimeout; assertFalse(diff < 0, String.format("Connection timed-out sooner than expected. Difference: %d ms", diff)); - // Allowed difference, with test overhead setup is 300MS. - int epsilon = 300; - assertTrue(diff < epsilon, - String.format("Elapsed time %d ms should be within %d ms of the connect timeout", elapsedMs, epsilon)); + assertTrue(diff < epsilonMs, + String.format("Elapsed time %d ms should be within %d ms of the connect timeout", elapsedMs, epsilonMs)); } } @ParameterizedTest @ValueSource(ints = {0, 500, 1000, 2000}) void shouldEstablishConnection(final int connectTimeout) throws IOException { + //given try (TlsChannelStreamFactoryFactory factory = new TlsChannelStreamFactoryFactory(new DefaultInetAddressResolver())) { - //given StreamFactory streamFactory = factory.create(SocketSettings.builder() .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) .build(), SSL_SETTINGS); Stream stream = streamFactory.create(new ServerAddress("localhost", port)); - try { //when stream.open(OperationContext.simpleOperationContext( From b3c936149966fb051aff4cf6cf96dfd89c4d1a5f Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Mon, 28 Apr 2025 23:33:52 -0700 Subject: [PATCH 09/13] Add override tests for connectTimeoutMs. JAVA-5856 --- .../test/unit/com/mongodb/internal/TimeoutSettingsTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/driver-core/src/test/unit/com/mongodb/internal/TimeoutSettingsTest.java b/driver-core/src/test/unit/com/mongodb/internal/TimeoutSettingsTest.java index 71f63d32e6d..9bffd08542b 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/TimeoutSettingsTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/TimeoutSettingsTest.java @@ -53,10 +53,11 @@ Collection timeoutSettingsTest() { .withMaxAwaitTimeMS(11) .withMaxCommitMS(999L) .withReadTimeoutMS(11_000) + .withConnectTimeoutMS(500) .withWTimeoutMS(222L); assertAll( () -> assertEquals(30_000, timeoutSettings.getServerSelectionTimeoutMS()), - () -> assertEquals(10_000, timeoutSettings.getConnectTimeoutMS()), + () -> assertEquals(500, timeoutSettings.getConnectTimeoutMS()), () -> assertEquals(11_000, timeoutSettings.getReadTimeoutMS()), () -> assertEquals(100, timeoutSettings.getTimeoutMS()), () -> assertEquals(111, timeoutSettings.getMaxTimeMS()), From 0ef564dc07a863dda8fc98675ddedc2a76e61ce5 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 29 Apr 2025 14:44:28 -0700 Subject: [PATCH 10/13] Add SocketChannel verifications. JAVA-5856 --- .../TlsChannelStreamFunctionalTest.java | 78 +++++++++++++++---- 1 file changed, 64 insertions(+), 14 deletions(-) diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java index b406975bf8e..e039a3c15bc 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java @@ -26,16 +26,27 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.IOException; import java.net.ServerSocket; import java.nio.channels.InterruptedByTimeoutException; +import java.nio.channels.SocketChannel; import java.util.concurrent.TimeUnit; -import static com.mongodb.assertions.Assertions.assertTrue; +import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertThrows; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.verify; class TlsChannelStreamFunctionalTest { private static final SslSettings SSL_SETTINGS = SslSettings.builder().enabled(true).build(); @@ -50,16 +61,22 @@ void setUp() throws IOException { } @AfterEach + @SuppressWarnings("try") void cleanUp() throws IOException { - try (ServerSocket ignore = serverSocket) { + try (ServerSocket ignored = serverSocket) { + //ignored } } @ParameterizedTest @ValueSource(ints = {500, 1000, 2000}) - void shouldInterruptConnectionEstablishmentWhenConnectionTimeoutExpires(final int connectTimeout) { + void shouldInterruptConnectionEstablishmentWhenConnectionTimeoutExpires(final int connectTimeout) throws IOException { //given - try (TlsChannelStreamFactoryFactory factory = new TlsChannelStreamFactoryFactory(new DefaultInetAddressResolver())) { + try (TlsChannelStreamFactoryFactory factory = new TlsChannelStreamFactoryFactory(new DefaultInetAddressResolver()); + MockedStatic socketChannelMockedStatic = Mockito.mockStatic(SocketChannel.class)) { + SingleResultSpyCaptor singleResultSpyCaptor = new SingleResultSpyCaptor<>(); + socketChannelMockedStatic.when(SocketChannel::open).thenAnswer(singleResultSpyCaptor); + StreamFactory streamFactory = factory.create(SocketSettings.builder() .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) .build(), SSL_SETTINGS); @@ -75,39 +92,72 @@ void shouldInterruptConnectionEstablishmentWhenConnectionTimeoutExpires(final in //then long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - connectOpenStart); - long diff = elapsedMs - connectTimeout; - // Allowed difference, with test overhead setup is 300MS. - int epsilonMs = 300; + // Allow for some timing imprecision due to test overhead. + int maximumAcceptableTimeoutOvershoot = 300; assertInstanceOf(InterruptedByTimeoutException.class, mongoSocketOpenException.getCause(), "Actual cause: " + mongoSocketOpenException.getCause()); - assertFalse(diff < 0, - String.format("Connection timed-out sooner than expected. Difference: %d ms", diff)); - assertTrue(diff < epsilonMs, - String.format("Elapsed time %d ms should be within %d ms of the connect timeout", elapsedMs, epsilonMs)); + assertFalse(connectTimeout > elapsedMs, + format("Connection timed-out sooner than expected. ConnectTimeoutMS: %d, elapsedMs: %d", connectTimeout, elapsedMs)); + assertTrue(elapsedMs - connectTimeout < maximumAcceptableTimeoutOvershoot, + format("Connection timeout overshoot time %d ms should be within %d ms", elapsedMs - connectTimeout, + maximumAcceptableTimeoutOvershoot)); + + SocketChannel actualSpySocketChannel = singleResultSpyCaptor.getResult(); + assertNotNull(actualSpySocketChannel, "SocketChannel was not opened"); + verify(actualSpySocketChannel, atLeast(1)).close(); } } @ParameterizedTest @ValueSource(ints = {0, 500, 1000, 2000}) - void shouldEstablishConnection(final int connectTimeout) throws IOException { + void shouldEstablishConnection(final int connectTimeout) throws IOException, InterruptedException { //given - try (TlsChannelStreamFactoryFactory factory = new TlsChannelStreamFactoryFactory(new DefaultInetAddressResolver())) { + try (TlsChannelStreamFactoryFactory factory = new TlsChannelStreamFactoryFactory(new DefaultInetAddressResolver()); + MockedStatic socketChannelMockedStatic = Mockito.mockStatic(SocketChannel.class)) { + SingleResultSpyCaptor singleResultSpyCaptor = new SingleResultSpyCaptor<>(); + socketChannelMockedStatic.when(SocketChannel::open).thenAnswer(singleResultSpyCaptor); + StreamFactory streamFactory = factory.create(SocketSettings.builder() .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) .build(), SSL_SETTINGS); - Stream stream = streamFactory.create(new ServerAddress("localhost", port)); + Stream stream = streamFactory.create(new ServerAddress(serverSocket.getInetAddress(), port)); try { //when stream.open(OperationContext.simpleOperationContext( new TimeoutContext(TimeoutSettings.DEFAULT.withConnectTimeoutMS(connectTimeout)))); //then + SocketChannel actualSpySocketChannel = singleResultSpyCaptor.getResult(); + assertNotNull(actualSpySocketChannel, "SocketChannel was not opened"); + assertTrue(actualSpySocketChannel.isConnected()); + + // Wait to verify that socket was not closed by timeout. + SECONDS.sleep(3); + assertTrue(actualSpySocketChannel.isConnected()); assertFalse(stream.isClosed()); } finally { stream.close(); } } } + + private static final class SingleResultSpyCaptor implements Answer { + private volatile T result = null; + + public T getResult() { + return result; + } + + @Override + public T answer(InvocationOnMock invocationOnMock) throws Throwable { + if (result != null) { + fail(invocationOnMock.getMethod().getName() + " was called more then once"); + } + T returnedValue = (T) invocationOnMock.callRealMethod(); + result = Mockito.spy(returnedValue); + return result; + } + } } From 39ee2fef09254d5c4e0e0a8e1019bb79c47ff732 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Tue, 29 Apr 2025 16:54:23 -0700 Subject: [PATCH 11/13] Remove redundant logic. JAVA-5856 --- .../TlsChannelStreamFunctionalTest.java | 74 ++++++++----------- 1 file changed, 30 insertions(+), 44 deletions(-) diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java index e039a3c15bc..ed177a732d2 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java @@ -22,8 +22,6 @@ import com.mongodb.connection.SslSettings; import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.TimeoutSettings; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.MockedStatic; @@ -37,12 +35,13 @@ import java.nio.channels.SocketChannel; import java.util.concurrent.TimeUnit; +import static com.mongodb.internal.connection.OperationContext.simpleOperationContext; import static java.lang.String.format; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.junit.Assert.assertThrows; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.atLeast; @@ -51,56 +50,39 @@ class TlsChannelStreamFunctionalTest { private static final SslSettings SSL_SETTINGS = SslSettings.builder().enabled(true).build(); private static final String UNREACHABLE_PRIVATE_IP_ADDRESS = "10.255.255.1"; - private ServerSocket serverSocket; - private int port; - - @BeforeEach - void setUp() throws IOException { - serverSocket = new ServerSocket(0, 1); - port = serverSocket.getLocalPort(); - } - - @AfterEach - @SuppressWarnings("try") - void cleanUp() throws IOException { - try (ServerSocket ignored = serverSocket) { - //ignored - } - } + private static final int UNREACHABLE_PORT = 65333; @ParameterizedTest @ValueSource(ints = {500, 1000, 2000}) - void shouldInterruptConnectionEstablishmentWhenConnectionTimeoutExpires(final int connectTimeout) throws IOException { + void shouldInterruptConnectionEstablishmentWhenConnectionTimeoutExpires(final int connectTimeoutMs) throws IOException { //given - try (TlsChannelStreamFactoryFactory factory = new TlsChannelStreamFactoryFactory(new DefaultInetAddressResolver()); + try (StreamFactoryFactory streamFactoryFactory = new TlsChannelStreamFactoryFactory(new DefaultInetAddressResolver()); MockedStatic socketChannelMockedStatic = Mockito.mockStatic(SocketChannel.class)) { SingleResultSpyCaptor singleResultSpyCaptor = new SingleResultSpyCaptor<>(); socketChannelMockedStatic.when(SocketChannel::open).thenAnswer(singleResultSpyCaptor); - StreamFactory streamFactory = factory.create(SocketSettings.builder() - .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) + StreamFactory streamFactory = streamFactoryFactory.create(SocketSettings.builder() + .connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS) .build(), SSL_SETTINGS); - Stream stream = streamFactory.create(new ServerAddress(UNREACHABLE_PRIVATE_IP_ADDRESS, port)); + Stream stream = streamFactory.create(new ServerAddress(UNREACHABLE_PRIVATE_IP_ADDRESS, UNREACHABLE_PORT)); long connectOpenStart = System.nanoTime(); //when + OperationContext operationContext = createOperationContext(connectTimeoutMs); MongoSocketOpenException mongoSocketOpenException = assertThrows(MongoSocketOpenException.class, () -> - stream.open(OperationContext - .simpleOperationContext(new TimeoutContext(TimeoutSettings.DEFAULT - .withConnectTimeoutMS(connectTimeout))))); + stream.open(operationContext)); //then long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - connectOpenStart); // Allow for some timing imprecision due to test overhead. int maximumAcceptableTimeoutOvershoot = 300; - assertInstanceOf(InterruptedByTimeoutException.class, mongoSocketOpenException.getCause(), - "Actual cause: " + mongoSocketOpenException.getCause()); - assertFalse(connectTimeout > elapsedMs, - format("Connection timed-out sooner than expected. ConnectTimeoutMS: %d, elapsedMs: %d", connectTimeout, elapsedMs)); - assertTrue(elapsedMs - connectTimeout < maximumAcceptableTimeoutOvershoot, - format("Connection timeout overshoot time %d ms should be within %d ms", elapsedMs - connectTimeout, + assertInstanceOf(InterruptedByTimeoutException.class, mongoSocketOpenException.getCause()); + assertFalse(connectTimeoutMs > elapsedMs, + format("Connection timed-out sooner than expected. ConnectTimeoutMS: %d, elapsedMs: %d", connectTimeoutMs, elapsedMs)); + assertTrue(elapsedMs - connectTimeoutMs < maximumAcceptableTimeoutOvershoot, + format("Connection timeout overshoot time %d ms should be within %d ms", elapsedMs - connectTimeoutMs, maximumAcceptableTimeoutOvershoot)); SocketChannel actualSpySocketChannel = singleResultSpyCaptor.getResult(); @@ -111,22 +93,22 @@ void shouldInterruptConnectionEstablishmentWhenConnectionTimeoutExpires(final in @ParameterizedTest @ValueSource(ints = {0, 500, 1000, 2000}) - void shouldEstablishConnection(final int connectTimeout) throws IOException, InterruptedException { + void shouldEstablishConnection(final int connectTimeoutMs) throws IOException, InterruptedException { //given - try (TlsChannelStreamFactoryFactory factory = new TlsChannelStreamFactoryFactory(new DefaultInetAddressResolver()); - MockedStatic socketChannelMockedStatic = Mockito.mockStatic(SocketChannel.class)) { + try (StreamFactoryFactory streamFactoryFactory = new TlsChannelStreamFactoryFactory(new DefaultInetAddressResolver()); + MockedStatic socketChannelMockedStatic = Mockito.mockStatic(SocketChannel.class); + ServerSocket serverSocket = new ServerSocket(0, 1)) { SingleResultSpyCaptor singleResultSpyCaptor = new SingleResultSpyCaptor<>(); socketChannelMockedStatic.when(SocketChannel::open).thenAnswer(singleResultSpyCaptor); - StreamFactory streamFactory = factory.create(SocketSettings.builder() - .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) + StreamFactory streamFactory = streamFactoryFactory.create(SocketSettings.builder() + .connectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS) .build(), SSL_SETTINGS); - Stream stream = streamFactory.create(new ServerAddress(serverSocket.getInetAddress(), port)); + Stream stream = streamFactory.create(new ServerAddress(serverSocket.getInetAddress(), serverSocket.getLocalPort())); try { //when - stream.open(OperationContext.simpleOperationContext( - new TimeoutContext(TimeoutSettings.DEFAULT.withConnectTimeoutMS(connectTimeout)))); + stream.open(createOperationContext(connectTimeoutMs)); //then SocketChannel actualSpySocketChannel = singleResultSpyCaptor.getResult(); @@ -134,7 +116,7 @@ void shouldEstablishConnection(final int connectTimeout) throws IOException, Int assertTrue(actualSpySocketChannel.isConnected()); // Wait to verify that socket was not closed by timeout. - SECONDS.sleep(3); + MILLISECONDS.sleep(connectTimeoutMs * 2L); assertTrue(actualSpySocketChannel.isConnected()); assertFalse(stream.isClosed()); } finally { @@ -151,7 +133,7 @@ public T getResult() { } @Override - public T answer(InvocationOnMock invocationOnMock) throws Throwable { + public T answer(final InvocationOnMock invocationOnMock) throws Throwable { if (result != null) { fail(invocationOnMock.getMethod().getName() + " was called more then once"); } @@ -160,4 +142,8 @@ public T answer(InvocationOnMock invocationOnMock) throws Throwable { return result; } } + + private static OperationContext createOperationContext(final int connectTimeoutMs) { + return simpleOperationContext(new TimeoutContext(TimeoutSettings.DEFAULT.withConnectTimeoutMS(connectTimeoutMs))); + } } From 097ca4ffe2e236322fdce5bac0d125e7cf04e232 Mon Sep 17 00:00:00 2001 From: Viacheslav Babanin Date: Tue, 29 Apr 2025 21:17:02 -0700 Subject: [PATCH 12/13] Update driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java Co-authored-by: Valentin Kovalenko --- .../internal/connection/TlsChannelStreamFunctionalTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java index ed177a732d2..27fe11cd825 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java @@ -81,7 +81,7 @@ void shouldInterruptConnectionEstablishmentWhenConnectionTimeoutExpires(final in assertInstanceOf(InterruptedByTimeoutException.class, mongoSocketOpenException.getCause()); assertFalse(connectTimeoutMs > elapsedMs, format("Connection timed-out sooner than expected. ConnectTimeoutMS: %d, elapsedMs: %d", connectTimeoutMs, elapsedMs)); - assertTrue(elapsedMs - connectTimeoutMs < maximumAcceptableTimeoutOvershoot, + assertTrue(elapsedMs - connectTimeoutMs <= maximumAcceptableTimeoutOvershoot, format("Connection timeout overshoot time %d ms should be within %d ms", elapsedMs - connectTimeoutMs, maximumAcceptableTimeoutOvershoot)); From 48d3a522026096c7351279e93ce907a76bb22261 Mon Sep 17 00:00:00 2001 From: Viacheslav Babanin Date: Tue, 29 Apr 2025 21:17:08 -0700 Subject: [PATCH 13/13] Update driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java Co-authored-by: Valentin Kovalenko --- .../internal/connection/TlsChannelStreamFunctionalTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java index 27fe11cd825..3f80fcddfa3 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TlsChannelStreamFunctionalTest.java @@ -137,6 +137,7 @@ public T answer(final InvocationOnMock invocationOnMock) throws Throwable { if (result != null) { fail(invocationOnMock.getMethod().getName() + " was called more then once"); } + @SuppressWarnings("unchecked") T returnedValue = (T) invocationOnMock.callRealMethod(); result = Mockito.spy(returnedValue); return result;