diff --git a/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java b/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java index ea05ff59a81..73b3d92ad26 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java @@ -185,6 +185,9 @@ private void doAdvanceOrThrow(final Throwable attemptException, final boolean onlyRuntimeExceptions) throws Throwable { assertTrue(attempt() < attempts); assertNotNull(attemptException); + if (attemptException instanceof MongoOperationTimeoutException) { + throw attemptException; + } if (onlyRuntimeExceptions) { assertTrue(isRuntime(attemptException)); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/Authenticator.java b/driver-core/src/main/com/mongodb/internal/connection/Authenticator.java index 9ec4780d958..4638c7790b3 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/Authenticator.java +++ b/driver-core/src/main/com/mongodb/internal/connection/Authenticator.java @@ -89,8 +89,9 @@ T getNonNullMechanismProperty(final String key, @Nullable final T defaultVal } - abstract void authenticate(InternalConnection connection, ConnectionDescription connectionDescription); + abstract void authenticate(InternalConnection connection, ConnectionDescription connectionDescription, + OperationContext operationContext); abstract void authenticateAsync(InternalConnection connection, ConnectionDescription connectionDescription, - SingleResultCallback callback); + OperationContext operationContext, SingleResultCallback callback); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/CommandHelper.java b/driver-core/src/main/com/mongodb/internal/connection/CommandHelper.java index c129cbf75bf..cbd542a1f3e 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/CommandHelper.java +++ b/driver-core/src/main/com/mongodb/internal/connection/CommandHelper.java @@ -20,11 +20,7 @@ import com.mongodb.MongoServerException; import com.mongodb.ServerApi; import com.mongodb.connection.ClusterConnectionMode; -import com.mongodb.internal.IgnorableRequestContext; -import com.mongodb.internal.TimeoutContext; -import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.async.SingleResultCallback; -import com.mongodb.internal.session.SessionContext; import com.mongodb.internal.validator.NoOpFieldNameValidator; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -47,26 +43,25 @@ public final class CommandHelper { static final String LEGACY_HELLO_LOWER = LEGACY_HELLO.toLowerCase(Locale.ROOT); static BsonDocument executeCommand(final String database, final BsonDocument command, final ClusterConnectionMode clusterConnectionMode, - @Nullable final ServerApi serverApi, final InternalConnection internalConnection) { - return sendAndReceive(database, command, clusterConnectionMode, serverApi, internalConnection); + @Nullable final ServerApi serverApi, final InternalConnection internalConnection, final OperationContext operationContext) { + return sendAndReceive(database, command, clusterConnectionMode, serverApi, internalConnection, operationContext); } static BsonDocument executeCommandWithoutCheckingForFailure(final String database, final BsonDocument command, - final ClusterConnectionMode clusterConnectionMode, - @Nullable final ServerApi serverApi, - final InternalConnection internalConnection) { + final ClusterConnectionMode clusterConnectionMode, @Nullable final ServerApi serverApi, + final InternalConnection internalConnection, final OperationContext operationContext) { try { - return sendAndReceive(database, command, clusterConnectionMode, serverApi, internalConnection); + return executeCommand(database, command, clusterConnectionMode, serverApi, internalConnection, operationContext); } catch (MongoServerException e) { return new BsonDocument(); } } static void executeCommandAsync(final String database, final BsonDocument command, final ClusterConnectionMode clusterConnectionMode, - @Nullable final ServerApi serverApi, final InternalConnection internalConnection, - final SingleResultCallback callback) { + @Nullable final ServerApi serverApi, final InternalConnection internalConnection, final OperationContext operationContext, + final SingleResultCallback callback) { internalConnection.sendAndReceiveAsync(getCommandMessage(database, command, internalConnection, clusterConnectionMode, serverApi), - new BsonDocumentCodec(), createOperationContext(NoOpSessionContext.INSTANCE, serverApi), + new BsonDocumentCodec(), operationContext, (result, t) -> { if (t != null) { callback.onResult(null, t); @@ -90,19 +85,15 @@ static boolean isCommandOk(final BsonDocument response) { } } - static OperationContext createOperationContext(final SessionContext sessionContext, @Nullable final ServerApi serverApi) { - return new OperationContext(IgnorableRequestContext.INSTANCE, sessionContext, - new TimeoutContext(TimeoutSettings.DEFAULT), serverApi); - } - private static BsonDocument sendAndReceive(final String database, final BsonDocument command, final ClusterConnectionMode clusterConnectionMode, @Nullable final ServerApi serverApi, - final InternalConnection internalConnection) { + final InternalConnection internalConnection, + final OperationContext operationContext) { return assertNotNull( internalConnection.sendAndReceive( getCommandMessage(database, command, internalConnection, clusterConnectionMode, serverApi), - new BsonDocumentCodec(), createOperationContext(NoOpSessionContext.INSTANCE, serverApi)) + new BsonDocumentCodec(), operationContext) ); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultAuthenticator.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultAuthenticator.java index 86b081b621d..cf901c6021e 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultAuthenticator.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultAuthenticator.java @@ -47,14 +47,15 @@ class DefaultAuthenticator extends Authenticator implements SpeculativeAuthentic } @Override - void authenticate(final InternalConnection connection, final ConnectionDescription connectionDescription) { + void authenticate(final InternalConnection connection, final ConnectionDescription connectionDescription, + final OperationContext operationContext) { if (serverIsLessThanVersionFourDotZero(connectionDescription)) { new ScramShaAuthenticator(getMongoCredentialWithCache().withMechanism(SCRAM_SHA_1), getClusterConnectionMode(), getServerApi()) - .authenticate(connection, connectionDescription); + .authenticate(connection, connectionDescription, operationContext); } else { try { setDelegate(connectionDescription); - delegate.authenticate(connection, connectionDescription); + delegate.authenticate(connection, connectionDescription, operationContext); } catch (Exception e) { throw wrapException(e); } @@ -63,13 +64,13 @@ void authenticate(final InternalConnection connection, final ConnectionDescripti @Override void authenticateAsync(final InternalConnection connection, final ConnectionDescription connectionDescription, - final SingleResultCallback callback) { + final OperationContext operationContext, final SingleResultCallback callback) { if (serverIsLessThanVersionFourDotZero(connectionDescription)) { new ScramShaAuthenticator(getMongoCredentialWithCache().withMechanism(SCRAM_SHA_1), getClusterConnectionMode(), getServerApi()) - .authenticateAsync(connection, connectionDescription, callback); + .authenticateAsync(connection, connectionDescription, operationContext, callback); } else { setDelegate(connectionDescription); - delegate.authenticateAsync(connection, connectionDescription, callback); + delegate.authenticateAsync(connection, connectionDescription, operationContext, callback); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java index b6ac2caf123..4f6a98836bd 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java @@ -437,9 +437,10 @@ private void initialize() { private void pingServer(final InternalConnection connection) { long start = System.nanoTime(); + OperationContext operationContext = operationContextFactory.create(); executeCommand("admin", new BsonDocument(getHandshakeCommandName(connection.getInitialServerDescription()), new BsonInt32(1)), - clusterConnectionMode, serverApi, connection); + clusterConnectionMode, serverApi, connection, operationContext); long elapsedTimeNanos = System.nanoTime() - start; roundTripTimeSampler.addSample(elapsedTimeNanos); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalConnectionInitializer.java b/driver-core/src/main/com/mongodb/internal/connection/InternalConnectionInitializer.java index 9826f20b69b..077e2c68254 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalConnectionInitializer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalConnectionInitializer.java @@ -20,14 +20,19 @@ interface InternalConnectionInitializer { - InternalConnectionInitializationDescription startHandshake(InternalConnection internalConnection); + InternalConnectionInitializationDescription startHandshake(InternalConnection internalConnection, + OperationContext operationContext); InternalConnectionInitializationDescription finishHandshake(InternalConnection internalConnection, - InternalConnectionInitializationDescription description); + InternalConnectionInitializationDescription description, + OperationContext operationContext); void startHandshakeAsync(InternalConnection internalConnection, + OperationContext operationContext, SingleResultCallback callback); - void finishHandshakeAsync(InternalConnection internalConnection, InternalConnectionInitializationDescription description, + void finishHandshakeAsync(InternalConnection internalConnection, + InternalConnectionInitializationDescription description, + OperationContext operationContext, SingleResultCallback callback); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index f8e2e07fa23..a179b532fe6 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -202,10 +202,10 @@ public void open(final OperationContext operationContext) { try { stream.open(operationContext); - InternalConnectionInitializationDescription initializationDescription = connectionInitializer.startHandshake(this); + InternalConnectionInitializationDescription initializationDescription = connectionInitializer.startHandshake(this, operationContext); initAfterHandshakeStart(initializationDescription); - initializationDescription = connectionInitializer.finishHandshake(this, initializationDescription); + initializationDescription = connectionInitializer.finishHandshake(this, initializationDescription, operationContext); initAfterHandshakeFinish(initializationDescription); } catch (Throwable t) { close(); @@ -226,7 +226,7 @@ public void openAsync(final OperationContext operationContext, final SingleResul @Override public void completed(@Nullable final Void aVoid) { - connectionInitializer.startHandshakeAsync(InternalStreamConnection.this, + connectionInitializer.startHandshakeAsync(InternalStreamConnection.this, operationContext, (initialResult, initialException) -> { if (initialException != null) { close(); @@ -235,7 +235,7 @@ public void completed(@Nullable final Void aVoid) { assertNotNull(initialResult); initAfterHandshakeStart(initialResult); connectionInitializer.finishHandshakeAsync(InternalStreamConnection.this, - initialResult, (completedResult, completedException) -> { + initialResult, operationContext, (completedResult, completedException) -> { if (completedException != null) { close(); callback.onResult(null, completedException); diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java index 47f9857d995..e02dc4dd546 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java @@ -72,27 +72,29 @@ public InternalStreamConnectionInitializer(final ClusterConnectionMode clusterCo } @Override - public InternalConnectionInitializationDescription startHandshake(final InternalConnection internalConnection) { + public InternalConnectionInitializationDescription startHandshake(final InternalConnection internalConnection, + final OperationContext operationContext) { notNull("internalConnection", internalConnection); - return initializeConnectionDescription(internalConnection); + return initializeConnectionDescription(internalConnection, operationContext); } public InternalConnectionInitializationDescription finishHandshake(final InternalConnection internalConnection, - final InternalConnectionInitializationDescription description) { + final InternalConnectionInitializationDescription description, + final OperationContext operationContext) { notNull("internalConnection", internalConnection); notNull("description", description); - authenticate(internalConnection, description.getConnectionDescription()); - return completeConnectionDescriptionInitialization(internalConnection, description); + authenticate(internalConnection, description.getConnectionDescription(), operationContext); + return completeConnectionDescriptionInitialization(internalConnection, description, operationContext); } @Override - public void startHandshakeAsync(final InternalConnection internalConnection, + public void startHandshakeAsync(final InternalConnection internalConnection, final OperationContext operationContext, final SingleResultCallback callback) { long startTime = System.nanoTime(); executeCommandAsync("admin", createHelloCommand(authenticator, internalConnection), clusterConnectionMode, serverApi, - internalConnection, (helloResult, t) -> { + internalConnection, operationContext, (helloResult, t) -> { if (t != null) { callback.onResult(null, t instanceof MongoException ? mapHelloException((MongoException) t) : t); } else { @@ -105,31 +107,35 @@ public void startHandshakeAsync(final InternalConnection internalConnection, @Override public void finishHandshakeAsync(final InternalConnection internalConnection, final InternalConnectionInitializationDescription description, + final OperationContext operationContext, final SingleResultCallback callback) { if (authenticator == null || description.getConnectionDescription().getServerType() == ServerType.REPLICA_SET_ARBITER) { - completeConnectionDescriptionInitializationAsync(internalConnection, description, callback); + completeConnectionDescriptionInitializationAsync(internalConnection, description, operationContext, callback); } else { - authenticator.authenticateAsync(internalConnection, description.getConnectionDescription(), + authenticator.authenticateAsync(internalConnection, description.getConnectionDescription(), operationContext, (result1, t1) -> { if (t1 != null) { callback.onResult(null, t1); } else { - completeConnectionDescriptionInitializationAsync(internalConnection, description, callback); + completeConnectionDescriptionInitializationAsync(internalConnection, description, operationContext, callback); } }); } } - private InternalConnectionInitializationDescription initializeConnectionDescription(final InternalConnection internalConnection) { + private InternalConnectionInitializationDescription initializeConnectionDescription(final InternalConnection internalConnection, + final OperationContext operationContext) { BsonDocument helloResult; BsonDocument helloCommandDocument = createHelloCommand(authenticator, internalConnection); long start = System.nanoTime(); try { - helloResult = executeCommand("admin", helloCommandDocument, clusterConnectionMode, serverApi, internalConnection); + helloResult = executeCommand("admin", helloCommandDocument, clusterConnectionMode, serverApi, internalConnection, operationContext); } catch (MongoException e) { throw mapHelloException(e); + } finally { + operationContext.getTimeoutContext().resetMaintenanceTimeout(); } setSpeculativeAuthenticateResponse(helloResult); return createInitializationDescription(helloResult, internalConnection, start); @@ -189,7 +195,8 @@ private BsonDocument createHelloCommand(final Authenticator authenticator, final private InternalConnectionInitializationDescription completeConnectionDescriptionInitialization( final InternalConnection internalConnection, - final InternalConnectionInitializationDescription description) { + final InternalConnectionInitializationDescription description, + final OperationContext operationContext) { if (description.getConnectionDescription().getConnectionId().getServerValue() != null) { return description; @@ -197,13 +204,14 @@ private InternalConnectionInitializationDescription completeConnectionDescriptio return applyGetLastErrorResult(executeCommandWithoutCheckingForFailure("admin", new BsonDocument("getlasterror", new BsonInt32(1)), clusterConnectionMode, serverApi, - internalConnection), + internalConnection, operationContext), description); } - private void authenticate(final InternalConnection internalConnection, final ConnectionDescription connectionDescription) { + private void authenticate(final InternalConnection internalConnection, final ConnectionDescription connectionDescription, + final OperationContext operationContext) { if (authenticator != null && connectionDescription.getServerType() != ServerType.REPLICA_SET_ARBITER) { - authenticator.authenticate(internalConnection, connectionDescription); + authenticator.authenticate(internalConnection, connectionDescription, operationContext); } } @@ -217,6 +225,7 @@ private void setSpeculativeAuthenticateResponse(final BsonDocument helloResult) private void completeConnectionDescriptionInitializationAsync( final InternalConnection internalConnection, final InternalConnectionInitializationDescription description, + final OperationContext operationContext, final SingleResultCallback callback) { if (description.getConnectionDescription().getConnectionId().getServerValue() != null) { @@ -225,7 +234,7 @@ private void completeConnectionDescriptionInitializationAsync( } executeCommandAsync("admin", new BsonDocument("getlasterror", new BsonInt32(1)), clusterConnectionMode, serverApi, - internalConnection, + internalConnection, operationContext, (result, t) -> { if (t != null) { callback.onResult(description, null); diff --git a/driver-core/src/main/com/mongodb/internal/connection/SaslAuthenticator.java b/driver-core/src/main/com/mongodb/internal/connection/SaslAuthenticator.java index 2c2321fcbad..4ee7068e0a7 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SaslAuthenticator.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SaslAuthenticator.java @@ -18,6 +18,7 @@ import com.mongodb.MongoException; import com.mongodb.MongoInterruptedException; +import com.mongodb.MongoOperationTimeoutException; import com.mongodb.MongoSecurityException; import com.mongodb.ServerAddress; import com.mongodb.ServerApi; @@ -55,12 +56,13 @@ abstract class SaslAuthenticator extends Authenticator implements SpeculativeAut super(credential, clusterConnectionMode, serverApi); } - public void authenticate(final InternalConnection connection, final ConnectionDescription connectionDescription) { + public void authenticate(final InternalConnection connection, final ConnectionDescription connectionDescription, + final OperationContext operationContext) { doAsSubject(() -> { SaslClient saslClient = createSaslClient(connection.getDescription().getServerAddress()); throwIfSaslClientIsNull(saslClient); try { - BsonDocument responseDocument = getNextSaslResponse(saslClient, connection); + BsonDocument responseDocument = getNextSaslResponse(saslClient, connection, operationContext); BsonInt32 conversationId = responseDocument.getInt32("conversationId"); while (!(responseDocument.getBoolean("done")).getValue()) { @@ -72,7 +74,8 @@ public void authenticate(final InternalConnection connection, final ConnectionDe + getMongoCredential()); } - responseDocument = sendSaslContinue(conversationId, response, connection); + responseDocument = sendSaslContinue(conversationId, response, connection, operationContext); + operationContext.getTimeoutContext().resetMaintenanceTimeout(); } if (!saslClient.isComplete()) { saslClient.evaluateChallenge((responseDocument.getBinary("payload")).getData()); @@ -93,12 +96,12 @@ public void authenticate(final InternalConnection connection, final ConnectionDe @Override void authenticateAsync(final InternalConnection connection, final ConnectionDescription connectionDescription, - final SingleResultCallback callback) { + final OperationContext operationContext, final SingleResultCallback callback) { try { doAsSubject(() -> { SaslClient saslClient = createSaslClient(connection.getDescription().getServerAddress()); throwIfSaslClientIsNull(saslClient); - getNextSaslResponseAsync(saslClient, connection, callback); + getNextSaslResponseAsync(saslClient, connection, operationContext, callback); return null; }); } catch (Throwable t) { @@ -120,7 +123,8 @@ private void throwIfSaslClientIsNull(@Nullable final SaslClient saslClient) { } } - private BsonDocument getNextSaslResponse(final SaslClient saslClient, final InternalConnection connection) { + private BsonDocument getNextSaslResponse(final SaslClient saslClient, final InternalConnection connection, + final OperationContext operationContext) { BsonDocument response = getSpeculativeAuthenticateResponse(); if (response != null) { return response; @@ -128,20 +132,20 @@ private BsonDocument getNextSaslResponse(final SaslClient saslClient, final Inte try { byte[] serverResponse = saslClient.hasInitialResponse() ? saslClient.evaluateChallenge(new byte[0]) : null; - return sendSaslStart(serverResponse, connection); + return sendSaslStart(serverResponse, connection, operationContext); } catch (Exception e) { throw wrapException(e); } } private void getNextSaslResponseAsync(final SaslClient saslClient, final InternalConnection connection, - final SingleResultCallback callback) { + final OperationContext operationContext, final SingleResultCallback callback) { BsonDocument response = getSpeculativeAuthenticateResponse(); SingleResultCallback errHandlingCallback = errorHandlingCallback(callback, LOGGER); try { if (response == null) { byte[] serverResponse = (saslClient.hasInitialResponse() ? saslClient.evaluateChallenge(new byte[0]) : null); - sendSaslStartAsync(serverResponse, connection, (result, t) -> { + sendSaslStartAsync(serverResponse, connection, operationContext, (result, t) -> { if (t != null) { errHandlingCallback.onResult(null, wrapException(t)); return; @@ -150,13 +154,13 @@ private void getNextSaslResponseAsync(final SaslClient saslClient, final Interna if (result.getBoolean("done").getValue()) { verifySaslClientComplete(saslClient, result, errHandlingCallback); } else { - new Continuator(saslClient, result, connection, errHandlingCallback).start(); + new Continuator(saslClient, result, connection, operationContext, errHandlingCallback).start(); } }); } else if (response.getBoolean("done").getValue()) { verifySaslClientComplete(saslClient, response, errHandlingCallback); } else { - new Continuator(saslClient, response, connection, errHandlingCallback).start(); + new Continuator(saslClient, response, connection, operationContext, errHandlingCallback).start(); } } catch (Exception e) { callback.onResult(null, wrapException(e)); @@ -218,29 +222,47 @@ protected SubjectProvider getDefaultSubjectProvider() { return () -> null; } - private BsonDocument sendSaslStart(@Nullable final byte[] outToken, final InternalConnection connection) { + private BsonDocument sendSaslStart(@Nullable final byte[] outToken, final InternalConnection connection, + final OperationContext operationContext) { BsonDocument startDocument = createSaslStartCommandDocument(outToken); appendSaslStartOptions(startDocument); - return executeCommand(getMongoCredential().getSource(), startDocument, getClusterConnectionMode(), getServerApi(), connection); + try { + return executeCommand(getMongoCredential().getSource(), startDocument, getClusterConnectionMode(), getServerApi(), connection, + operationContext); + } finally { + operationContext.getTimeoutContext().resetMaintenanceTimeout(); + } } - private BsonDocument sendSaslContinue(final BsonInt32 conversationId, final byte[] outToken, final InternalConnection connection) { - return executeCommand(getMongoCredential().getSource(), createSaslContinueDocument(conversationId, outToken), - getClusterConnectionMode(), getServerApi(), connection); + private BsonDocument sendSaslContinue(final BsonInt32 conversationId, final byte[] outToken, final InternalConnection connection, + final OperationContext operationContext) { + try { + return executeCommand(getMongoCredential().getSource(), createSaslContinueDocument(conversationId, outToken), + getClusterConnectionMode(), getServerApi(), connection, operationContext); + } finally { + operationContext.getTimeoutContext().resetMaintenanceTimeout(); + } } private void sendSaslStartAsync(@Nullable final byte[] outToken, final InternalConnection connection, - final SingleResultCallback callback) { + final OperationContext operationContext, final SingleResultCallback callback) { BsonDocument startDocument = createSaslStartCommandDocument(outToken); appendSaslStartOptions(startDocument); + executeCommandAsync(getMongoCredential().getSource(), startDocument, getClusterConnectionMode(), getServerApi(), connection, - callback); + operationContext, (r, t) -> { + operationContext.getTimeoutContext().resetMaintenanceTimeout(); + callback.onResult(r, t); + }); } private void sendSaslContinueAsync(final BsonInt32 conversationId, final byte[] outToken, final InternalConnection connection, - final SingleResultCallback callback) { + final OperationContext operationContext, final SingleResultCallback callback) { executeCommandAsync(getMongoCredential().getSource(), createSaslContinueDocument(conversationId, outToken), - getClusterConnectionMode(), getServerApi(), connection, callback); + getClusterConnectionMode(), getServerApi(), connection, operationContext, (r, t) -> { + operationContext.getTimeoutContext().resetMaintenanceTimeout(); + callback.onResult(r, t); + }); } protected BsonDocument createSaslStartCommandDocument(@Nullable final byte[] outToken) { @@ -264,6 +286,8 @@ private void disposeOfSaslClient(final SaslClient saslClient) { protected MongoException wrapException(final Throwable t) { if (t instanceof MongoInterruptedException) { return (MongoInterruptedException) t; + } else if (t instanceof MongoOperationTimeoutException) { + return (MongoOperationTimeoutException) t; } else if (t instanceof MongoSecurityException) { return (MongoSecurityException) t; } else { @@ -284,13 +308,15 @@ private final class Continuator implements SingleResultCallback { private final SaslClient saslClient; private final BsonDocument saslStartDocument; private final InternalConnection connection; + private final OperationContext operationContext; private final SingleResultCallback callback; Continuator(final SaslClient saslClient, final BsonDocument saslStartDocument, final InternalConnection connection, - final SingleResultCallback callback) { + final OperationContext operationContext, final SingleResultCallback callback) { this.saslClient = saslClient; this.saslStartDocument = saslStartDocument; this.connection = connection; + this.operationContext = operationContext; this.callback = callback; } @@ -319,13 +345,13 @@ private void continueConversation(final BsonDocument result) { doAsSubject(() -> { try { sendSaslContinueAsync(saslStartDocument.getInt32("conversationId"), - saslClient.evaluateChallenge((result.getBinary("payload")).getData()), connection, Continuator.this); + saslClient.evaluateChallenge((result.getBinary("payload")).getData()), connection, + operationContext, Continuator.this); } catch (SaslException e) { throw wrapException(e); } return null; }); - } catch (Throwable t) { callback.onResult(null, t); disposeOfSaslClient(saslClient); diff --git a/driver-core/src/main/com/mongodb/internal/connection/X509Authenticator.java b/driver-core/src/main/com/mongodb/internal/connection/X509Authenticator.java index 257ad8969d7..b5e2dd0512d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/X509Authenticator.java +++ b/driver-core/src/main/com/mongodb/internal/connection/X509Authenticator.java @@ -44,13 +44,14 @@ class X509Authenticator extends Authenticator implements SpeculativeAuthenticato } @Override - void authenticate(final InternalConnection connection, final ConnectionDescription connectionDescription) { + void authenticate(final InternalConnection connection, final ConnectionDescription connectionDescription, + final OperationContext operationContext) { if (this.speculativeAuthenticateResponse != null) { return; } try { BsonDocument authCommand = getAuthCommand(getMongoCredential().getUserName()); - executeCommand(getMongoCredential().getSource(), authCommand, getClusterConnectionMode(), getServerApi(), connection); + executeCommand(getMongoCredential().getSource(), authCommand, getClusterConnectionMode(), getServerApi(), connection, operationContext); } catch (MongoCommandException e) { throw new MongoSecurityException(getMongoCredential(), "Exception authenticating", e); } @@ -58,14 +59,14 @@ void authenticate(final InternalConnection connection, final ConnectionDescripti @Override void authenticateAsync(final InternalConnection connection, final ConnectionDescription connectionDescription, - final SingleResultCallback callback) { + final OperationContext operationContext, final SingleResultCallback callback) { if (speculativeAuthenticateResponse != null) { callback.onResult(null, null); } else { SingleResultCallback errHandlingCallback = errorHandlingCallback(callback, LOGGER); try { executeCommandAsync(getMongoCredential().getSource(), getAuthCommand(getMongoCredential().getUserName()), - getClusterConnectionMode(), getServerApi(), connection, + getClusterConnectionMode(), getServerApi(), connection, operationContext, (nonceResult, t) -> { if (t != null) { errHandlingCallback.onResult(null, translateThrowable(t)); diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy b/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy index e0b50e87780..085a5100198 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/CommandHelperSpecification.groovy @@ -59,7 +59,7 @@ class CommandHelperSpecification extends Specification { Throwable receivedException = null def latch1 = new CountDownLatch(1) executeCommandAsync('admin', new BsonDocument(LEGACY_HELLO, new BsonInt32(1)), getClusterConnectionMode(), - getServerApi(), connection) + getServerApi(), connection, OPERATION_CONTEXT) { document, exception -> receivedDocument = document; receivedException = exception; latch1.countDown() } latch1.await() @@ -71,7 +71,7 @@ class CommandHelperSpecification extends Specification { when: def latch2 = new CountDownLatch(1) executeCommandAsync('admin', new BsonDocument('non-existent-command', new BsonInt32(1)), getClusterConnectionMode(), - getServerApi(), connection) + getServerApi(), connection, OPERATION_CONTEXT) { document, exception -> receivedDocument = document; receivedException = exception; latch2.countDown() } latch2.await() diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/PlainAuthenticatorTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/PlainAuthenticatorTest.java index e2377c8efef..6ab01fdfc8a 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/PlainAuthenticatorTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/PlainAuthenticatorTest.java @@ -32,6 +32,7 @@ import java.util.Collections; +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT; import static com.mongodb.ClusterFixture.getClusterConnectionMode; import static com.mongodb.ClusterFixture.getServerApi; import static com.mongodb.ClusterFixture.getSslSettings; @@ -67,14 +68,14 @@ public void tearDown() { public void testSuccessfulAuthentication() { PlainAuthenticator authenticator = new PlainAuthenticator(getCredentialWithCache(userName, source, password.toCharArray()), getClusterConnectionMode(), getServerApi()); - authenticator.authenticate(internalConnection, connectionDescription); + authenticator.authenticate(internalConnection, connectionDescription, OPERATION_CONTEXT); } @Test(expected = MongoSecurityException.class) public void testUnsuccessfulAuthentication() { PlainAuthenticator authenticator = new PlainAuthenticator(getCredentialWithCache(userName, source, "wrong".toCharArray()), getClusterConnectionMode(), getServerApi()); - authenticator.authenticate(internalConnection, connectionDescription); + authenticator.authenticate(internalConnection, connectionDescription, OPERATION_CONTEXT); } private static MongoCredentialWithCache getCredentialWithCache(final String userName, final String source, final char[] password) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/CommandHelperTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/CommandHelperTest.java new file mode 100644 index 00000000000..3803bf45690 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/CommandHelperTest.java @@ -0,0 +1,117 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.internal.connection; + +import com.mongodb.MongoCommandException; +import com.mongodb.ServerAddress; +import com.mongodb.ServerApi; +import com.mongodb.ServerApiVersion; +import com.mongodb.connection.ClusterId; +import com.mongodb.connection.ConnectionDescription; +import com.mongodb.connection.ServerId; +import com.mongodb.internal.IgnorableRequestContext; +import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.TimeoutSettings; +import org.bson.BsonDocument; +import org.bson.codecs.Decoder; +import org.junit.jupiter.api.Test; + +import static com.mongodb.assertions.Assertions.assertFalse; +import static com.mongodb.connection.ClusterConnectionMode.SINGLE; +import static com.mongodb.internal.connection.CommandHelper.executeCommand; +import static com.mongodb.internal.connection.CommandHelper.executeCommandAsync; +import static com.mongodb.internal.connection.CommandHelper.executeCommandWithoutCheckingForFailure; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CommandHelperTest { + + static final BsonDocument COMMAND = BsonDocument.parse("{ping: 1}"); + static final BsonDocument OK = BsonDocument.parse("{ok: 1}"); + static final BsonDocument NOT_OK = BsonDocument.parse("{ok: 0, errmsg: 'error'}"); + + static final ConnectionDescription CONNECTION_DESCRIPTION = new ConnectionDescription( + new ServerId(new ClusterId("cluster"), new ServerAddress())); + + @Test + void testExecuteCommand() { + InternalConnection internalConnection = mock(InternalConnection.class); + OperationContext operationContext = createOperationContext(); + + + when(internalConnection.getDescription()).thenReturn(CONNECTION_DESCRIPTION); + when(internalConnection.sendAndReceive(any(), any(), any())).thenReturn(OK); + + assertEquals(OK, + executeCommand("admin", COMMAND, SINGLE, operationContext.getServerApi(), internalConnection, operationContext)); + + verify(internalConnection).sendAndReceive(any(CommandMessage.class), any(Decoder.class), eq(operationContext)); + } + + @Test + void testExecuteCommandWithoutCheckingForFailure() { + InternalConnection internalConnection = mock(InternalConnection.class); + OperationContext operationContext = createOperationContext(); + + when(internalConnection.getDescription()).thenReturn(CONNECTION_DESCRIPTION); + when(internalConnection.sendAndReceive(any(), any(), any())) + .thenThrow(new MongoCommandException(NOT_OK, new ServerAddress())); + + assertEquals(new BsonDocument(), + executeCommandWithoutCheckingForFailure("admin", COMMAND, SINGLE, operationContext.getServerApi(), + internalConnection, operationContext)); + + verify(internalConnection).sendAndReceive(any(CommandMessage.class), any(Decoder.class), eq(operationContext)); + } + + + @Test + void testExecuteCommandAsyncUsesTheOperationContext() { + InternalConnection internalConnection = mock(InternalConnection.class); + OperationContext operationContext = createOperationContext(); + + + when(internalConnection.getDescription()).thenReturn(CONNECTION_DESCRIPTION); + when(internalConnection.sendAndReceive(any(), any(), any())).thenReturn(OK); + + executeCommandAsync("admin", COMMAND, SINGLE, operationContext.getServerApi(), internalConnection, operationContext, + (r, t) -> {}); + + verify(internalConnection).sendAndReceiveAsync(any(CommandMessage.class), any(Decoder.class), eq(operationContext), any()); + } + + @Test + void testIsCommandOk() { + assertTrue(CommandHelper.isCommandOk(OK)); + assertTrue(CommandHelper.isCommandOk(BsonDocument.parse("{ok: true}"))); + assertFalse(CommandHelper.isCommandOk(NOT_OK)); + assertFalse(CommandHelper.isCommandOk(BsonDocument.parse("{ok: false}"))); + assertFalse(CommandHelper.isCommandOk(BsonDocument.parse("{ok: 11}"))); + assertFalse(CommandHelper.isCommandOk(BsonDocument.parse("{ok: 'nope'}"))); + assertFalse(CommandHelper.isCommandOk(new BsonDocument())); + } + + + OperationContext createOperationContext() { + return new OperationContext(IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, + new TimeoutContext(TimeoutSettings.DEFAULT), ServerApi.builder().version(ServerApiVersion.V1).build()); + } +} diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionInitializerSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionInitializerSpecification.groovy index c389e647be1..93bc656226a 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionInitializerSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionInitializerSpecification.groovy @@ -27,6 +27,7 @@ import com.mongodb.connection.ServerConnectionState import com.mongodb.connection.ServerDescription import com.mongodb.connection.ServerId import com.mongodb.connection.ServerType +import com.mongodb.internal.TimeoutSettings import org.bson.BsonArray import org.bson.BsonBoolean import org.bson.BsonDocument @@ -47,11 +48,13 @@ import static com.mongodb.internal.connection.ClientMetadataHelperProseTest.crea import static com.mongodb.internal.connection.MessageHelper.LEGACY_HELLO import static com.mongodb.internal.connection.MessageHelper.buildSuccessfulReply import static com.mongodb.internal.connection.MessageHelper.decodeCommand +import static com.mongodb.internal.connection.OperationContext.simpleOperationContext class InternalStreamConnectionInitializerSpecification extends Specification { def serverId = new ServerId(new ClusterId(), new ServerAddress()) def internalConnection = new TestInternalConnection(serverId, ServerType.STANDALONE) + def operationContext = simpleOperationContext(TimeoutSettings.DEFAULT, null) def 'should create correct description'() { given: @@ -59,8 +62,8 @@ class InternalStreamConnectionInitializerSpecification extends Specification { when: enqueueSuccessfulReplies(false, null) - def description = initializer.startHandshake(internalConnection) - description = initializer.finishHandshake(internalConnection, description) + def description = initializer.startHandshake(internalConnection, operationContext) + description = initializer.finishHandshake(internalConnection, description, operationContext) def connectionDescription = description.connectionDescription def serverDescription = description.serverDescription @@ -76,10 +79,10 @@ class InternalStreamConnectionInitializerSpecification extends Specification { when: enqueueSuccessfulReplies(false, null) def futureCallback = new FutureResultCallback() - initializer.startHandshakeAsync(internalConnection, futureCallback) + initializer.startHandshakeAsync(internalConnection, operationContext, futureCallback) def description = futureCallback.get() futureCallback = new FutureResultCallback() - initializer.finishHandshakeAsync(internalConnection, description, futureCallback) + initializer.finishHandshakeAsync(internalConnection, description, operationContext, futureCallback) description = futureCallback.get() def connectionDescription = description.connectionDescription def serverDescription = description.serverDescription @@ -95,8 +98,9 @@ class InternalStreamConnectionInitializerSpecification extends Specification { when: enqueueSuccessfulReplies(false, 123) - def internalDescription = initializer.startHandshake(internalConnection) - def connectionDescription = initializer.finishHandshake(internalConnection, internalDescription).connectionDescription + def internalDescription = initializer.startHandshake(internalConnection, operationContext) + def connectionDescription = initializer.finishHandshake(internalConnection, internalDescription, operationContext) + .connectionDescription then: connectionDescription == getExpectedConnectionDescription(connectionDescription.connectionId.localValue, 123) @@ -108,8 +112,9 @@ class InternalStreamConnectionInitializerSpecification extends Specification { when: enqueueSuccessfulRepliesWithConnectionIdIsHelloResponse(false, 123) - def internalDescription = initializer.startHandshake(internalConnection) - def connectionDescription = initializer.finishHandshake(internalConnection, internalDescription).connectionDescription + def internalDescription = initializer.startHandshake(internalConnection, operationContext) + def connectionDescription = initializer.finishHandshake(internalConnection, internalDescription, operationContext) + .connectionDescription then: connectionDescription == getExpectedConnectionDescription(connectionDescription.connectionId.localValue, 123) @@ -122,10 +127,10 @@ class InternalStreamConnectionInitializerSpecification extends Specification { when: enqueueSuccessfulReplies(false, 123) def futureCallback = new FutureResultCallback() - initializer.startHandshakeAsync(internalConnection, futureCallback) + initializer.startHandshakeAsync(internalConnection, operationContext, futureCallback) def description = futureCallback.get() futureCallback = new FutureResultCallback() - initializer.finishHandshakeAsync(internalConnection, description, futureCallback) + initializer.finishHandshakeAsync(internalConnection, description, operationContext, futureCallback) def connectionDescription = futureCallback.get().connectionDescription then: @@ -139,10 +144,10 @@ class InternalStreamConnectionInitializerSpecification extends Specification { when: enqueueSuccessfulRepliesWithConnectionIdIsHelloResponse(false, 123) def futureCallback = new FutureResultCallback() - initializer.startHandshakeAsync(internalConnection, futureCallback) + initializer.startHandshakeAsync(internalConnection, operationContext, futureCallback) def description = futureCallback.get() futureCallback = new FutureResultCallback() - initializer.finishHandshakeAsync(internalConnection, description, futureCallback) + initializer.finishHandshakeAsync(internalConnection, description, operationContext, futureCallback) description = futureCallback.get() def connectionDescription = description.connectionDescription @@ -158,12 +163,13 @@ class InternalStreamConnectionInitializerSpecification extends Specification { when: enqueueSuccessfulReplies(false, null) - def internalDescription = initializer.startHandshake(internalConnection) - def connectionDescription = initializer.finishHandshake(internalConnection, internalDescription).connectionDescription + def internalDescription = initializer.startHandshake(internalConnection, operationContext) + def connectionDescription = initializer.finishHandshake(internalConnection, internalDescription, operationContext) + .connectionDescription then: connectionDescription - 1 * firstAuthenticator.authenticate(internalConnection, _) + 1 * firstAuthenticator.authenticate(internalConnection, _, _) } def 'should authenticate asynchronously'() { @@ -175,15 +181,15 @@ class InternalStreamConnectionInitializerSpecification extends Specification { enqueueSuccessfulReplies(false, null) def futureCallback = new FutureResultCallback() - initializer.startHandshakeAsync(internalConnection, futureCallback) + initializer.startHandshakeAsync(internalConnection, operationContext, futureCallback) def description = futureCallback.get() futureCallback = new FutureResultCallback() - initializer.finishHandshakeAsync(internalConnection, description, futureCallback) + initializer.finishHandshakeAsync(internalConnection, description, operationContext, futureCallback) def connectionDescription = futureCallback.get().connectionDescription then: connectionDescription - 1 * authenticator.authenticateAsync(internalConnection, _, _) >> { it[2].onResult(null, null) } + 1 * authenticator.authenticateAsync(internalConnection, _, _, _) >> { it[3].onResult(null, null) } } def 'should not authenticate if server is an arbiter'() { @@ -194,12 +200,13 @@ class InternalStreamConnectionInitializerSpecification extends Specification { when: enqueueSuccessfulReplies(true, null) - def internalDescription = initializer.startHandshake(internalConnection) - def connectionDescription = initializer.finishHandshake(internalConnection, internalDescription).connectionDescription + def internalDescription = initializer.startHandshake(internalConnection, operationContext) + def connectionDescription = initializer.finishHandshake(internalConnection, internalDescription, operationContext) + .connectionDescription then: connectionDescription - 0 * authenticator.authenticate(internalConnection, _) + 0 * authenticator.authenticate(internalConnection, _, _) } def 'should not authenticate asynchronously if server is an arbiter asynchronously'() { @@ -211,10 +218,10 @@ class InternalStreamConnectionInitializerSpecification extends Specification { enqueueSuccessfulReplies(true, null) def futureCallback = new FutureResultCallback() - initializer.startHandshakeAsync(internalConnection, futureCallback) + initializer.startHandshakeAsync(internalConnection, operationContext, futureCallback) def description = futureCallback.get() futureCallback = new FutureResultCallback() - initializer.finishHandshakeAsync(internalConnection, description, futureCallback) + initializer.finishHandshakeAsync(internalConnection, description, operationContext, futureCallback) def connectionDescription = futureCallback.get().connectionDescription then: @@ -236,14 +243,14 @@ class InternalStreamConnectionInitializerSpecification extends Specification { enqueueSuccessfulReplies(false, null) if (async) { def callback = new FutureResultCallback() - initializer.startHandshakeAsync(internalConnection, callback) + initializer.startHandshakeAsync(internalConnection, operationContext, callback) def description = callback.get() callback = new FutureResultCallback() - initializer.finishHandshakeAsync(internalConnection, description, callback) + initializer.finishHandshakeAsync(internalConnection, description, operationContext, callback) callback.get() } else { - def internalDescription = initializer.startHandshake(internalConnection) - initializer.finishHandshake(internalConnection, internalDescription) + def internalDescription = initializer.startHandshake(internalConnection, operationContext) + initializer.finishHandshake(internalConnection, internalDescription, operationContext) } then: @@ -273,14 +280,14 @@ class InternalStreamConnectionInitializerSpecification extends Specification { enqueueSuccessfulReplies(false, null) if (async) { def callback = new FutureResultCallback() - initializer.startHandshakeAsync(internalConnection, callback) + initializer.startHandshakeAsync(internalConnection, operationContext, callback) def description = callback.get() callback = new FutureResultCallback() - initializer.finishHandshakeAsync(internalConnection, description, callback) + initializer.finishHandshakeAsync(internalConnection, description, operationContext, callback) callback.get() } else { - def internalDescription = initializer.startHandshake(internalConnection) - initializer.finishHandshake(internalConnection, internalDescription) + def internalDescription = initializer.startHandshake(internalConnection, operationContext) + initializer.finishHandshake(internalConnection, internalDescription, operationContext) } then: @@ -312,9 +319,9 @@ class InternalStreamConnectionInitializerSpecification extends Specification { then: description if (async) { - 1 * scramShaAuthenticator.authenticateAsync(internalConnection, _, _) + 1 * scramShaAuthenticator.authenticateAsync(internalConnection, _, _, _) } else { - 1 * scramShaAuthenticator.authenticate(internalConnection, _) + 1 * scramShaAuthenticator.authenticate(internalConnection, _, _) } 1 * ((SpeculativeAuthenticator) scramShaAuthenticator).createSpeculativeAuthenticateCommand(_) ((SpeculativeAuthenticator) scramShaAuthenticator).getSpeculativeAuthenticateResponse() == speculativeAuthenticateResponse @@ -343,9 +350,9 @@ class InternalStreamConnectionInitializerSpecification extends Specification { then: description if (async) { - 1 * authenticator.authenticateAsync(internalConnection, _, _) + 1 * authenticator.authenticateAsync(internalConnection, _, _, _) } else { - 1 * authenticator.authenticate(internalConnection, _) + 1 * authenticator.authenticate(internalConnection, _, _) } 1 * ((SpeculativeAuthenticator) authenticator).createSpeculativeAuthenticateCommand(_) ((SpeculativeAuthenticator) authenticator).getSpeculativeAuthenticateResponse() == speculativeAuthenticateResponse @@ -374,9 +381,9 @@ class InternalStreamConnectionInitializerSpecification extends Specification { then: description if (async) { - 1 * authenticator.authenticateAsync(internalConnection, _, _) + 1 * authenticator.authenticateAsync(internalConnection, _, _, _) } else { - 1 * authenticator.authenticate(internalConnection, _) + 1 * authenticator.authenticate(internalConnection, _, _) } 1 * ((SpeculativeAuthenticator) authenticator).createSpeculativeAuthenticateCommand(_) ((SpeculativeAuthenticator) authenticator).getSpeculativeAuthenticateResponse() == speculativeAuthenticateResponse @@ -402,9 +409,9 @@ class InternalStreamConnectionInitializerSpecification extends Specification { then: description if (async) { - 1 * authenticator.authenticateAsync(internalConnection, _, _) + 1 * authenticator.authenticateAsync(internalConnection, _, _, _) } else { - 1 * authenticator.authenticate(internalConnection, _) + 1 * authenticator.authenticate(internalConnection, _, _) } 1 * ((SpeculativeAuthenticator) authenticator).createSpeculativeAuthenticateCommand(_) ((SpeculativeAuthenticator) authenticator).getSpeculativeAuthenticateResponse() == speculativeAuthenticateResponse @@ -444,14 +451,14 @@ class InternalStreamConnectionInitializerSpecification extends Specification { final TestInternalConnection connection) { if (async) { def callback = new FutureResultCallback() - initializer.startHandshakeAsync(internalConnection, callback) + initializer.startHandshakeAsync(internalConnection, operationContext, callback) def description = callback.get() callback = new FutureResultCallback() - initializer.finishHandshakeAsync(internalConnection, description, callback) + initializer.finishHandshakeAsync(internalConnection, description, operationContext, callback) callback.get() } else { - def internalDescription = initializer.startHandshake(connection) - initializer.finishHandshake(connection, internalDescription) + def internalDescription = initializer.startHandshake(connection, operationContext) + initializer.finishHandshake(connection, internalDescription, operationContext) } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy index 65ff27e018b..927d32cfef1 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy @@ -98,10 +98,10 @@ class InternalStreamConnectionSpecification extends Specification { create(_) >> { stream } } def initializer = Mock(InternalConnectionInitializer) { - startHandshake(_) >> { internalConnectionInitializationDescription } - finishHandshake(_, _) >> { internalConnectionInitializationDescription } - startHandshakeAsync(_, _) >> { it[1].onResult(internalConnectionInitializationDescription, null) } - finishHandshakeAsync(_, _, _) >> { it[2].onResult(internalConnectionInitializationDescription, null) } + startHandshake(_, _) >> { internalConnectionInitializationDescription } + finishHandshake(_, _, _) >> { internalConnectionInitializationDescription } + startHandshakeAsync(_, _, _) >> { it[2].onResult(internalConnectionInitializationDescription, null) } + finishHandshakeAsync(_, _, _, _) >> { it[3].onResult(internalConnectionInitializationDescription, null) } } def getConnection() { @@ -168,7 +168,7 @@ class InternalStreamConnectionSpecification extends Specification { def 'should close the stream when initialization throws an exception'() { given: def failedInitializer = Mock(InternalConnectionInitializer) { - startHandshake(_) >> { throw new MongoInternalException('Something went wrong') } + startHandshake(_, _) >> { throw new MongoInternalException('Something went wrong') } } def connection = new InternalStreamConnection(SINGLE, SERVER_ID, new TestConnectionGenerationSupplier(), streamFactory, [], null, failedInitializer) @@ -185,7 +185,7 @@ class InternalStreamConnectionSpecification extends Specification { def 'should close the stream when initialization throws an exception asynchronously'() { given: def failedInitializer = Mock(InternalConnectionInitializer) { - startHandshakeAsync(_, _) >> { it[1].onResult(null, new MongoInternalException('Something went wrong')) } + startHandshakeAsync(_, _, _) >> { it[2].onResult(null, new MongoInternalException('Something went wrong')) } } def connection = new InternalStreamConnection(SINGLE, SERVER_ID, new TestConnectionGenerationSupplier(), streamFactory, [], null, failedInitializer) diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/PlainAuthenticatorUnitTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/PlainAuthenticatorUnitTest.java index e4a4f80289c..12d8e9fa7c3 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/PlainAuthenticatorUnitTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/PlainAuthenticatorUnitTest.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT; import static com.mongodb.ClusterFixture.getServerApi; import static com.mongodb.internal.connection.MessageHelper.getApiVersionField; import static com.mongodb.internal.connection.MessageHelper.getDbField; @@ -53,7 +54,7 @@ public void before() { public void testSuccessfulAuthentication() { enqueueSuccessfulReply(); - subject.authenticate(connection, connectionDescription); + subject.authenticate(connection, connectionDescription, OPERATION_CONTEXT); validateMessages(); } @@ -63,7 +64,7 @@ public void testSuccessfulAuthenticationAsync() throws ExecutionException, Inter enqueueSuccessfulReply(); FutureResultCallback futureCallback = new FutureResultCallback<>(); - subject.authenticateAsync(connection, connectionDescription, futureCallback); + subject.authenticateAsync(connection, connectionDescription, OPERATION_CONTEXT, futureCallback); futureCallback.get(); validateMessages(); diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/ScramShaAuthenticatorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/ScramShaAuthenticatorSpecification.groovy index 32295d12b7c..21f9bc28161 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/ScramShaAuthenticatorSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/ScramShaAuthenticatorSpecification.groovy @@ -23,6 +23,7 @@ import com.mongodb.connection.ClusterId import com.mongodb.connection.ConnectionDescription import com.mongodb.connection.ServerId import com.mongodb.connection.ServerType +import com.mongodb.internal.TimeoutSettings import org.bson.BsonDocument import spock.lang.Specification @@ -34,11 +35,13 @@ import static com.mongodb.MongoCredential.createScramSha1Credential import static com.mongodb.MongoCredential.createScramSha256Credential import static com.mongodb.connection.ClusterConnectionMode.SINGLE import static com.mongodb.internal.connection.MessageHelper.buildSuccessfulReply +import static com.mongodb.internal.connection.OperationContext.simpleOperationContext import static org.junit.Assert.assertEquals class ScramShaAuthenticatorSpecification extends Specification { def serverId = new ServerId(new ClusterId(), new ServerAddress('localhost', 27017)) def connectionDescription = new ConnectionDescription(serverId) + def operationContext = simpleOperationContext(TimeoutSettings.DEFAULT, null) private final static MongoCredentialWithCache SHA1_CREDENTIAL = new MongoCredentialWithCache(createScramSha1Credential('user', 'database', 'pencil' as char[])) private final static MongoCredentialWithCache SHA256_CREDENTIAL = @@ -522,10 +525,10 @@ class ScramShaAuthenticatorSpecification extends Specification { def authenticate(TestInternalConnection connection, ScramShaAuthenticator authenticator, boolean async) { if (async) { FutureResultCallback futureCallback = new FutureResultCallback() - authenticator.authenticateAsync(connection, connectionDescription, futureCallback) + authenticator.authenticateAsync(connection, connectionDescription, operationContext, futureCallback) futureCallback.get(5, TimeUnit.SECONDS) } else { - authenticator.authenticate(connection, connectionDescription) + authenticator.authenticate(connection, connectionDescription, operationContext) } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPoolListener.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPoolListener.java index 9d8eda976d6..7ec08e2f3e7 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPoolListener.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPoolListener.java @@ -28,6 +28,8 @@ import com.mongodb.event.ConnectionPoolListener; import com.mongodb.event.ConnectionPoolReadyEvent; import com.mongodb.event.ConnectionReadyEvent; +import com.mongodb.internal.time.StartTime; +import com.mongodb.internal.time.Timeout; import java.util.ArrayList; import java.util.Arrays; @@ -84,6 +86,22 @@ public int countEvents(final Class eventClass) { return eventCount; } + public void waitForEvents(final List> eventClasses, final long time, final TimeUnit unit) + throws InterruptedException, TimeoutException { + Timeout timeout = StartTime.now().timeoutAfterOrInfiniteIfNegative(time, unit); + ArrayList seen = new ArrayList<>(); + + for (Class eventClass : eventClasses) { + waitForEvent(eventClass, 1, timeout.remaining(unit), unit); + + if (timeout.hasExpired()) { + throw new TimeoutException("Timed out waiting for event of type " + eventClass + + ". Timing out after seeing " + seen); + } + seen.add(eventClass); + } + } + public void waitForEvent(final Class eventClass, final int count, final long time, final TimeUnit unit) throws InterruptedException, TimeoutException { lock.lock(); @@ -106,6 +124,7 @@ public void waitForEvent(final Class eventClass, final int count, final l } } + private boolean containsEvent(final Class eventClass, final int expectedEventCount) { return countEvents(eventClass) >= expectedEventCount; } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/X509AuthenticatorNoUserNameTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/X509AuthenticatorNoUserNameTest.java index cf829f919c5..4a56c908f52 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/X509AuthenticatorNoUserNameTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/X509AuthenticatorNoUserNameTest.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT; import static com.mongodb.ClusterFixture.getServerApi; import static com.mongodb.connection.ClusterConnectionMode.MULTIPLE; import static com.mongodb.internal.connection.MessageHelper.buildSuccessfulReply; @@ -56,7 +57,8 @@ public void before() { public void testSuccessfulAuthentication() { enqueueSuccessfulAuthenticationReply(); - new X509Authenticator(getCredentialWithCache(), MULTIPLE, getServerApi()).authenticate(connection, connectionDescriptionThreeSix); + new X509Authenticator(getCredentialWithCache(), MULTIPLE, getServerApi()) + .authenticate(connection, connectionDescriptionThreeSix, OPERATION_CONTEXT); validateMessages(); } @@ -67,7 +69,7 @@ public void testSuccessfulAuthenticationAsync() throws ExecutionException, Inter FutureResultCallback futureCallback = new FutureResultCallback<>(); new X509Authenticator(getCredentialWithCache(), MULTIPLE, getServerApi()).authenticateAsync(connection, - connectionDescriptionThreeSix, futureCallback); + connectionDescriptionThreeSix, OPERATION_CONTEXT, futureCallback); futureCallback.get(); diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/X509AuthenticatorUnitTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/X509AuthenticatorUnitTest.java index 92ff72fde83..a8b2d7b71d5 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/X509AuthenticatorUnitTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/X509AuthenticatorUnitTest.java @@ -30,8 +30,8 @@ import org.junit.Test; import java.util.List; -import java.util.concurrent.ExecutionException; +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT; import static com.mongodb.ClusterFixture.getServerApi; import static com.mongodb.internal.connection.MessageHelper.buildSuccessfulReply; import static com.mongodb.internal.connection.MessageHelper.getApiVersionField; @@ -58,7 +58,7 @@ public void testFailedAuthentication() { enqueueFailedAuthenticationReply(); try { - subject.authenticate(connection, connectionDescription); + subject.authenticate(connection, connectionDescription, OPERATION_CONTEXT); fail(); } catch (MongoSecurityException e) { // all good @@ -70,7 +70,7 @@ public void testFailedAuthenticationAsync() { enqueueFailedAuthenticationReply(); FutureResultCallback futureCallback = new FutureResultCallback<>(); - subject.authenticateAsync(connection, connectionDescription, futureCallback); + subject.authenticateAsync(connection, connectionDescription, OPERATION_CONTEXT, futureCallback); try { futureCallback.get(); @@ -92,17 +92,17 @@ private void enqueueFailedAuthenticationReply() { public void testSuccessfulAuthentication() { enqueueSuccessfulAuthenticationReply(); - subject.authenticate(connection, connectionDescription); + subject.authenticate(connection, connectionDescription, OPERATION_CONTEXT); validateMessages(); } @Test - public void testSuccessfulAuthenticationAsync() throws ExecutionException, InterruptedException { + public void testSuccessfulAuthenticationAsync() { enqueueSuccessfulAuthenticationReply(); FutureResultCallback futureCallback = new FutureResultCallback<>(); - subject.authenticateAsync(connection, connectionDescription, futureCallback); + subject.authenticateAsync(connection, connectionDescription, OPERATION_CONTEXT, futureCallback); futureCallback.get(); @@ -117,7 +117,7 @@ public void testSpeculativeAuthentication() { + "user: \"CN=client,OU=kerneluser,O=10Gen,L=New York City,ST=New York,C=US\", " + "mechanism: \"MONGODB-X509\", db: \"$external\"}"); subject.setSpeculativeAuthenticateResponse(BsonDocument.parse(speculativeAuthenticateResponse)); - subject.authenticate(connection, connectionDescription); + subject.authenticate(connection, connectionDescription, OPERATION_CONTEXT); assertEquals(connection.getSent().size(), 0); assertEquals(expectedSpeculativeAuthenticateCommand, subject.createSpeculativeAuthenticateCommand(connection)); diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java index d159a07c8e4..c18b8ee0230 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java @@ -25,20 +25,12 @@ import java.net.URISyntaxException; import java.util.Collection; -import static org.junit.Assume.assumeFalse; - public class UnifiedServerDiscoveryAndMonitoringTest extends UnifiedReactiveStreamsTest { public UnifiedServerDiscoveryAndMonitoringTest(@SuppressWarnings("unused") final String fileDescription, final String testDescription, final String schemaVersion, @Nullable final BsonArray runOnRequirements, final BsonArray entities, final BsonArray initialData, final BsonDocument definition) { super(schemaVersion, runOnRequirements, entities, initialData, definition); - - assumeFalse("TODO (CSOT) - JAVA-5211 - apply settings to the operation context", - (fileDescription.equals("hello-timeout") && testDescription.startsWith("Network timeout on Monitor")) - || (fileDescription.equals("find-network-timeout-error") - && testDescription.startsWith("Ignore network timeout error on find")) - || fileDescription.equals("auth-network-timeout-error")); } @Parameterized.Parameters(name = "{0}: {1}") diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java index 70aaa7df53d..47419aded4c 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java @@ -20,6 +20,7 @@ import com.mongodb.ConnectionString; import com.mongodb.CursorType; import com.mongodb.MongoClientSettings; +import com.mongodb.MongoCredential; import com.mongodb.MongoNamespace; import com.mongodb.MongoOperationTimeoutException; import com.mongodb.MongoTimeoutException; @@ -35,8 +36,12 @@ import com.mongodb.client.test.CollectionHelper; import com.mongodb.event.CommandStartedEvent; import com.mongodb.event.CommandSucceededEvent; +import com.mongodb.event.ConnectionClosedEvent; +import com.mongodb.event.ConnectionCreatedEvent; +import com.mongodb.event.ConnectionReadyEvent; import com.mongodb.internal.connection.ServerHelper; import com.mongodb.internal.connection.TestCommandListener; +import com.mongodb.internal.connection.TestConnectionPoolListener; import com.mongodb.test.FlakyTest; import org.bson.BsonDocument; import org.bson.BsonInt32; @@ -62,16 +67,20 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static com.mongodb.ClusterFixture.getConnectionString; +import static com.mongodb.ClusterFixture.isAuthenticated; import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; import static com.mongodb.ClusterFixture.isServerlessTest; import static com.mongodb.ClusterFixture.serverVersionAtLeast; import static com.mongodb.ClusterFixture.sleep; import static com.mongodb.client.Fixture.getDefaultDatabaseName; import static com.mongodb.client.Fixture.getPrimary; +import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeFalse; @@ -104,6 +113,168 @@ public abstract class AbstractClientSideOperationsTimeoutProseTest { protected abstract boolean isAsync(); @Tag("setsFailPoint") + @SuppressWarnings("try") + @FlakyTest(maxAttempts = 3) + @DisplayName("4. Background Connection Pooling - timeoutMS used for handshake commands") + public void testBackgroundConnectionPoolingTimeoutMSUsedForHandshakeCommands() { + assumeTrue(serverVersionAtLeast(4, 4)); + assumeTrue(isAuthenticated()); + assumeFalse(isServerlessTest()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: {" + + " times: 1" + + " }," + + " data: {" + + " failCommands: [\"saslContinue\"]," + + " blockConnection: true," + + " blockTimeMS: 150," + + " appName: \"timeoutBackgroundPoolTest\"" + + " }" + + "}"); + + TestConnectionPoolListener connectionPoolListener = new TestConnectionPoolListener(); + + try (MongoClient ignoredClient = createMongoClient(getMongoClientSettingsBuilder() + .applicationName("timeoutBackgroundPoolTest") + .applyToConnectionPoolSettings(builder -> { + builder.minSize(1); + builder.addConnectionPoolListener(connectionPoolListener); + }) + .timeout(100, TimeUnit.MILLISECONDS))) { + + assertDoesNotThrow(() -> + connectionPoolListener.waitForEvents(asList(ConnectionCreatedEvent.class, ConnectionClosedEvent.class), + 10, TimeUnit.SECONDS)); + } + } + + @Tag("setsFailPoint") + @SuppressWarnings("try") + @FlakyTest(maxAttempts = 3) + @DisplayName("4. Background Connection Pooling - timeoutMS is refreshed for each handshake command") + public void testBackgroundConnectionPoolingTimeoutMSIsRefreshedForEachHandshakeCommand() { + assumeTrue(serverVersionAtLeast(4, 4)); + assumeTrue(isAuthenticated()); + assumeFalse(isServerlessTest()); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: \"alwaysOn\"," + + " data: {" + + " failCommands: [\"hello\", \"isMaster\", \"saslContinue\"]," + + " blockConnection: true," + + " blockTimeMS: 150," + + " appName: \"refreshTimeoutBackgroundPoolTest\"" + + " }" + + "}"); + + TestConnectionPoolListener connectionPoolListener = new TestConnectionPoolListener(); + + try (MongoClient ignoredClient = createMongoClient(getMongoClientSettingsBuilder() + .applicationName("refreshTimeoutBackgroundPoolTest") + .applyToConnectionPoolSettings(builder -> { + builder.minSize(1); + builder.addConnectionPoolListener(connectionPoolListener); + }) + .timeout(250, TimeUnit.MILLISECONDS))) { + + assertDoesNotThrow(() -> + connectionPoolListener.waitForEvents(asList(ConnectionCreatedEvent.class, ConnectionReadyEvent.class), + 10, TimeUnit.SECONDS)); + } + } + + @Tag("setsFailPoint") + @FlakyTest(maxAttempts = 3) + @DisplayName("5. Blocking Iteration Methods - Tailable cursors") + public void testBlockingIterationMethodsTailableCursor() { + assumeTrue(serverVersionAtLeast(4, 4)); + assumeFalse(isServerlessTest()); + + collectionHelper.create(namespace.getCollectionName(), + new CreateCollectionOptions().capped(true).sizeInBytes(10 * 1024 * 1024)); + collectionHelper.insertDocuments(singletonList(BsonDocument.parse("{x: 1}")), WriteConcern.MAJORITY); + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: \"alwaysOn\"," + + " data: {" + + " failCommands: [\"getMore\"]," + + " blockConnection: true," + + " blockTimeMS: " + 150 + + " }" + + "}"); + + try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder() + .timeout(250, TimeUnit.MILLISECONDS))) { + MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()); + + try (MongoCursor cursor = collection.find().cursorType(CursorType.Tailable).cursor()) { + Document document = assertDoesNotThrow(cursor::next); + assertEquals(1, document.get("x")); + assertThrows(MongoOperationTimeoutException.class, cursor::next); + } + + List events = commandListener.getCommandSucceededEvents(); + assertEquals(1, events.stream().filter(e -> e.getCommandName().equals("find")).count()); + long getMoreCount = events.stream().filter(e -> e.getCommandName().equals("getMore")).count(); + assertTrue(getMoreCount <= 2, "getMoreCount expected to less than or equal to two but was: " + getMoreCount); + } + } + + @Tag("setsFailPoint") + @FlakyTest(maxAttempts = 3) + @DisplayName("5. Blocking Iteration Methods - Change Streams") + public void testBlockingIterationMethodsChangeStream() { + assumeTrue(serverVersionAtLeast(4, 4)); + assumeTrue(isDiscoverableReplicaSet()); + assumeFalse(isServerlessTest()); + assumeFalse(isAsync()); // Async change stream cursor is non-deterministic for cursor::next + + BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); + collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); + sleep(2000); + collectionHelper.insertDocuments(singletonList(BsonDocument.parse("{x: 1}")), WriteConcern.MAJORITY); + + collectionHelper.runAdminCommand("{" + + " configureFailPoint: \"failCommand\"," + + " mode: \"alwaysOn\"," + + " data: {" + + " failCommands: [\"getMore\"]," + + " blockConnection: true," + + " blockTimeMS: " + 150 + + " }" + + "}"); + + try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder() + .timeout(250, TimeUnit.MILLISECONDS))) { + + MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); + try (MongoChangeStreamCursor> cursor = collection.watch( + singletonList(Document.parse("{ '$match': {'operationType': 'insert'}}"))) + .startAtOperationTime(startTime) + .fullDocument(FullDocument.UPDATE_LOOKUP) + .cursor()) { + ChangeStreamDocument document = assertDoesNotThrow(cursor::next); + + Document fullDocument = document.getFullDocument(); + assertNotNull(fullDocument); + assertEquals(1, fullDocument.get("x")); + assertThrows(MongoOperationTimeoutException.class, cursor::next); + } + List events = commandListener.getCommandSucceededEvents(); + assertEquals(1, events.stream().filter(e -> e.getCommandName().equals("aggregate")).count()); + long getMoreCount = events.stream().filter(e -> e.getCommandName().equals("getMore")).count(); + assertTrue(getMoreCount <= 2, "getMoreCount expected to less than or equal to two but was: " + getMoreCount); + } + } + + + @Tag("setsFailPoint") + @FlakyTest(maxAttempts = 3) @DisplayName("6. GridFS Upload - uploads via openUploadStream can be timed out") @Test public void testGridFSUploadViaOpenUploadStreamTimeout() { @@ -222,115 +393,70 @@ public void testGridFsDownloadStreamTimeout() { } } - @Tag("setsFailPoint") - @FlakyTest(maxAttempts = 3) - @DisplayName("5. Blocking Iteration Methods - Tailable cursors") - public void testBlockingIterationMethodsTailableCursor() { - assumeTrue(serverVersionAtLeast(4, 4)); + @DisplayName("8. Server Selection 1 / 2") + @ParameterizedTest(name = "[{index}] {0}") + @MethodSource("test8ServerSelectionArguments") + public void test8ServerSelection(final String connectionString) { assumeFalse(isServerlessTest()); - - collectionHelper.create(namespace.getCollectionName(), - new CreateCollectionOptions().capped(true).sizeInBytes(10 * 1024 * 1024)); - collectionHelper.insertDocuments(singletonList(BsonDocument.parse("{x: 1}")), WriteConcern.MAJORITY); - collectionHelper.runAdminCommand("{" - + " configureFailPoint: \"failCommand\"," - + " mode: \"alwaysOn\"," - + " data: {" - + " failCommands: [\"getMore\"]," - + " blockConnection: true," - + " blockTimeMS: " + 150 - + " }" - + "}"); - - try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder() - .timeout(250, TimeUnit.MILLISECONDS))) { - MongoCollection collection = client.getDatabase(namespace.getDatabaseName()) - .getCollection(namespace.getCollectionName()); - - try (MongoCursor cursor = collection.find().cursorType(CursorType.Tailable).cursor()) { - Document document = assertDoesNotThrow(cursor::next); - assertEquals(1, document.get("x")); - assertThrows(MongoOperationTimeoutException.class, cursor::next); - } - - List events = commandListener.getCommandSucceededEvents(); - assertEquals(1, events.stream().filter(e -> e.getCommandName().equals("find")).count()); - long getMoreCount = events.stream().filter(e -> e.getCommandName().equals("getMore")).count(); - assertTrue(getMoreCount <= 2, "getMoreCount expected to less than or equal to two but was: " + getMoreCount); + int timeoutBuffer = 100; // 5 in spec, Java is slower + // 1. Create a MongoClient + try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder() + .applyConnectionString(new ConnectionString(connectionString))) + ) { + long start = System.nanoTime(); + // 2. Using client, execute: + Throwable throwable = assertThrows(MongoTimeoutException.class, () -> { + mongoClient.getDatabase("admin").runCommand(new BsonDocument("ping", new BsonInt32(1))); + }); + // Expect this to fail with a server selection timeout error after no more than 15ms [this is increased] + long elapsed = msElapsedSince(start); + assertTrue(throwable.getMessage().contains("while waiting for a server")); + assertTrue(elapsed < 10 + timeoutBuffer, "Took too long to time out, elapsedMS: " + elapsed); } } @Tag("setsFailPoint") - @FlakyTest(maxAttempts = 3) - @DisplayName("5. Blocking Iteration Methods - Change Streams") - public void testBlockingIterationMethodsChangeStream() { + @DisplayName("8. Server Selection 2 / 2") + @ParameterizedTest(name = "[{index}] {0}") + @MethodSource("test8ServerSelectionHandshakeArguments") + public void test8ServerSelectionHandshake(final String ignoredTestName, final int timeoutMS, final int serverSelectionTimeoutMS) { assumeTrue(serverVersionAtLeast(4, 4)); - assumeTrue(isDiscoverableReplicaSet()); + assumeTrue(isAuthenticated()); assumeFalse(isServerlessTest()); - assumeFalse(isAsync()); // Async change stream cursor is non-deterministic for cursor::next - BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0); - collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions()); - sleep(2000); - collectionHelper.insertDocuments(singletonList(BsonDocument.parse("{x: 1}")), WriteConcern.MAJORITY); + MongoCredential credential = getConnectionString().getCredential(); + assertNotNull(credential); + assertNull(credential.getAuthenticationMechanism()); + MongoNamespace namespace = generateNamespace(); + collectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), namespace); collectionHelper.runAdminCommand("{" + " configureFailPoint: \"failCommand\"," - + " mode: \"alwaysOn\"," + + " mode: { times: 1 }," + " data: {" - + " failCommands: [\"getMore\"]," + + " failCommands: [\"saslContinue\"]," + " blockConnection: true," - + " blockTimeMS: " + 150 + + " blockTimeMS: 150" + " }" + "}"); try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder() - .timeout(250, TimeUnit.MILLISECONDS))) { - - MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) - .getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary()); - try (MongoChangeStreamCursor> cursor = collection.watch( - singletonList(Document.parse("{ '$match': {'operationType': 'insert'}}"))) - .startAtOperationTime(startTime) - .fullDocument(FullDocument.UPDATE_LOOKUP) - .cursor()) { - ChangeStreamDocument document = assertDoesNotThrow(cursor::next); - - Document fullDocument = document.getFullDocument(); - assertNotNull(fullDocument); - assertEquals(1, fullDocument.get("x")); - assertThrows(MongoOperationTimeoutException.class, cursor::next); - } - List events = commandListener.getCommandSucceededEvents(); - assertEquals(1, events.stream().filter(e -> e.getCommandName().equals("aggregate")).count()); - long getMoreCount = events.stream().filter(e -> e.getCommandName().equals("getMore")).count(); - assertTrue(getMoreCount <= 2, "getMoreCount expected to less than or equal to two but was: " + getMoreCount); - } - } + .timeout(timeoutMS, TimeUnit.MILLISECONDS) + .applyToClusterSettings(b -> b.serverSelectionTimeout(serverSelectionTimeoutMS, TimeUnit.MILLISECONDS)) + .retryWrites(false))) { - @DisplayName("8. Server Selection") - @ParameterizedTest(name = "[{index}] {0}") - @MethodSource("test8ServerSelectionArguments") - public void test8ServerSelection(final String connectionString) { - assumeFalse(isServerlessTest()); - int timeoutBuffer = 100; // 5 in spec, Java is slower - // 1. Create a MongoClient - try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder() - .applyConnectionString(new ConnectionString(connectionString))) - ) { long start = System.nanoTime(); - // 2. Using client, execute: - Throwable throwable = assertThrows(MongoTimeoutException.class, () -> { - mongoClient.getDatabase("admin").runCommand(new BsonDocument("ping", new BsonInt32(1))); + assertThrows(MongoTimeoutException.class, () -> { + mongoClient.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()) + .insertOne(new Document("x", 1)); }); - // Expect this to fail with a server selection timeout error after no more than 15ms [this is increased] long elapsed = msElapsedSince(start); - assertTrue(throwable.getMessage().contains("while waiting for a server")); - assertTrue(elapsed < 10 + timeoutBuffer, "Took too long to time out, elapsedMS: " + elapsed); + assertTrue(elapsed <= 200, "Took too long to time out, elapsedMS: " + elapsed); } } - static Stream test8ServerSelectionArguments() { + private static Stream test8ServerSelectionArguments() { return Stream.of( Arguments.of(Named.of("serverSelectionTimeoutMS honored if timeoutMS is not set", "mongodb://invalid/?serverSelectionTimeoutMS=10")), @@ -344,6 +470,12 @@ static Stream test8ServerSelectionArguments() { ); } + private static Stream test8ServerSelectionHandshakeArguments() { + return Stream.of( + Arguments.of("timeoutMS honored for connection handshake commands if it's lower than serverSelectionTimeoutMS", 100, 200), + Arguments.of("serverSelectionTimeoutMS honored for connection handshake commands if it's lower than timeoutMS", 200, 100) + ); + } private MongoNamespace generateNamespace() { return new MongoNamespace(getDefaultDatabaseName(), diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java index 55b19dc4866..c01bdca845e 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedServerDiscoveryAndMonitoringTest.java @@ -25,8 +25,6 @@ import java.net.URISyntaxException; import java.util.Collection; -import static org.junit.Assume.assumeFalse; - public class UnifiedServerDiscoveryAndMonitoringTest extends UnifiedSyncTest { public UnifiedServerDiscoveryAndMonitoringTest(@SuppressWarnings("unused") final String fileDescription, @@ -34,12 +32,6 @@ public UnifiedServerDiscoveryAndMonitoringTest(@SuppressWarnings("unused") final final String schemaVersion, @Nullable final BsonArray runOnRequirements, final BsonArray entities, final BsonArray initialData, final BsonDocument definition) { super(schemaVersion, runOnRequirements, entities, initialData, definition); - - assumeFalse("TODO (CSOT) - JAVA-5211 - apply settings to the operation context", - (fileDescription.equals("hello-timeout") && testDescription.startsWith("Network timeout on Monitor")) - || (fileDescription.equals("find-network-timeout-error") - && testDescription.startsWith("Ignore network timeout error on find")) - || fileDescription.equals("auth-network-timeout-error")); } @Parameterized.Parameters(name = "{0}: {1}")