diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index c66b5b8ead1..7ad6addbd3c 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -17,9 +17,11 @@ package com.mongodb.internal.connection; import com.mongodb.MongoClientException; +import com.mongodb.MongoException; import com.mongodb.MongoIncompatibleDriverException; import com.mongodb.MongoTimeoutException; import com.mongodb.ServerAddress; +import com.mongodb.UnixServerAddress; import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.ClusterId; import com.mongodb.connection.ClusterSettings; @@ -33,6 +35,9 @@ import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; +import com.mongodb.internal.logging.LogMessage; +import com.mongodb.internal.logging.LogMessage.Entry; +import com.mongodb.internal.logging.StructuredLogger; import com.mongodb.internal.selector.LatencyMinimizingServerSelector; import com.mongodb.lang.Nullable; import com.mongodb.selector.CompositeServerSelector; @@ -59,6 +64,17 @@ import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; import static com.mongodb.internal.connection.EventHelper.wouldDescriptionsGenerateEquivalentEvents; import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener; +import static com.mongodb.internal.logging.LogMessage.Component.SERVER_SELECTION; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.FAILURE; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.OPERATION; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.OPERATION_ID; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.REMAINING_TIME_MS; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.SELECTOR; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_HOST; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_PORT; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_DESCRIPTION; +import static com.mongodb.internal.logging.LogMessage.Level.DEBUG; +import static com.mongodb.internal.logging.LogMessage.Level.INFO; import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.lang.String.format; import static java.util.Arrays.asList; @@ -67,8 +83,8 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; abstract class BaseCluster implements Cluster { - private static final Logger LOGGER = Loggers.getLogger("cluster"); + private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger("cluster"); private final ReentrantLock lock = new ReentrantLock(); private final AtomicReference phase = new AtomicReference<>(new CountDownLatch(1)); @@ -105,34 +121,42 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera try { CountDownLatch currentPhase = phase.get(); ClusterDescription curDescription = description; + logServerSelectionStarted(operationContext, serverSelector, curDescription); ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector); ServerTuple serverTuple = selectServer(compositeServerSelector, curDescription); - boolean selectionFailureLogged = false; + boolean selectionWaitingLogged = false; long startTimeNanos = System.nanoTime(); long curTimeNanos = startTimeNanos; - long maxWaitTimeNanos = getMaxWaitTimeNanos(); + Long maxWaitTimeNanos = getMaxWaitTimeNanos(); while (true) { - throwIfIncompatible(curDescription); + if (!curDescription.isCompatibleWithDriver()) { + throw createAndLogIncompatibleException(operationContext, serverSelector, curDescription); + } if (serverTuple != null) { + logServerSelectionSucceeded(operationContext, serverTuple.getServerDescription().getAddress(), serverSelector, curDescription); return serverTuple; } - if (curTimeNanos - startTimeNanos > maxWaitTimeNanos) { - throw createTimeoutException(serverSelector, curDescription); + Long remainingTimeNanos = maxWaitTimeNanos == null ? null : maxWaitTimeNanos - (curTimeNanos - startTimeNanos); + + if (remainingTimeNanos != null && remainingTimeNanos <= 0) { + throw createAndLogTimeoutException(operationContext, serverSelector, curDescription); } - if (!selectionFailureLogged) { - logServerSelectionFailure(serverSelector, curDescription); - selectionFailureLogged = true; + if (!selectionWaitingLogged) { + logServerSelectionWaiting(operationContext, remainingTimeNanos, serverSelector, curDescription); + selectionWaitingLogged = true; } connect(); - currentPhase.await(Math.min(maxWaitTimeNanos - (curTimeNanos - startTimeNanos), getMinWaitTimeNanos()), NANOSECONDS); + currentPhase.await( + remainingTimeNanos == null ? getMinWaitTimeNanos() : Math.min(remainingTimeNanos, getMinWaitTimeNanos()), + NANOSECONDS); curTimeNanos = System.nanoTime(); @@ -151,15 +175,13 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati final SingleResultCallback callback) { isTrue("open", !isClosed()); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace(format("Asynchronously selecting server with selector %s", serverSelector)); - } - ServerSelectionRequest request = new ServerSelectionRequest(serverSelector, getCompositeServerSelector(serverSelector), - getMaxWaitTimeNanos(), callback); - CountDownLatch currentPhase = phase.get(); ClusterDescription currentDescription = description; + logServerSelectionStarted(operationContext, serverSelector, currentDescription); + ServerSelectionRequest request = new ServerSelectionRequest(operationContext, serverSelector, getCompositeServerSelector(serverSelector), + getMaxWaitTimeNanos(), callback); + if (!handleServerSelectionRequest(request, currentPhase, currentDescription)) { notifyWaitQueueHandler(request); } @@ -230,9 +252,10 @@ private void updatePhase() { withLock(() -> phase.getAndSet(new CountDownLatch(1)).countDown()); } - private long getMaxWaitTimeNanos() { + @Nullable + private Long getMaxWaitTimeNanos() { if (settings.getServerSelectionTimeout(NANOSECONDS) < 0) { - return Long.MAX_VALUE; + return null; } return settings.getServerSelectionTimeout(NANOSECONDS); } @@ -248,31 +271,24 @@ private boolean handleServerSelectionRequest(final ServerSelectionRequest reques CountDownLatch prevPhase = request.phase; request.phase = currentPhase; if (!description.isCompatibleWithDriver()) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Asynchronously failed server selection due to driver incompatibility with server"); - } - request.onResult(null, createIncompatibleException(description)); + request.onResult(null, createAndLogIncompatibleException(request.operationContext, request.originalSelector, description)); return true; } ServerTuple serverTuple = selectServer(request.compositeSelector, description); if (serverTuple != null) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace(format("Asynchronously selected server %s", serverTuple.getServerDescription().getAddress())); - } + logServerSelectionSucceeded( + request.operationContext, serverTuple.getServerDescription().getAddress(), request.originalSelector, description); request.onResult(serverTuple, null); return true; } if (prevPhase == null) { - logServerSelectionFailure(request.originalSelector, description); + logServerSelectionWaiting(request.operationContext, request.getRemainingTime(), request.originalSelector, description); } } if (request.timedOut()) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Asynchronously failed server selection after timeout"); - } - request.onResult(null, createTimeoutException(request.originalSelector, description)); + request.onResult(null, createAndLogTimeoutException(request.operationContext, request.originalSelector, description)); return true; } @@ -283,18 +299,6 @@ private boolean handleServerSelectionRequest(final ServerSelectionRequest reques } } - private void logServerSelectionFailure(final ServerSelector serverSelector, final ClusterDescription curDescription) { - if (LOGGER.isInfoEnabled()) { - if (settings.getServerSelectionTimeout(MILLISECONDS) < 0) { - LOGGER.info(format("No server chosen by %s from cluster description %s. Waiting indefinitely.", - serverSelector, curDescription)); - } else { - LOGGER.info(format("No server chosen by %s from cluster description %s. Waiting for %d ms before timing out", - serverSelector, curDescription, settings.getServerSelectionTimeout(MILLISECONDS))); - } - } - } - @Nullable private ServerTuple selectServer(final ServerSelector serverSelector, final ClusterDescription clusterDescription) { @@ -351,10 +355,13 @@ protected ClusterableServer createServer(final ServerAddress serverAddress) { return serverFactory.create(this, serverAddress); } - private void throwIfIncompatible(final ClusterDescription curDescription) { - if (!curDescription.isCompatibleWithDriver()) { - throw createIncompatibleException(curDescription); - } + private MongoIncompatibleDriverException createAndLogIncompatibleException( + final OperationContext operationContext, + final ServerSelector serverSelector, + final ClusterDescription clusterDescription) { + MongoIncompatibleDriverException exception = createIncompatibleException(clusterDescription); + logServerSelectionFailed(operationContext, exception, serverSelector, clusterDescription); + return exception; } private MongoIncompatibleDriverException createIncompatibleException(final ClusterDescription curDescription) { @@ -376,24 +383,33 @@ private MongoIncompatibleDriverException createIncompatibleException(final Clust return new MongoIncompatibleDriverException(message, curDescription); } - private MongoTimeoutException createTimeoutException(final ServerSelector serverSelector, final ClusterDescription curDescription) { - return new MongoTimeoutException(format("Timed out after %d ms while waiting for a server that matches %s. " - + "Client view of cluster state is %s", - settings.getServerSelectionTimeout(MILLISECONDS), serverSelector, - curDescription.getShortDescription())); + private MongoException createAndLogTimeoutException( + final OperationContext operationContext, + final ServerSelector serverSelector, + final ClusterDescription clusterDescription) { + MongoTimeoutException exception = new MongoTimeoutException(format( + "Timed out while waiting for a server that matches %s. Client view of cluster state is %s", + serverSelector, clusterDescription.getShortDescription())); + logServerSelectionFailed(operationContext, exception, serverSelector, clusterDescription); + return exception; } private static final class ServerSelectionRequest { + private final OperationContext operationContext; private final ServerSelector originalSelector; private final ServerSelector compositeSelector; - private final long maxWaitTimeNanos; + @Nullable + private final Long maxWaitTimeNanos; private final SingleResultCallback callback; private final long startTimeNanos = System.nanoTime(); private CountDownLatch phase; - ServerSelectionRequest(final ServerSelector serverSelector, final ServerSelector compositeSelector, - final long maxWaitTimeNanos, + ServerSelectionRequest(final OperationContext operationContext, + final ServerSelector serverSelector, final ServerSelector compositeSelector, + @Nullable + final Long maxWaitTimeNanos, final SingleResultCallback callback) { + this.operationContext = operationContext; this.originalSelector = serverSelector; this.compositeSelector = compositeSelector; this.maxWaitTimeNanos = maxWaitTimeNanos; @@ -409,11 +425,13 @@ void onResult(@Nullable final ServerTuple serverTuple, @Nullable final Throwable } boolean timedOut() { - return System.nanoTime() - startTimeNanos > maxWaitTimeNanos; + Long remainingTimeNanos = getRemainingTime(); + return remainingTimeNanos != null && remainingTimeNanos <= 0; } - long getRemainingTime() { - return startTimeNanos + maxWaitTimeNanos - System.nanoTime(); + @Nullable + Long getRemainingTime() { + return maxWaitTimeNanos == null ? null : maxWaitTimeNanos - (System.nanoTime() - startTimeNanos); } } @@ -455,7 +473,9 @@ public void run() { if (handleServerSelectionRequest(nextRequest, currentPhase, curDescription)) { iter.remove(); } else { - waitTimeNanos = Math.min(nextRequest.getRemainingTime(), Math.min(getMinWaitTimeNanos(), waitTimeNanos)); + Long remainingTimeNanos = nextRequest.getRemainingTime(); + long minWaitTimeNanos = Math.min(getMinWaitTimeNanos(), waitTimeNanos); + waitTimeNanos = remainingTimeNanos == null ? minWaitTimeNanos : Math.min(remainingTimeNanos, minWaitTimeNanos); } } @@ -477,4 +497,84 @@ public void run() { } } } + + private void logServerSelectionStarted( + final OperationContext operationContext, + final ServerSelector serverSelector, + final ClusterDescription clusterDescription) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { + STRUCTURED_LOGGER.log(new LogMessage( + SERVER_SELECTION, DEBUG, "Server selection started", clusterId, + asList( + new Entry(OPERATION, null), + new Entry(OPERATION_ID, operationContext.getId()), + new Entry(SELECTOR, serverSelector.toString()), + new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), + "Server selection started for operation[ {}] with ID {}. Selector: {}, topology description: {}")); + } + } + + private void logServerSelectionWaiting( + final OperationContext operationContext, + @Nullable + final Long remainingTimeNanos, + final ServerSelector serverSelector, + final ClusterDescription clusterDescription) { + if (STRUCTURED_LOGGER.isRequired(INFO, clusterId)) { + STRUCTURED_LOGGER.log(new LogMessage( + SERVER_SELECTION, INFO, "Waiting for suitable server to become available", clusterId, + asList( + new Entry(OPERATION, null), + new Entry(OPERATION_ID, operationContext.getId()), + new Entry(REMAINING_TIME_MS, remainingTimeNanos == null ? null : NANOSECONDS.toMillis(remainingTimeNanos)), + new Entry(SELECTOR, serverSelector.toString()), + new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), + "Waiting for server to become available for operation[ {}] with ID {}.[ Remaining time: {} ms.]" + + " Selector: {}, topology description: {}.")); + } + } + + private void logServerSelectionFailed( + final OperationContext operationContext, + final MongoException failure, + final ServerSelector serverSelector, + final ClusterDescription clusterDescription) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { + String failureDescription = failure instanceof MongoTimeoutException + // This hardcoded message guarantees that the `FAILURE` entry for `MongoTimeoutException` does not include + // any information that is specified via other entries, e.g., `SELECTOR` and `TOPOLOGY_DESCRIPTION`. + // The logging spec requires us to avoid such duplication of information. + ? MongoTimeoutException.class.getName() + ": Timed out while waiting for a suitable server" + : failure.toString(); + STRUCTURED_LOGGER.log(new LogMessage( + SERVER_SELECTION, DEBUG, "Server selection failed", clusterId, + asList( + new Entry(OPERATION, null), + new Entry(OPERATION_ID, operationContext.getId()), + new Entry(FAILURE, failureDescription), + new Entry(SELECTOR, serverSelector.toString()), + new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), + "Server selection failed for operation[ {}] with ID {}. Failure: {}. Selector: {}, topology description: {}")); + } + } + + private void logServerSelectionSucceeded( + final OperationContext operationContext, + final ServerAddress serverAddress, + final ServerSelector serverSelector, + final ClusterDescription clusterDescription) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { + STRUCTURED_LOGGER.log(new LogMessage( + SERVER_SELECTION, DEBUG, "Server selection succeeded", clusterId, + asList( + new Entry(OPERATION, null), + new Entry(OPERATION_ID, operationContext.getId()), + new Entry(SERVER_HOST, serverAddress.getHost()), + new Entry(SERVER_PORT, serverAddress instanceof UnixServerAddress ? null : serverAddress.getPort()), + new Entry(SELECTOR, serverSelector.toString()), + new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), + "Server selection succeeded for operation[ {}] with ID {}. Selected server: {}[:{}]." + + " Selector: {}, topology description: {}")); + } + } } diff --git a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java index 00426a11ba2..214e58b9d59 100644 --- a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java +++ b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java @@ -17,15 +17,20 @@ package com.mongodb.internal.logging; import com.mongodb.connection.ClusterId; +import com.mongodb.internal.VisibleForTesting; import com.mongodb.lang.Nullable; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; +import static java.util.function.Function.identity; /** *

This class is not part of the public API and may be removed or changed at any time

@@ -41,11 +46,36 @@ public final class LogMessage { private final String format; public enum Component { - COMMAND, - CONNECTION + COMMAND("command"), + CONNECTION("connection"), + SERVER_SELECTION("serverSelection"); + + private static final Map INDEX; + + static { + INDEX = Stream.of(Component.values()).collect(Collectors.toMap(Component::getValue, identity())); + } + + private final String value; + + Component(final String value) { + this.value = value; + } + + @VisibleForTesting(otherwise = PRIVATE) + public String getValue() { + return value; + } + + @VisibleForTesting(otherwise = PRIVATE) + public static Component of(final String value) { + Component result = INDEX.get(value); + return assertNotNull(result); + } } public enum Level { + INFO, DEBUG } @@ -73,6 +103,10 @@ public enum Name { COMMAND_NAME("commandName"), REQUEST_ID("requestId"), OPERATION_ID("operationId"), + /** + * Not supported. + */ + OPERATION("operation"), SERVICE_ID("serviceId"), SERVER_CONNECTION_ID("serverConnectionId"), DRIVER_CONNECTION_ID("driverConnectionId"), @@ -82,11 +116,15 @@ public enum Name { COMMAND_CONTENT("command"), REASON_DESCRIPTION("reason"), ERROR_DESCRIPTION("error"), + FAILURE("failure"), MAX_IDLE_TIME_MS("maxIdleTimeMS"), MIN_POOL_SIZE("minPoolSize"), MAX_POOL_SIZE("maxPoolSize"), MAX_CONNECTING("maxConnecting"), - WAIT_QUEUE_TIMEOUT_MS("waitQueueTimeoutMS"); + WAIT_QUEUE_TIMEOUT_MS("waitQueueTimeoutMS"), + SELECTOR("selector"), + TOPOLOGY_DESCRIPTION("topologyDescription"), + REMAINING_TIME_MS("remainingTimeMS"); private final String value; diff --git a/driver-core/src/main/com/mongodb/internal/logging/StructuredLogger.java b/driver-core/src/main/com/mongodb/internal/logging/StructuredLogger.java index d6bd8deb5ba..d65a80ef230 100644 --- a/driver-core/src/main/com/mongodb/internal/logging/StructuredLogger.java +++ b/driver-core/src/main/com/mongodb/internal/logging/StructuredLogger.java @@ -24,6 +24,9 @@ import com.mongodb.lang.Nullable; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Supplier; import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; @@ -68,10 +71,11 @@ public boolean isRequired(final Level level, final ClusterId clusterId) { return true; } - //noinspection SwitchStatementWithTooFewBranches switch (level) { case DEBUG: return logger.isDebugEnabled(); + case INFO: + return logger.isInfoEnabled(); default: throw new UnsupportedOperationException(); } @@ -82,22 +86,32 @@ public void log(final LogMessage logMessage) { if (interceptor != null) { interceptor.intercept(logMessage); } - //noinspection SwitchStatementWithTooFewBranches switch (logMessage.getLevel()) { case DEBUG: - if (logger.isDebugEnabled()) { - LogMessage.UnstructuredLogMessage unstructuredLogMessage = logMessage.toUnstructuredLogMessage(); - String message = unstructuredLogMessage.interpolate(); - Throwable exception = logMessage.getException(); - if (exception == null) { - logger.debug(message); - } else { - logger.debug(message, exception); - } - } + logUnstructured(logMessage, logger::isDebugEnabled, logger::debug, logger::debug); + break; + case INFO: + logUnstructured(logMessage, logger::isInfoEnabled, logger::info, logger::info); break; default: throw new UnsupportedOperationException(); } } + + private static void logUnstructured( + final LogMessage logMessage, + final Supplier loggingEnabled, + final Consumer doLog, + final BiConsumer doLogWithException) { + if (loggingEnabled.get()) { + LogMessage.UnstructuredLogMessage unstructuredLogMessage = logMessage.toUnstructuredLogMessage(); + String message = unstructuredLogMessage.interpolate(); + Throwable exception = logMessage.getException(); + if (exception == null) { + doLog.accept(message); + } else { + doLogWithException.accept(message, exception); + } + } + } } diff --git a/driver-core/src/main/com/mongodb/internal/session/ServerSessionPool.java b/driver-core/src/main/com/mongodb/internal/session/ServerSessionPool.java index 64114658cc3..35268e68f13 100644 --- a/driver-core/src/main/com/mongodb/internal/session/ServerSessionPool.java +++ b/driver-core/src/main/com/mongodb/internal/session/ServerSessionPool.java @@ -19,6 +19,7 @@ import com.mongodb.MongoException; import com.mongodb.ReadPreference; import com.mongodb.ServerApi; +import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.ServerDescription; import com.mongodb.internal.IgnorableRequestContext; import com.mongodb.internal.binding.StaticBindingContext; @@ -29,6 +30,7 @@ import com.mongodb.internal.selector.ReadPreferenceServerSelector; import com.mongodb.internal.validator.NoOpFieldNameValidator; import com.mongodb.lang.Nullable; +import com.mongodb.selector.ServerSelector; import com.mongodb.session.ServerSession; import org.bson.BsonArray; import org.bson.BsonBinary; @@ -114,9 +116,13 @@ private void endClosedSessions() { return; } - List primaryPreferred = new ReadPreferenceServerSelector(ReadPreference.primaryPreferred()) + ReadPreference primaryPreferred = ReadPreference.primaryPreferred(); + List primaryPreferredServers = new ReadPreferenceServerSelector(primaryPreferred) .select(cluster.getCurrentDescription()); - if (primaryPreferred.isEmpty()) { + if (primaryPreferredServers.isEmpty()) { + // Skip doing server selection if we anticipate that no server is readily selectable. + // This approach is racy, and it is still possible to become blocked selecting a server + // even if `primaryPreferredServers` is not empty. return; } @@ -124,14 +130,26 @@ private void endClosedSessions() { try { StaticBindingContext context = new StaticBindingContext(NoOpSessionContext.INSTANCE, serverApi, IgnorableRequestContext.INSTANCE, new OperationContext()); - connection = cluster.selectServer(clusterDescription -> { - for (ServerDescription cur : clusterDescription.getServerDescriptions()) { - if (cur.getAddress().equals(primaryPreferred.get(0).getAddress())) { - return Collections.singletonList(cur); - } - } - return Collections.emptyList(); - }, context.getOperationContext()).getServer().getConnection(context.getOperationContext()); + connection = cluster.selectServer( + new ServerSelector() { + @Override + public List select(final ClusterDescription clusterDescription) { + for (ServerDescription cur : clusterDescription.getServerDescriptions()) { + if (cur.getAddress().equals(primaryPreferredServers.get(0).getAddress())) { + return Collections.singletonList(cur); + } + } + return Collections.emptyList(); + } + + @Override + public String toString() { + return "ReadPreferenceServerSelector{" + + "readPreference=" + primaryPreferred + + '}'; + } + }, + context.getOperationContext()).getServer().getConnection(context.getOperationContext()); connection.command("admin", new BsonDocument("endSessions", new BsonArray(identifiers)), new NoOpFieldNameValidator(), diff --git a/driver-core/src/test/resources/unified-test-format/server-selection/logging/load-balanced.json b/driver-core/src/test/resources/unified-test-format/server-selection/logging/load-balanced.json new file mode 100644 index 00000000000..5855c4e991e --- /dev/null +++ b/driver-core/src/test/resources/unified-test-format/server-selection/logging/load-balanced.json @@ -0,0 +1,107 @@ +{ + "description": "server-selection-logging", + "schemaVersion": "1.13", + "runOnRequirements": [ + { + "topologies": [ + "load-balanced" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client", + "uriOptions": { + "heartbeatFrequencyMS": 500 + }, + "observeLogMessages": { + "serverSelection": "debug" + }, + "observeEvents": [ + "serverDescriptionChangedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "logging-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "server-selection" + } + } + ], + "tests": [ + { + "description": "A successful operation - load balanced cluster", + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": { + "newDescription": { + "type": "LoadBalancer" + } + } + }, + "count": 1 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "x": 1 + } + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection succeeded", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + } + ] + } + ] + } + ] +} diff --git a/driver-core/src/test/resources/unified-test-format/server-selection/logging/operation-id.json b/driver-core/src/test/resources/unified-test-format/server-selection/logging/operation-id.json new file mode 100644 index 00000000000..276e4b8d6d9 --- /dev/null +++ b/driver-core/src/test/resources/unified-test-format/server-selection/logging/operation-id.json @@ -0,0 +1,229 @@ +{ + "description": "operation-id", + "schemaVersion": "1.14", + "runOnRequirements": [ + { + "topologies": [ + "single" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client", + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 500, + "appName": "loggingClient", + "serverSelectionTimeoutMS": 2000 + }, + "observeLogMessages": { + "serverSelection": "debug" + }, + "observeEvents": [ + "serverDescriptionChangedEvent", + "topologyDescriptionChangedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "logging-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "server-selection" + } + }, + { + "client": { + "id": "failPointClient" + } + } + ], + "tests": [ + { + "description": "Successful bulkWrite operation: log messages have operationIds", + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 2 + } + }, + { + "name": "bulkWrite", + "object": "collection", + "arguments": { + "requests": [ + { + "insertOne": { + "document": { + "x": 1 + } + } + } + ] + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "insert" + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection succeeded", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "insert" + } + } + ] + } + ] + }, + { + "description": "Failed bulkWrite operation: log messages have operationIds", + "runOnRequirements": [ + { + "minServerVersion": "4.4" + } + ], + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "hello", + "ismaster" + ], + "appName": "loggingClient", + "closeConnection": true + } + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": { + "newDescription": { + "type": "Unknown" + } + } + }, + "count": 1 + } + }, + { + "name": "bulkWrite", + "object": "collection", + "arguments": { + "requests": [ + { + "insertOne": { + "document": { + "x": 1 + } + } + } + ] + }, + "expectError": { + "isClientError": true + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "insert" + } + }, + { + "level": "info", + "component": "serverSelection", + "data": { + "message": "Waiting for suitable server to become available", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "insert" + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection failed", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "insert" + } + } + ] + } + ] + } + ] +} diff --git a/driver-core/src/test/resources/unified-test-format/server-selection/logging/replica-set.json b/driver-core/src/test/resources/unified-test-format/server-selection/logging/replica-set.json new file mode 100644 index 00000000000..5eba784bf2a --- /dev/null +++ b/driver-core/src/test/resources/unified-test-format/server-selection/logging/replica-set.json @@ -0,0 +1,228 @@ +{ + "description": "replica-set-logging", + "schemaVersion": "1.14", + "runOnRequirements": [ + { + "topologies": [ + "replicaset" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client", + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 500, + "serverSelectionTimeoutMS": 2000 + }, + "observeLogMessages": { + "serverSelection": "debug" + }, + "observeEvents": [ + "serverDescriptionChangedEvent", + "topologyDescriptionChangedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "logging-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "server-selection" + } + }, + { + "client": { + "id": "failPointClient" + } + }, + { + "collection": { + "id": "unsatisfiableRPColl", + "database": "database", + "collectionName": "unsatisfiableRPColl", + "collectionOptions": { + "readPreference": { + "mode": "Secondary", + "tagSets": [ + { + "nonexistenttag": "a" + } + ] + } + } + } + } + ], + "tests": [ + { + "description": "A successful operation", + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 4 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "x": 1 + } + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection succeeded", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "Server selection fails due to unsatisfiable read preference", + "runOnRequirements": [ + { + "minServerVersion": "4.0" + } + ], + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 4 + } + }, + { + "name": "find", + "object": "unsatisfiableRPColl", + "arguments": { + "filter": { + "x": 1 + } + }, + "expectError": { + "isClientError": true + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "find", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "info", + "component": "serverSelection", + "data": { + "message": "Waiting for suitable server to become available", + "selector": { + "$$exists": true + }, + "operation": "find", + "topologyDescription": { + "$$exists": true + }, + "remainingTimeMS": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection failed", + "selector": { + "$$exists": true + }, + "operation": "find", + "topologyDescription": { + "$$exists": true + }, + "failure": { + "$$exists": true + } + } + } + ] + } + ] + } + ] +} diff --git a/driver-core/src/test/resources/unified-test-format/server-selection/logging/sharded.json b/driver-core/src/test/resources/unified-test-format/server-selection/logging/sharded.json new file mode 100644 index 00000000000..d42fba91004 --- /dev/null +++ b/driver-core/src/test/resources/unified-test-format/server-selection/logging/sharded.json @@ -0,0 +1,237 @@ +{ + "description": "server-selection-logging", + "schemaVersion": "1.14", + "runOnRequirements": [ + { + "topologies": [ + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client", + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 500, + "appName": "loggingClient", + "serverSelectionTimeoutMS": 2000 + }, + "observeLogMessages": { + "serverSelection": "debug" + }, + "observeEvents": [ + "serverDescriptionChangedEvent", + "topologyDescriptionChangedEvent" + ], + "useMultipleMongoses": false + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "logging-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "server-selection" + } + }, + { + "client": { + "id": "failPointClient", + "useMultipleMongoses": false + } + } + ], + "tests": [ + { + "description": "A successful operation", + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 2 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "x": 1 + } + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection succeeded", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "Failure due to unreachable server", + "runOnRequirements": [ + { + "minServerVersion": "4.4" + } + ], + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "hello", + "ismaster" + ], + "appName": "loggingClient", + "closeConnection": true + } + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": { + "newDescription": { + "type": "Unknown" + } + } + }, + "count": 1 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "x": 1 + } + }, + "expectError": { + "isClientError": true + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "info", + "component": "serverSelection", + "data": { + "message": "Waiting for suitable server to become available", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "remainingTimeMS": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection failed", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "failure": { + "$$exists": true + } + } + } + ] + } + ] + } + ] +} diff --git a/driver-core/src/test/resources/unified-test-format/server-selection/logging/standalone.json b/driver-core/src/test/resources/unified-test-format/server-selection/logging/standalone.json new file mode 100644 index 00000000000..3b3eddd841e --- /dev/null +++ b/driver-core/src/test/resources/unified-test-format/server-selection/logging/standalone.json @@ -0,0 +1,235 @@ +{ + "description": "standalone-logging", + "schemaVersion": "1.14", + "runOnRequirements": [ + { + "topologies": [ + "single" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client", + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 500, + "appName": "loggingClient", + "serverSelectionTimeoutMS": 2000 + }, + "observeLogMessages": { + "serverSelection": "debug" + }, + "observeEvents": [ + "serverDescriptionChangedEvent", + "topologyDescriptionChangedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "logging-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "server-selection" + } + }, + { + "client": { + "id": "failPointClient" + } + } + ], + "tests": [ + { + "description": "A successful operation", + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 2 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "x": 1 + } + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection succeeded", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "Failure due to unreachable server", + "runOnRequirements": [ + { + "minServerVersion": "4.4" + } + ], + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "hello", + "ismaster" + ], + "appName": "loggingClient", + "closeConnection": true + } + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": { + "newDescription": { + "type": "Unknown" + } + } + }, + "count": 1 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "x": 1 + } + }, + "expectError": { + "isClientError": true + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "info", + "component": "serverSelection", + "data": { + "message": "Waiting for suitable server to become available", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "remainingTimeMS": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection failed", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "failure": { + "$$exists": true + } + } + } + ] + } + ] + } + ] +} diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy index 39c52b23821..c7428d2f4e7 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy @@ -191,7 +191,7 @@ class BaseClusterSpecification extends Specification { then: def e = thrown(MongoTimeoutException) - e.getMessage().startsWith("Timed out after ${serverSelectionTimeoutMS} ms while waiting for a server " + + e.getMessage().startsWith("Timed out while waiting for a server " + 'that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN') e.getMessage().contains('{address=localhost:27017, type=UNKNOWN, state=CONNECTING, ' + 'exception={com.mongodb.MongoInternalException: oops}}') diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java index 93cdbac7ad0..89ca0088a77 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java @@ -20,17 +20,28 @@ import com.mongodb.event.ClusterDescriptionChangedEvent; import com.mongodb.event.ClusterListener; import com.mongodb.event.ClusterOpeningEvent; +import com.mongodb.lang.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.Locks.withLock; -class TestClusterListener implements ClusterListener { - private ClusterOpeningEvent clusterOpeningEvent; - private ClusterClosedEvent clusterClosingEvent; - private final List clusterDescriptionChangedEvents = new ArrayList<>(); +public final class TestClusterListener implements ClusterListener { + @Nullable + private volatile ClusterOpeningEvent clusterOpeningEvent; + @Nullable + private volatile ClusterClosedEvent clusterClosingEvent; + private final ArrayList clusterDescriptionChangedEvents = new ArrayList<>(); + private final ReentrantLock lock = new ReentrantLock(); + private final Condition newClusterDescriptionChangedEventCondition = lock.newCondition(); @Override public void clusterOpening(final ClusterOpeningEvent event) { @@ -47,22 +58,62 @@ public void clusterClosed(final ClusterClosedEvent event) { @Override public void clusterDescriptionChanged(final ClusterDescriptionChangedEvent event) { notNull("event", event); - clusterDescriptionChangedEvents.add(event); + withLock(lock, () -> { + clusterDescriptionChangedEvents.add(event); + newClusterDescriptionChangedEventCondition.signalAll(); + }); } + @Nullable public ClusterOpeningEvent getClusterOpeningEvent() { return clusterOpeningEvent; } + @Nullable public ClusterClosedEvent getClusterClosingEvent() { return clusterClosingEvent; } public List getClusterDescriptionChangedEvents() { - return clusterDescriptionChangedEvents; + return withLock(lock, () -> new ArrayList<>(clusterDescriptionChangedEvents)); } + /** + * Calling this method concurrently with {@link #waitForClusterDescriptionChangedEvents(Predicate, int, Duration)}, + * may result in {@link #waitForClusterDescriptionChangedEvents(Predicate, int, Duration)} not working as expected. + */ public void clearClusterDescriptionChangedEvents() { - clusterDescriptionChangedEvents.clear(); + withLock(lock, clusterDescriptionChangedEvents::clear); + } + + /** + * Calling this method concurrently with {@link #clearClusterDescriptionChangedEvents()}, + * may result in {@link #waitForClusterDescriptionChangedEvents(Predicate, int, Duration)} not working as expected. + */ + public void waitForClusterDescriptionChangedEvents( + final Predicate matcher, final int count, final Duration duration) + throws InterruptedException, TimeoutException { + long nanosRemaining = duration.toNanos(); + lock.lock(); + try { + long observedCount = unguardedCount(matcher); + while (observedCount < count) { + if (nanosRemaining <= 0) { + throw new TimeoutException(String.format("Timed out waiting for %d %s events. The observed count is %d.", + count, ClusterDescriptionChangedEvent.class.getSimpleName(), observedCount)); + } + nanosRemaining = newClusterDescriptionChangedEventCondition.awaitNanos(nanosRemaining); + observedCount = unguardedCount(matcher); + } + } finally { + lock.unlock(); + } + } + + /** + * Must be guarded by {@link #lock}. + */ + private long unguardedCount(final Predicate matcher) { + return clusterDescriptionChangedEvents.stream().filter(matcher).count(); } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerListener.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerListener.java index b565aeac969..007074f8cc6 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerListener.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerListener.java @@ -16,14 +16,16 @@ package com.mongodb.internal.connection; +import com.mongodb.event.ClusterDescriptionChangedEvent; import com.mongodb.event.ServerClosedEvent; import com.mongodb.event.ServerDescriptionChangedEvent; import com.mongodb.event.ServerListener; import com.mongodb.event.ServerOpeningEvent; +import com.mongodb.lang.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -31,15 +33,16 @@ import java.util.function.Predicate; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.Locks.withLock; public class TestServerListener implements ServerListener { - private ServerOpeningEvent serverOpeningEvent; - private ServerClosedEvent serverClosedEvent; + @Nullable + private volatile ServerOpeningEvent serverOpeningEvent; + @Nullable + private volatile ServerClosedEvent serverClosedEvent; private final List serverDescriptionChangedEvents = new ArrayList<>(); private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); - private volatile int waitingForEventCount; - private Predicate waitingForEventMatcher; @Override public void serverOpening(final ServerOpeningEvent event) { @@ -54,61 +57,53 @@ public void serverClosed(final ServerClosedEvent event) { @Override public void serverDescriptionChanged(final ServerDescriptionChangedEvent event) { notNull("event", event); - lock.lock(); - try { + withLock(lock, () -> { serverDescriptionChangedEvents.add(event); - if (waitingForEventCount != 0 && containsEvents()) { - condition.signalAll(); - } - - } finally { - lock.unlock(); - } + condition.signalAll(); + }); } + @Nullable public ServerOpeningEvent getServerOpeningEvent() { return serverOpeningEvent; } + @Nullable public ServerClosedEvent getServerClosedEvent() { return serverClosedEvent; } public List getServerDescriptionChangedEvents() { - return serverDescriptionChangedEvents; + return withLock(lock, () -> new ArrayList<>(serverDescriptionChangedEvents)); } - public void waitForServerDescriptionChangedEvent(final Predicate matcher, final int count, - final int time, final TimeUnit unit) throws InterruptedException, TimeoutException { + public void waitForServerDescriptionChangedEvents( + final Predicate matcher, final int count, final Duration duration) + throws InterruptedException, TimeoutException { if (count <= 0) { throw new IllegalArgumentException(); } + long nanosRemaining = duration.toNanos(); lock.lock(); try { - if (waitingForEventCount != 0) { - throw new IllegalStateException("Already waiting for events"); - } - waitingForEventCount = count; - waitingForEventMatcher = matcher; - if (containsEvents()) { - return; - } - if (!condition.await(time, unit)) { - throw new TimeoutException("Timed out waiting for " + count + " ServerDescriptionChangedEvent events. " - + "The count after timing out is " + countEvents()); + long observedCount = unguardedCount(matcher); + while (observedCount < count) { + if (nanosRemaining <= 0) { + throw new TimeoutException(String.format("Timed out waiting for %d %s events. The observed count is %d.", + count, ClusterDescriptionChangedEvent.class.getSimpleName(), observedCount)); + } + nanosRemaining = condition.awaitNanos(nanosRemaining); + observedCount = unguardedCount(matcher); } } finally { - waitingForEventCount = 0; - waitingForEventMatcher = null; lock.unlock(); } } - private long countEvents() { - return serverDescriptionChangedEvents.stream().filter(waitingForEventMatcher).count(); - } - - private boolean containsEvents() { - return countEvents() >= waitingForEventCount; + /** + * Must be guarded by {@link #lock}. + */ + private long unguardedCount(final Predicate matcher) { + return serverDescriptionChangedEvents.stream().filter(matcher).count(); } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ServerSelectionLoggingTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ServerSelectionLoggingTest.java new file mode 100644 index 00000000000..433329def96 --- /dev/null +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ServerSelectionLoggingTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.reactivestreams.client.unified; + +import com.mongodb.lang.Nullable; +import org.bson.BsonArray; +import org.bson.BsonDocument; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Collection; + +public final class ServerSelectionLoggingTest extends UnifiedReactiveStreamsTest { + public ServerSelectionLoggingTest(@SuppressWarnings("unused") final String fileDescription, + @SuppressWarnings("unused") final String testDescription, + final String schemaVersion, @Nullable final BsonArray runOnRequirements, final BsonArray entities, + final BsonArray initialData, final BsonDocument definition) { + super(schemaVersion, runOnRequirements, entities, initialData, definition); + } + + @Parameterized.Parameters(name = "{0}: {1}") + public static Collection data() throws URISyntaxException, IOException { + return getTestData("unified-test-format/server-selection/logging"); + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java index 91b87ab482b..110cf321f4e 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java @@ -115,10 +115,18 @@ public static ContextElement ofWaitForServerDescriptionChangedEvents(final Strin return new EventCountContext("Wait For Server Description Changed Events", client, event, count); } - public static ContextElement ofServerDescriptionChangeEventCount(final String client, final BsonDocument event, final int count) { + public static ContextElement ofServerDescriptionChangedEventCount(final String client, final BsonDocument event, final int count) { return new EventCountContext("Server Description Changed Event Count", client, event, count); } + public static ContextElement ofWaitForClusterDescriptionChangedEvents(final String client, final BsonDocument event, final int count) { + return new EventCountContext("Wait For Cluster Description Changed Events", client, event, count); + } + + public static ContextElement ofClusterDescriptionChangedEventCount(final String client, final BsonDocument event, final int count) { + return new EventCountContext("Cluster Description Changed Event Count", client, event, count); + } + private static class EventCountContext extends ContextElement { private final String name; diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java b/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java index 3bc32e26c26..ab72c27179a 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java @@ -23,6 +23,7 @@ import com.mongodb.ReadConcernLevel; import com.mongodb.ServerApi; import com.mongodb.ServerApiVersion; +import com.mongodb.internal.connection.TestClusterListener; import com.mongodb.logging.TestLoggingInterceptor; import com.mongodb.TransactionOptions; import com.mongodb.WriteConcern; @@ -118,6 +119,7 @@ public final class Entities { private final Map clientLoggingInterceptors = new HashMap<>(); private final Map clientConnectionPoolListeners = new HashMap<>(); private final Map clientServerListeners = new HashMap<>(); + private final Map clientClusterListeners = new HashMap<>(); private final Map> cursors = new HashMap<>(); private final Map topologyDescriptions = new HashMap<>(); private final Map successCounts = new HashMap<>(); @@ -278,6 +280,10 @@ public TestServerListener getServerListener(final String id) { return getEntity(id + "-server-listener", clientServerListeners, "server listener"); } + public TestClusterListener getClusterListener(final String id) { + return getEntity(id + "-cluster-listener", clientClusterListeners, "cluster listener"); + } + private T getEntity(final String id, final Map entities, final String type) { T entity = entities.get(id); if (entity == null) { @@ -361,9 +367,13 @@ private void initClient(final BsonDocument entity, final String id, clientSettingsBuilder.applicationName(id); clientSettingsBuilder.applyToLoggerSettings(builder -> builder.maxDocumentLength(10_000)); - TestServerListener testClusterListener = new TestServerListener(); - clientSettingsBuilder.applyToServerSettings(builder -> builder.addServerListener(testClusterListener)); - putEntity(id + "-server-listener", testClusterListener, clientServerListeners); + TestServerListener testServerListener = new TestServerListener(); + clientSettingsBuilder.applyToServerSettings(builder -> builder.addServerListener(testServerListener)); + putEntity(id + "-server-listener", testServerListener, clientServerListeners); + + TestClusterListener testClusterListener = new TestClusterListener(); + clientSettingsBuilder.applyToClusterSettings(builder -> builder.addClusterListener(testClusterListener)); + putEntity(id + "-cluster-listener", testClusterListener, clientClusterListeners); if (entity.containsKey("observeEvents")) { List ignoreCommandMonitoringEvents = entity @@ -485,6 +495,7 @@ private void initClient(final BsonDocument entity, final String id, } break; case "appname": + case "appName": clientSettingsBuilder.applicationName(value.asString().getValue()); break; default: @@ -526,8 +537,7 @@ private void initClient(final BsonDocument entity, final String id, private static LogMessage.Component toComponent(final Map.Entry entry) { String componentName = entry.getKey(); - return LogMessage.Component - .valueOf(componentName.toUpperCase()); + return LogMessage.Component.of(componentName); } private static LogMessage.Level toLevel(final Map.Entry entry) { diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java index 1cc97e52b5c..777791aafaf 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java @@ -17,6 +17,7 @@ package com.mongodb.client.unified; import com.mongodb.connection.ServerType; +import com.mongodb.event.ClusterDescriptionChangedEvent; import com.mongodb.event.CommandEvent; import com.mongodb.event.CommandFailedEvent; import com.mongodb.event.CommandStartedEvent; @@ -28,6 +29,7 @@ import com.mongodb.event.ConnectionPoolReadyEvent; import com.mongodb.event.ConnectionReadyEvent; import com.mongodb.event.ServerDescriptionChangedEvent; +import com.mongodb.internal.connection.TestClusterListener; import com.mongodb.internal.connection.TestConnectionPoolListener; import com.mongodb.internal.connection.TestServerListener; import com.mongodb.lang.NonNull; @@ -35,10 +37,14 @@ import org.bson.BsonDocument; import org.bson.types.ObjectId; +import java.time.Duration; +import java.util.HashSet; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static java.util.Arrays.asList; +import static java.util.Collections.singleton; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -235,8 +241,8 @@ public void waitForServerDescriptionChangedEvents(final String client, final Bso context.push(ContextElement.ofWaitForServerDescriptionChangedEvents(client, expectedEvent, count)); BsonDocument expectedEventContents = getEventContents(expectedEvent); try { - serverListener.waitForServerDescriptionChangedEvent( - event -> serverDescriptionChangedEventMatches(expectedEventContents, event), count, 10, TimeUnit.SECONDS); + serverListener.waitForServerDescriptionChangedEvents( + event -> serverDescriptionChangedEventMatches(expectedEventContents, event), count, Duration.ofSeconds(10)); context.pop(); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -248,22 +254,52 @@ public void waitForServerDescriptionChangedEvents(final String client, final Bso public void assertServerDescriptionChangeEventCount(final String client, final BsonDocument expectedEvent, final int count, final List events) { BsonDocument expectedEventContents = getEventContents(expectedEvent); - context.push(ContextElement.ofServerDescriptionChangeEventCount(client, expectedEvent, count)); + context.push(ContextElement.ofServerDescriptionChangedEventCount(client, expectedEvent, count)); long matchCount = events.stream().filter(event -> serverDescriptionChangedEventMatches(expectedEventContents, event)).count(); assertEquals(context.getMessage("Expected server description changed event counts to match"), count, matchCount); context.pop(); } + public void waitForClusterDescriptionChangedEvents(final String client, final BsonDocument expectedEvent, final int count, + final TestClusterListener clusterListener) { + context.push(ContextElement.ofWaitForClusterDescriptionChangedEvents(client, expectedEvent, count)); + BsonDocument expectedEventContents = getEventContents(expectedEvent); + try { + clusterListener.waitForClusterDescriptionChangedEvents( + event -> clusterDescriptionChangedEventMatches(expectedEventContents, event), count, Duration.ofSeconds(10)); + context.pop(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + fail(context.getMessage("Timed out waiting for cluster description changed events")); + } + } + + public void assertClusterDescriptionChangeEventCount(final String client, final BsonDocument expectedEvent, final int count, + final List events) { + BsonDocument expectedEventContents = getEventContents(expectedEvent); + context.push(ContextElement.ofClusterDescriptionChangedEventCount(client, expectedEvent, count)); + long matchCount = events.stream().filter(event -> clusterDescriptionChangedEventMatches(expectedEventContents, event)).count(); + assertEquals(context.getMessage("Expected cluster description changed event counts to match"), count, matchCount); + context.pop(); + } + @NonNull private BsonDocument getEventContents(final BsonDocument expectedEvent) { - if (!expectedEvent.getFirstKey().equals("serverDescriptionChangedEvent")) { - throw new UnsupportedOperationException("Unsupported event type " + expectedEvent.getFirstKey()); + HashSet supportedEventTypes = new HashSet<>(asList("serverDescriptionChangedEvent", "topologyDescriptionChangedEvent")); + String expectedEventType = expectedEvent.getFirstKey(); + if (!supportedEventTypes.contains(expectedEventType)) { + throw new UnsupportedOperationException("Unsupported event type " + expectedEventType); } @SuppressWarnings("OptionalGetWithoutIsPresent") BsonDocument expectedEventContents = expectedEvent.values().stream().findFirst().get().asDocument(); if (expectedEventContents.isEmpty()) { return expectedEventContents; } + HashSet emptyEventTypes = new HashSet<>(singleton("topologyDescriptionChangedEvent")); + if (emptyEventTypes.contains(expectedEventType)) { + throw new UnsupportedOperationException("Contents of " + expectedEventType + " must be empty"); + } if (expectedEventContents.size() != 1 || !expectedEventContents.getFirstKey().equals("newDescription") || expectedEventContents.getDocument("newDescription").size() != 1) { throw new UnsupportedOperationException("Unsupported event contents " + expectedEvent); @@ -286,6 +322,15 @@ private static boolean serverDescriptionChangedEventMatches(final BsonDocument e } } + private static boolean clusterDescriptionChangedEventMatches(final BsonDocument expectedEventContents, + final ClusterDescriptionChangedEvent event) { + if (!expectedEventContents.isEmpty()) { + throw new UnsupportedOperationException( + "Contents of " + ClusterDescriptionChangedEvent.class.getSimpleName() + " must be empty"); + } + return true; + } + private static String getEventType(final Class eventClass) { String eventClassName = eventClass.getSimpleName(); if (eventClassName.startsWith("ConnectionPool")) { diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java index b18f7151e51..de8bfaea452 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java @@ -50,6 +50,11 @@ void assertLogMessageEquality(final String client, final BsonArray expectedMessa for (int i = 0; i < expectedMessages.size(); i++) { BsonDocument expectedMessageAsDocument = expectedMessages.get(i).asDocument().clone(); + // `LogMessage.Entry.Name.OPERATION` is not supported, therefore we skip matching its value + BsonValue expectedDataDocument = expectedMessageAsDocument.get("data"); + if (expectedDataDocument != null) { + expectedDataDocument.asDocument().remove(LogMessage.Entry.Name.OPERATION.getValue()); + } valueMatcher.assertValuesMatch(expectedMessageAsDocument, asDocument(actualMessages.get(i))); } @@ -58,7 +63,7 @@ void assertLogMessageEquality(final String client, final BsonArray expectedMessa static BsonDocument asDocument(final LogMessage message) { BsonDocument document = new BsonDocument(); - document.put("component", new BsonString(message.getComponent().name().toLowerCase())); + document.put("component", new BsonString(message.getComponent().getValue())); document.put("level", new BsonString(message.getLevel().name().toLowerCase())); document.put("hasFailure", BsonBoolean.valueOf(message.getException() != null)); document.put("failureIsRedacted", diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/ServerSelectionLoggingTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/ServerSelectionLoggingTest.java new file mode 100644 index 00000000000..4ce99fb76a8 --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/ServerSelectionLoggingTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.unified; + +import com.mongodb.lang.Nullable; +import org.bson.BsonArray; +import org.bson.BsonDocument; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Collection; + +public final class ServerSelectionLoggingTest extends UnifiedSyncTest { + public ServerSelectionLoggingTest(@SuppressWarnings("unused") final String fileDescription, + @SuppressWarnings("unused") final String testDescription, + final String schemaVersion, @Nullable final BsonArray runOnRequirements, final BsonArray entities, + final BsonArray initialData, final BsonDocument definition) { + super(schemaVersion, runOnRequirements, entities, initialData, definition); + } + + @Parameterized.Parameters(name = "{0}: {1}") + public static Collection data() throws URISyntaxException, IOException { + return getTestData("unified-test-format/server-selection/logging"); + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java index 97f24422a11..5ffc4862c7a 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java @@ -22,6 +22,8 @@ import com.mongodb.ReadPreference; import com.mongodb.ServerAddress; import com.mongodb.ServerCursor; +import com.mongodb.Tag; +import com.mongodb.TagSet; import com.mongodb.TransactionOptions; import com.mongodb.WriteConcern; import com.mongodb.bulk.BulkWriteResult; @@ -90,7 +92,6 @@ import org.bson.codecs.configuration.CodecRegistries; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -136,14 +137,32 @@ static WriteConcern asWriteConcern(final BsonDocument writeConcernDocument) { } public static ReadPreference asReadPreference(final BsonDocument readPreferenceDocument) { + List supportedKeys = asList("mode", "tagSets", "maxStalenessSeconds"); + List unsupportedKeys = readPreferenceDocument.keySet().stream().filter(key -> !supportedKeys.contains(key)).collect(toList()); + if (!unsupportedKeys.isEmpty()) { + throw new UnsupportedOperationException("Unsupported read preference keys: " + unsupportedKeys + " in " + readPreferenceDocument); + } + String mode = readPreferenceDocument.getString("mode").getValue(); if (readPreferenceDocument.size() == 1) { - return ReadPreference.valueOf(readPreferenceDocument.getString("mode").getValue()); - } else if (readPreferenceDocument.size() == 2) { - return ReadPreference.valueOf(readPreferenceDocument.getString("mode").getValue(), - Collections.emptyList(), readPreferenceDocument.getNumber("maxStalenessSeconds").intValue(), TimeUnit.SECONDS); - } else { - throw new UnsupportedOperationException("Unsupported read preference properties: " + readPreferenceDocument.toJson()); + return ReadPreference.valueOf(mode); + } + List tagSets = tagSets(readPreferenceDocument.getArray("tagSets", new BsonArray())); + BsonValue maxStalenessSecondsBson = readPreferenceDocument.get("maxStalenessSeconds"); + Integer maxStalenessSeconds = maxStalenessSecondsBson == null ? null : maxStalenessSecondsBson.asInt32().intValue(); + if (maxStalenessSecondsBson == null) { + return ReadPreference.valueOf(mode, tagSets); } + return ReadPreference.valueOf(mode, tagSets, maxStalenessSeconds, TimeUnit.SECONDS); + } + + private static List tagSets(final BsonArray tagSetsBson) { + return tagSetsBson.stream() + .map(tagSetBson -> new TagSet(tagSetBson.asDocument() + .entrySet() + .stream() + .map(entry -> new Tag(entry.getKey(), entry.getValue().asString().getValue())) + .collect(toList()))) + .collect(toList()); } private OperationResult resultOf(final Supplier operationResult) { diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java index 0ae4b644975..49c94f486cd 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java @@ -578,6 +578,10 @@ private OperationResult executeWaitForEvent(final UnifiedTestContext context, fi context.getEventMatcher().waitForServerDescriptionChangedEvents(clientId, event, count, entities.getServerListener(clientId)); break; + case "topologyDescriptionChangedEvent": + context.getEventMatcher().waitForClusterDescriptionChangedEvents(clientId, event, count, + entities.getClusterListener(clientId)); + break; case "poolClearedEvent": case "poolReadyEvent": case "connectionCreatedEvent": @@ -603,6 +607,10 @@ private OperationResult executeAssertEventCount(final UnifiedTestContext context context.getEventMatcher().assertServerDescriptionChangeEventCount(clientId, event, count, entities.getServerListener(clientId).getServerDescriptionChangedEvents()); break; + case "topologyDescriptionChangedEvent": + context.getEventMatcher().assertClusterDescriptionChangeEventCount(clientId, event, count, + entities.getClusterListener(clientId).getClusterDescriptionChangedEvents()); + break; case "poolClearedEvent": case "poolReadyEvent": context.getEventMatcher().assertConnectionPoolEventCount(clientId, event, count,