From 14b3b1a88f23b282980e4293920c4a24c478446e Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Thu, 28 Sep 2023 14:26:40 -0600 Subject: [PATCH 01/10] Add unified tests JAVA-4754 --- .../logging/load-balanced.json | 107 ++++++++ .../logging/operation-id.json | 229 +++++++++++++++++ .../server-selection/logging/replica-set.json | 228 +++++++++++++++++ .../server-selection/logging/sharded.json | 237 ++++++++++++++++++ .../server-selection/logging/standalone.json | 235 +++++++++++++++++ 5 files changed, 1036 insertions(+) create mode 100644 driver-core/src/test/resources/unified-test-format/server-selection/logging/load-balanced.json create mode 100644 driver-core/src/test/resources/unified-test-format/server-selection/logging/operation-id.json create mode 100644 driver-core/src/test/resources/unified-test-format/server-selection/logging/replica-set.json create mode 100644 driver-core/src/test/resources/unified-test-format/server-selection/logging/sharded.json create mode 100644 driver-core/src/test/resources/unified-test-format/server-selection/logging/standalone.json diff --git a/driver-core/src/test/resources/unified-test-format/server-selection/logging/load-balanced.json b/driver-core/src/test/resources/unified-test-format/server-selection/logging/load-balanced.json new file mode 100644 index 00000000000..5855c4e991e --- /dev/null +++ b/driver-core/src/test/resources/unified-test-format/server-selection/logging/load-balanced.json @@ -0,0 +1,107 @@ +{ + "description": "server-selection-logging", + "schemaVersion": "1.13", + "runOnRequirements": [ + { + "topologies": [ + "load-balanced" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client", + "uriOptions": { + "heartbeatFrequencyMS": 500 + }, + "observeLogMessages": { + "serverSelection": "debug" + }, + "observeEvents": [ + "serverDescriptionChangedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "logging-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "server-selection" + } + } + ], + "tests": [ + { + "description": "A successful operation - load balanced cluster", + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": { + "newDescription": { + "type": "LoadBalancer" + } + } + }, + "count": 1 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "x": 1 + } + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection succeeded", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + } + ] + } + ] + } + ] +} diff --git a/driver-core/src/test/resources/unified-test-format/server-selection/logging/operation-id.json b/driver-core/src/test/resources/unified-test-format/server-selection/logging/operation-id.json new file mode 100644 index 00000000000..276e4b8d6d9 --- /dev/null +++ b/driver-core/src/test/resources/unified-test-format/server-selection/logging/operation-id.json @@ -0,0 +1,229 @@ +{ + "description": "operation-id", + "schemaVersion": "1.14", + "runOnRequirements": [ + { + "topologies": [ + "single" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client", + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 500, + "appName": "loggingClient", + "serverSelectionTimeoutMS": 2000 + }, + "observeLogMessages": { + "serverSelection": "debug" + }, + "observeEvents": [ + "serverDescriptionChangedEvent", + "topologyDescriptionChangedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "logging-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "server-selection" + } + }, + { + "client": { + "id": "failPointClient" + } + } + ], + "tests": [ + { + "description": "Successful bulkWrite operation: log messages have operationIds", + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 2 + } + }, + { + "name": "bulkWrite", + "object": "collection", + "arguments": { + "requests": [ + { + "insertOne": { + "document": { + "x": 1 + } + } + } + ] + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "insert" + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection succeeded", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "insert" + } + } + ] + } + ] + }, + { + "description": "Failed bulkWrite operation: log messages have operationIds", + "runOnRequirements": [ + { + "minServerVersion": "4.4" + } + ], + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "hello", + "ismaster" + ], + "appName": "loggingClient", + "closeConnection": true + } + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": { + "newDescription": { + "type": "Unknown" + } + } + }, + "count": 1 + } + }, + { + "name": "bulkWrite", + "object": "collection", + "arguments": { + "requests": [ + { + "insertOne": { + "document": { + "x": 1 + } + } + } + ] + }, + "expectError": { + "isClientError": true + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "insert" + } + }, + { + "level": "info", + "component": "serverSelection", + "data": { + "message": "Waiting for suitable server to become available", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "insert" + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection failed", + "operationId": { + "$$type": [ + "int", + "long" + ] + }, + "operation": "insert" + } + } + ] + } + ] + } + ] +} diff --git a/driver-core/src/test/resources/unified-test-format/server-selection/logging/replica-set.json b/driver-core/src/test/resources/unified-test-format/server-selection/logging/replica-set.json new file mode 100644 index 00000000000..5eba784bf2a --- /dev/null +++ b/driver-core/src/test/resources/unified-test-format/server-selection/logging/replica-set.json @@ -0,0 +1,228 @@ +{ + "description": "replica-set-logging", + "schemaVersion": "1.14", + "runOnRequirements": [ + { + "topologies": [ + "replicaset" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client", + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 500, + "serverSelectionTimeoutMS": 2000 + }, + "observeLogMessages": { + "serverSelection": "debug" + }, + "observeEvents": [ + "serverDescriptionChangedEvent", + "topologyDescriptionChangedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "logging-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "server-selection" + } + }, + { + "client": { + "id": "failPointClient" + } + }, + { + "collection": { + "id": "unsatisfiableRPColl", + "database": "database", + "collectionName": "unsatisfiableRPColl", + "collectionOptions": { + "readPreference": { + "mode": "Secondary", + "tagSets": [ + { + "nonexistenttag": "a" + } + ] + } + } + } + } + ], + "tests": [ + { + "description": "A successful operation", + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 4 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "x": 1 + } + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection succeeded", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "Server selection fails due to unsatisfiable read preference", + "runOnRequirements": [ + { + "minServerVersion": "4.0" + } + ], + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 4 + } + }, + { + "name": "find", + "object": "unsatisfiableRPColl", + "arguments": { + "filter": { + "x": 1 + } + }, + "expectError": { + "isClientError": true + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "find", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "info", + "component": "serverSelection", + "data": { + "message": "Waiting for suitable server to become available", + "selector": { + "$$exists": true + }, + "operation": "find", + "topologyDescription": { + "$$exists": true + }, + "remainingTimeMS": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection failed", + "selector": { + "$$exists": true + }, + "operation": "find", + "topologyDescription": { + "$$exists": true + }, + "failure": { + "$$exists": true + } + } + } + ] + } + ] + } + ] +} diff --git a/driver-core/src/test/resources/unified-test-format/server-selection/logging/sharded.json b/driver-core/src/test/resources/unified-test-format/server-selection/logging/sharded.json new file mode 100644 index 00000000000..d42fba91004 --- /dev/null +++ b/driver-core/src/test/resources/unified-test-format/server-selection/logging/sharded.json @@ -0,0 +1,237 @@ +{ + "description": "server-selection-logging", + "schemaVersion": "1.14", + "runOnRequirements": [ + { + "topologies": [ + "sharded" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client", + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 500, + "appName": "loggingClient", + "serverSelectionTimeoutMS": 2000 + }, + "observeLogMessages": { + "serverSelection": "debug" + }, + "observeEvents": [ + "serverDescriptionChangedEvent", + "topologyDescriptionChangedEvent" + ], + "useMultipleMongoses": false + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "logging-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "server-selection" + } + }, + { + "client": { + "id": "failPointClient", + "useMultipleMongoses": false + } + } + ], + "tests": [ + { + "description": "A successful operation", + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 2 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "x": 1 + } + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection succeeded", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "Failure due to unreachable server", + "runOnRequirements": [ + { + "minServerVersion": "4.4" + } + ], + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "hello", + "ismaster" + ], + "appName": "loggingClient", + "closeConnection": true + } + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": { + "newDescription": { + "type": "Unknown" + } + } + }, + "count": 1 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "x": 1 + } + }, + "expectError": { + "isClientError": true + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "info", + "component": "serverSelection", + "data": { + "message": "Waiting for suitable server to become available", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "remainingTimeMS": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection failed", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "failure": { + "$$exists": true + } + } + } + ] + } + ] + } + ] +} diff --git a/driver-core/src/test/resources/unified-test-format/server-selection/logging/standalone.json b/driver-core/src/test/resources/unified-test-format/server-selection/logging/standalone.json new file mode 100644 index 00000000000..3b3eddd841e --- /dev/null +++ b/driver-core/src/test/resources/unified-test-format/server-selection/logging/standalone.json @@ -0,0 +1,235 @@ +{ + "description": "standalone-logging", + "schemaVersion": "1.14", + "runOnRequirements": [ + { + "topologies": [ + "single" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client", + "uriOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 500, + "appName": "loggingClient", + "serverSelectionTimeoutMS": 2000 + }, + "observeLogMessages": { + "serverSelection": "debug" + }, + "observeEvents": [ + "serverDescriptionChangedEvent", + "topologyDescriptionChangedEvent" + ] + } + }, + { + "database": { + "id": "database", + "client": "client", + "databaseName": "logging-tests" + } + }, + { + "collection": { + "id": "collection", + "database": "database", + "collectionName": "server-selection" + } + }, + { + "client": { + "id": "failPointClient" + } + } + ], + "tests": [ + { + "description": "A successful operation", + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "topologyDescriptionChangedEvent": {} + }, + "count": 2 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "x": 1 + } + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection succeeded", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "serverHost": { + "$$type": "string" + }, + "serverPort": { + "$$type": [ + "int", + "long" + ] + } + } + } + ] + } + ] + }, + { + "description": "Failure due to unreachable server", + "runOnRequirements": [ + { + "minServerVersion": "4.4" + } + ], + "operations": [ + { + "name": "failPoint", + "object": "testRunner", + "arguments": { + "client": "failPointClient", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "hello", + "ismaster" + ], + "appName": "loggingClient", + "closeConnection": true + } + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "client": "client", + "event": { + "serverDescriptionChangedEvent": { + "newDescription": { + "type": "Unknown" + } + } + }, + "count": 1 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "x": 1 + } + }, + "expectError": { + "isClientError": true + } + } + ], + "expectLogMessages": [ + { + "client": "client", + "messages": [ + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection started", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + } + } + }, + { + "level": "info", + "component": "serverSelection", + "data": { + "message": "Waiting for suitable server to become available", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "remainingTimeMS": { + "$$type": [ + "int", + "long" + ] + } + } + }, + { + "level": "debug", + "component": "serverSelection", + "data": { + "message": "Server selection failed", + "selector": { + "$$exists": true + }, + "operation": "insert", + "topologyDescription": { + "$$exists": true + }, + "failure": { + "$$exists": true + } + } + } + ] + } + ] + } + ] +} From cbf661ac918572306dbf899f8fe238caa14c6b64 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Fri, 29 Sep 2023 12:36:21 -0600 Subject: [PATCH 02/10] Add `ServerSelectionLoggingTest`, implement schema-1.14.json JAVA-4754 --- .../mongodb/internal/logging/LogMessage.java | 3 +- .../connection/TestClusterListener.java | 65 +++++++++++++++-- .../connection/TestServerListener.java | 69 +++++++++---------- .../unified/ServerSelectionLoggingTest.java | 40 +++++++++++ .../client/unified/ContextElement.java | 10 ++- .../com/mongodb/client/unified/Entities.java | 17 ++++- .../mongodb/client/unified/EventMatcher.java | 55 +++++++++++++-- .../unified/ServerSelectionLoggingTest.java | 40 +++++++++++ .../mongodb/client/unified/UnifiedTest.java | 8 +++ 9 files changed, 253 insertions(+), 54 deletions(-) create mode 100644 driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ServerSelectionLoggingTest.java create mode 100644 driver-sync/src/test/functional/com/mongodb/client/unified/ServerSelectionLoggingTest.java diff --git a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java index 00426a11ba2..6fe787bdc65 100644 --- a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java +++ b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java @@ -42,7 +42,8 @@ public final class LogMessage { public enum Component { COMMAND, - CONNECTION + CONNECTION, + SERVERSELECTION } public enum Level { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java index 93cdbac7ad0..b9e56183a55 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java @@ -20,17 +20,28 @@ import com.mongodb.event.ClusterDescriptionChangedEvent; import com.mongodb.event.ClusterListener; import com.mongodb.event.ClusterOpeningEvent; +import com.mongodb.lang.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.Locks.withLock; -class TestClusterListener implements ClusterListener { - private ClusterOpeningEvent clusterOpeningEvent; - private ClusterClosedEvent clusterClosingEvent; - private final List clusterDescriptionChangedEvents = new ArrayList<>(); +public final class TestClusterListener implements ClusterListener { + @Nullable + private volatile ClusterOpeningEvent clusterOpeningEvent; + @Nullable + private volatile ClusterClosedEvent clusterClosingEvent; + private final ArrayList clusterDescriptionChangedEvents = new ArrayList<>(); + private final ReentrantLock lock = new ReentrantLock(); + private final Condition newClusterDescriptionChangedEventCondition = lock.newCondition(); @Override public void clusterOpening(final ClusterOpeningEvent event) { @@ -47,22 +58,62 @@ public void clusterClosed(final ClusterClosedEvent event) { @Override public void clusterDescriptionChanged(final ClusterDescriptionChangedEvent event) { notNull("event", event); - clusterDescriptionChangedEvents.add(event); + withLock(lock, () -> { + clusterDescriptionChangedEvents.add(event); + newClusterDescriptionChangedEventCondition.signalAll(); + }); } + @Nullable public ClusterOpeningEvent getClusterOpeningEvent() { return clusterOpeningEvent; } + @Nullable public ClusterClosedEvent getClusterClosingEvent() { return clusterClosingEvent; } public List getClusterDescriptionChangedEvents() { - return clusterDescriptionChangedEvents; + return withLock(lock, () -> new ArrayList<>(clusterDescriptionChangedEvents)); } + /** + * Calling this method concurrently with {@link #waitForClusterDescriptionChangedEvents(Predicate, int, Duration)}, + * may result in {@link #waitForClusterDescriptionChangedEvents(Predicate, int, Duration)} not working as expected. + */ public void clearClusterDescriptionChangedEvents() { - clusterDescriptionChangedEvents.clear(); + withLock(lock, clusterDescriptionChangedEvents::clear); + } + + /** + * Calling this method concurrently with {@link #clearClusterDescriptionChangedEvents()}, + * may result in {@link #waitForClusterDescriptionChangedEvents(Predicate, int, Duration)} not working as expected. + */ + public void waitForClusterDescriptionChangedEvents( + final Predicate matcher, final int count, final Duration duration) + throws InterruptedException, TimeoutException { + long nanosRemaining = duration.toNanos(); + lock.lockInterruptibly(); + try { + long observedCount = unguardedCount(matcher); + while (observedCount < count) { + if (nanosRemaining <= 0) { + throw new TimeoutException(String.format("Timed out waiting for %d %s events. The observed count is %d.", + count, ClusterDescriptionChangedEvent.class.getSimpleName(), observedCount)); + } + nanosRemaining = newClusterDescriptionChangedEventCondition.awaitNanos(nanosRemaining); + observedCount = unguardedCount(matcher); + } + } finally { + lock.unlock(); + } + } + + /** + * Must be guarded by {@link #lock}. + */ + private long unguardedCount(final Predicate matcher) { + return clusterDescriptionChangedEvents.stream().filter(matcher).count(); } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerListener.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerListener.java index b565aeac969..dd180d5e176 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerListener.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerListener.java @@ -16,14 +16,16 @@ package com.mongodb.internal.connection; +import com.mongodb.event.ClusterDescriptionChangedEvent; import com.mongodb.event.ServerClosedEvent; import com.mongodb.event.ServerDescriptionChangedEvent; import com.mongodb.event.ServerListener; import com.mongodb.event.ServerOpeningEvent; +import com.mongodb.lang.Nullable; +import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -31,15 +33,16 @@ import java.util.function.Predicate; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.Locks.withLock; public class TestServerListener implements ServerListener { - private ServerOpeningEvent serverOpeningEvent; - private ServerClosedEvent serverClosedEvent; + @Nullable + private volatile ServerOpeningEvent serverOpeningEvent; + @Nullable + private volatile ServerClosedEvent serverClosedEvent; private final List serverDescriptionChangedEvents = new ArrayList<>(); private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); - private volatile int waitingForEventCount; - private Predicate waitingForEventMatcher; @Override public void serverOpening(final ServerOpeningEvent event) { @@ -54,61 +57,53 @@ public void serverClosed(final ServerClosedEvent event) { @Override public void serverDescriptionChanged(final ServerDescriptionChangedEvent event) { notNull("event", event); - lock.lock(); - try { + withLock(lock, () -> { serverDescriptionChangedEvents.add(event); - if (waitingForEventCount != 0 && containsEvents()) { - condition.signalAll(); - } - - } finally { - lock.unlock(); - } + condition.signalAll(); + }); } + @Nullable public ServerOpeningEvent getServerOpeningEvent() { return serverOpeningEvent; } + @Nullable public ServerClosedEvent getServerClosedEvent() { return serverClosedEvent; } public List getServerDescriptionChangedEvents() { - return serverDescriptionChangedEvents; + return withLock(lock, () -> new ArrayList<>(serverDescriptionChangedEvents)); } - public void waitForServerDescriptionChangedEvent(final Predicate matcher, final int count, - final int time, final TimeUnit unit) throws InterruptedException, TimeoutException { + public void waitForServerDescriptionChangedEvents( + final Predicate matcher, final int count, final Duration duration) + throws InterruptedException, TimeoutException { if (count <= 0) { throw new IllegalArgumentException(); } - lock.lock(); + long nanosRemaining = duration.toNanos(); + lock.lockInterruptibly(); try { - if (waitingForEventCount != 0) { - throw new IllegalStateException("Already waiting for events"); - } - waitingForEventCount = count; - waitingForEventMatcher = matcher; - if (containsEvents()) { - return; - } - if (!condition.await(time, unit)) { - throw new TimeoutException("Timed out waiting for " + count + " ServerDescriptionChangedEvent events. " - + "The count after timing out is " + countEvents()); + long observedCount = unguardedCount(matcher); + while (observedCount < count) { + if (nanosRemaining <= 0) { + throw new TimeoutException(String.format("Timed out waiting for %d %s events. The observed count is %d.", + count, ClusterDescriptionChangedEvent.class.getSimpleName(), observedCount)); + } + nanosRemaining = condition.awaitNanos(nanosRemaining); + observedCount = unguardedCount(matcher); } } finally { - waitingForEventCount = 0; - waitingForEventMatcher = null; lock.unlock(); } } - private long countEvents() { - return serverDescriptionChangedEvents.stream().filter(waitingForEventMatcher).count(); - } - - private boolean containsEvents() { - return countEvents() >= waitingForEventCount; + /** + * Must be guarded by {@link #lock}. + */ + private long unguardedCount(final Predicate matcher) { + return serverDescriptionChangedEvents.stream().filter(matcher).count(); } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ServerSelectionLoggingTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ServerSelectionLoggingTest.java new file mode 100644 index 00000000000..433329def96 --- /dev/null +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ServerSelectionLoggingTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.reactivestreams.client.unified; + +import com.mongodb.lang.Nullable; +import org.bson.BsonArray; +import org.bson.BsonDocument; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Collection; + +public final class ServerSelectionLoggingTest extends UnifiedReactiveStreamsTest { + public ServerSelectionLoggingTest(@SuppressWarnings("unused") final String fileDescription, + @SuppressWarnings("unused") final String testDescription, + final String schemaVersion, @Nullable final BsonArray runOnRequirements, final BsonArray entities, + final BsonArray initialData, final BsonDocument definition) { + super(schemaVersion, runOnRequirements, entities, initialData, definition); + } + + @Parameterized.Parameters(name = "{0}: {1}") + public static Collection data() throws URISyntaxException, IOException { + return getTestData("unified-test-format/server-selection/logging"); + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java index 91b87ab482b..110cf321f4e 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/ContextElement.java @@ -115,10 +115,18 @@ public static ContextElement ofWaitForServerDescriptionChangedEvents(final Strin return new EventCountContext("Wait For Server Description Changed Events", client, event, count); } - public static ContextElement ofServerDescriptionChangeEventCount(final String client, final BsonDocument event, final int count) { + public static ContextElement ofServerDescriptionChangedEventCount(final String client, final BsonDocument event, final int count) { return new EventCountContext("Server Description Changed Event Count", client, event, count); } + public static ContextElement ofWaitForClusterDescriptionChangedEvents(final String client, final BsonDocument event, final int count) { + return new EventCountContext("Wait For Cluster Description Changed Events", client, event, count); + } + + public static ContextElement ofClusterDescriptionChangedEventCount(final String client, final BsonDocument event, final int count) { + return new EventCountContext("Cluster Description Changed Event Count", client, event, count); + } + private static class EventCountContext extends ContextElement { private final String name; diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java b/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java index 3bc32e26c26..f62157e75b5 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java @@ -23,6 +23,7 @@ import com.mongodb.ReadConcernLevel; import com.mongodb.ServerApi; import com.mongodb.ServerApiVersion; +import com.mongodb.internal.connection.TestClusterListener; import com.mongodb.logging.TestLoggingInterceptor; import com.mongodb.TransactionOptions; import com.mongodb.WriteConcern; @@ -118,6 +119,7 @@ public final class Entities { private final Map clientLoggingInterceptors = new HashMap<>(); private final Map clientConnectionPoolListeners = new HashMap<>(); private final Map clientServerListeners = new HashMap<>(); + private final Map clientClusterListeners = new HashMap<>(); private final Map> cursors = new HashMap<>(); private final Map topologyDescriptions = new HashMap<>(); private final Map successCounts = new HashMap<>(); @@ -278,6 +280,10 @@ public TestServerListener getServerListener(final String id) { return getEntity(id + "-server-listener", clientServerListeners, "server listener"); } + public TestClusterListener getClusterListener(final String id) { + return getEntity(id + "-cluster-listener", clientClusterListeners, "cluster listener"); + } + private T getEntity(final String id, final Map entities, final String type) { T entity = entities.get(id); if (entity == null) { @@ -361,9 +367,13 @@ private void initClient(final BsonDocument entity, final String id, clientSettingsBuilder.applicationName(id); clientSettingsBuilder.applyToLoggerSettings(builder -> builder.maxDocumentLength(10_000)); - TestServerListener testClusterListener = new TestServerListener(); - clientSettingsBuilder.applyToServerSettings(builder -> builder.addServerListener(testClusterListener)); - putEntity(id + "-server-listener", testClusterListener, clientServerListeners); + TestServerListener testServerListener = new TestServerListener(); + clientSettingsBuilder.applyToServerSettings(builder -> builder.addServerListener(testServerListener)); + putEntity(id + "-server-listener", testServerListener, clientServerListeners); + + TestClusterListener testClusterListener = new TestClusterListener(); + clientSettingsBuilder.applyToClusterSettings(builder -> builder.addClusterListener(testClusterListener)); + putEntity(id + "-cluster-listener", testClusterListener, clientClusterListeners); if (entity.containsKey("observeEvents")) { List ignoreCommandMonitoringEvents = entity @@ -485,6 +495,7 @@ private void initClient(final BsonDocument entity, final String id, } break; case "appname": + case "appName": clientSettingsBuilder.applicationName(value.asString().getValue()); break; default: diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java index 1cc97e52b5c..777791aafaf 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/EventMatcher.java @@ -17,6 +17,7 @@ package com.mongodb.client.unified; import com.mongodb.connection.ServerType; +import com.mongodb.event.ClusterDescriptionChangedEvent; import com.mongodb.event.CommandEvent; import com.mongodb.event.CommandFailedEvent; import com.mongodb.event.CommandStartedEvent; @@ -28,6 +29,7 @@ import com.mongodb.event.ConnectionPoolReadyEvent; import com.mongodb.event.ConnectionReadyEvent; import com.mongodb.event.ServerDescriptionChangedEvent; +import com.mongodb.internal.connection.TestClusterListener; import com.mongodb.internal.connection.TestConnectionPoolListener; import com.mongodb.internal.connection.TestServerListener; import com.mongodb.lang.NonNull; @@ -35,10 +37,14 @@ import org.bson.BsonDocument; import org.bson.types.ObjectId; +import java.time.Duration; +import java.util.HashSet; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import static java.util.Arrays.asList; +import static java.util.Collections.singleton; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -235,8 +241,8 @@ public void waitForServerDescriptionChangedEvents(final String client, final Bso context.push(ContextElement.ofWaitForServerDescriptionChangedEvents(client, expectedEvent, count)); BsonDocument expectedEventContents = getEventContents(expectedEvent); try { - serverListener.waitForServerDescriptionChangedEvent( - event -> serverDescriptionChangedEventMatches(expectedEventContents, event), count, 10, TimeUnit.SECONDS); + serverListener.waitForServerDescriptionChangedEvents( + event -> serverDescriptionChangedEventMatches(expectedEventContents, event), count, Duration.ofSeconds(10)); context.pop(); } catch (InterruptedException e) { throw new RuntimeException(e); @@ -248,22 +254,52 @@ public void waitForServerDescriptionChangedEvents(final String client, final Bso public void assertServerDescriptionChangeEventCount(final String client, final BsonDocument expectedEvent, final int count, final List events) { BsonDocument expectedEventContents = getEventContents(expectedEvent); - context.push(ContextElement.ofServerDescriptionChangeEventCount(client, expectedEvent, count)); + context.push(ContextElement.ofServerDescriptionChangedEventCount(client, expectedEvent, count)); long matchCount = events.stream().filter(event -> serverDescriptionChangedEventMatches(expectedEventContents, event)).count(); assertEquals(context.getMessage("Expected server description changed event counts to match"), count, matchCount); context.pop(); } + public void waitForClusterDescriptionChangedEvents(final String client, final BsonDocument expectedEvent, final int count, + final TestClusterListener clusterListener) { + context.push(ContextElement.ofWaitForClusterDescriptionChangedEvents(client, expectedEvent, count)); + BsonDocument expectedEventContents = getEventContents(expectedEvent); + try { + clusterListener.waitForClusterDescriptionChangedEvents( + event -> clusterDescriptionChangedEventMatches(expectedEventContents, event), count, Duration.ofSeconds(10)); + context.pop(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + fail(context.getMessage("Timed out waiting for cluster description changed events")); + } + } + + public void assertClusterDescriptionChangeEventCount(final String client, final BsonDocument expectedEvent, final int count, + final List events) { + BsonDocument expectedEventContents = getEventContents(expectedEvent); + context.push(ContextElement.ofClusterDescriptionChangedEventCount(client, expectedEvent, count)); + long matchCount = events.stream().filter(event -> clusterDescriptionChangedEventMatches(expectedEventContents, event)).count(); + assertEquals(context.getMessage("Expected cluster description changed event counts to match"), count, matchCount); + context.pop(); + } + @NonNull private BsonDocument getEventContents(final BsonDocument expectedEvent) { - if (!expectedEvent.getFirstKey().equals("serverDescriptionChangedEvent")) { - throw new UnsupportedOperationException("Unsupported event type " + expectedEvent.getFirstKey()); + HashSet supportedEventTypes = new HashSet<>(asList("serverDescriptionChangedEvent", "topologyDescriptionChangedEvent")); + String expectedEventType = expectedEvent.getFirstKey(); + if (!supportedEventTypes.contains(expectedEventType)) { + throw new UnsupportedOperationException("Unsupported event type " + expectedEventType); } @SuppressWarnings("OptionalGetWithoutIsPresent") BsonDocument expectedEventContents = expectedEvent.values().stream().findFirst().get().asDocument(); if (expectedEventContents.isEmpty()) { return expectedEventContents; } + HashSet emptyEventTypes = new HashSet<>(singleton("topologyDescriptionChangedEvent")); + if (emptyEventTypes.contains(expectedEventType)) { + throw new UnsupportedOperationException("Contents of " + expectedEventType + " must be empty"); + } if (expectedEventContents.size() != 1 || !expectedEventContents.getFirstKey().equals("newDescription") || expectedEventContents.getDocument("newDescription").size() != 1) { throw new UnsupportedOperationException("Unsupported event contents " + expectedEvent); @@ -286,6 +322,15 @@ private static boolean serverDescriptionChangedEventMatches(final BsonDocument e } } + private static boolean clusterDescriptionChangedEventMatches(final BsonDocument expectedEventContents, + final ClusterDescriptionChangedEvent event) { + if (!expectedEventContents.isEmpty()) { + throw new UnsupportedOperationException( + "Contents of " + ClusterDescriptionChangedEvent.class.getSimpleName() + " must be empty"); + } + return true; + } + private static String getEventType(final Class eventClass) { String eventClassName = eventClass.getSimpleName(); if (eventClassName.startsWith("ConnectionPool")) { diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/ServerSelectionLoggingTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/ServerSelectionLoggingTest.java new file mode 100644 index 00000000000..4ce99fb76a8 --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/ServerSelectionLoggingTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client.unified; + +import com.mongodb.lang.Nullable; +import org.bson.BsonArray; +import org.bson.BsonDocument; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Collection; + +public final class ServerSelectionLoggingTest extends UnifiedSyncTest { + public ServerSelectionLoggingTest(@SuppressWarnings("unused") final String fileDescription, + @SuppressWarnings("unused") final String testDescription, + final String schemaVersion, @Nullable final BsonArray runOnRequirements, final BsonArray entities, + final BsonArray initialData, final BsonDocument definition) { + super(schemaVersion, runOnRequirements, entities, initialData, definition); + } + + @Parameterized.Parameters(name = "{0}: {1}") + public static Collection data() throws URISyntaxException, IOException { + return getTestData("unified-test-format/server-selection/logging"); + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java index 0ae4b644975..49c94f486cd 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java @@ -578,6 +578,10 @@ private OperationResult executeWaitForEvent(final UnifiedTestContext context, fi context.getEventMatcher().waitForServerDescriptionChangedEvents(clientId, event, count, entities.getServerListener(clientId)); break; + case "topologyDescriptionChangedEvent": + context.getEventMatcher().waitForClusterDescriptionChangedEvents(clientId, event, count, + entities.getClusterListener(clientId)); + break; case "poolClearedEvent": case "poolReadyEvent": case "connectionCreatedEvent": @@ -603,6 +607,10 @@ private OperationResult executeAssertEventCount(final UnifiedTestContext context context.getEventMatcher().assertServerDescriptionChangeEventCount(clientId, event, count, entities.getServerListener(clientId).getServerDescriptionChangedEvents()); break; + case "topologyDescriptionChangedEvent": + context.getEventMatcher().assertClusterDescriptionChangeEventCount(clientId, event, count, + entities.getClusterListener(clientId).getClusterDescriptionChangedEvents()); + break; case "poolClearedEvent": case "poolReadyEvent": context.getEventMatcher().assertConnectionPoolEventCount(clientId, event, count, From 51c1220f401708d27120450349aa64ef7bfe5285 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Tue, 10 Oct 2023 14:25:07 -0600 Subject: [PATCH 03/10] Implement server selection logging JAVA-4754 --- .../internal/connection/BaseCluster.java | 208 +++++++++++++----- .../mongodb/internal/logging/LogMessage.java | 7 +- .../internal/session/ServerSessionPool.java | 38 +++- .../mongodb/client/unified/LogMatcher.java | 1 + 4 files changed, 185 insertions(+), 69 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index c66b5b8ead1..def785e30a2 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -17,9 +17,11 @@ package com.mongodb.internal.connection; import com.mongodb.MongoClientException; +import com.mongodb.MongoException; import com.mongodb.MongoIncompatibleDriverException; import com.mongodb.MongoTimeoutException; import com.mongodb.ServerAddress; +import com.mongodb.UnixServerAddress; import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.ClusterId; import com.mongodb.connection.ClusterSettings; @@ -33,6 +35,9 @@ import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; +import com.mongodb.internal.logging.LogMessage; +import com.mongodb.internal.logging.LogMessage.Entry; +import com.mongodb.internal.logging.StructuredLogger; import com.mongodb.internal.selector.LatencyMinimizingServerSelector; import com.mongodb.lang.Nullable; import com.mongodb.selector.CompositeServerSelector; @@ -59,6 +64,16 @@ import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; import static com.mongodb.internal.connection.EventHelper.wouldDescriptionsGenerateEquivalentEvents; import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener; +import static com.mongodb.internal.logging.LogMessage.Component.SERVERSELECTION; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.FAILURE; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.OPERATION; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.OPERATION_ID; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.REMAINING_TIME_MS; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.SELECTOR; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_HOST; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_PORT; +import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_DESCRIPTION; +import static com.mongodb.internal.logging.LogMessage.Level.DEBUG; import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.lang.String.format; import static java.util.Arrays.asList; @@ -67,8 +82,8 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; abstract class BaseCluster implements Cluster { - private static final Logger LOGGER = Loggers.getLogger("cluster"); + private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger("cluster"); private final ReentrantLock lock = new ReentrantLock(); private final AtomicReference phase = new AtomicReference<>(new CountDownLatch(1)); @@ -105,34 +120,43 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera try { CountDownLatch currentPhase = phase.get(); ClusterDescription curDescription = description; + // VAKOTODO think of simplifying the toString of driver selectors + logServerSelectionStarted(operationContext, serverSelector, curDescription); ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector); ServerTuple serverTuple = selectServer(compositeServerSelector, curDescription); - boolean selectionFailureLogged = false; + boolean selectionWaitingLogged = false; long startTimeNanos = System.nanoTime(); long curTimeNanos = startTimeNanos; - long maxWaitTimeNanos = getMaxWaitTimeNanos(); + Long maxWaitTimeNanos = getMaxWaitTimeNanos(); while (true) { - throwIfIncompatible(curDescription); + if (!curDescription.isCompatibleWithDriver()) { + throw createAndLogIncompatibleException(operationContext, serverSelector, curDescription); + } if (serverTuple != null) { + logServerSelectionSucceeded(operationContext, serverTuple.getServerDescription().getAddress(), serverSelector, curDescription); return serverTuple; } - if (curTimeNanos - startTimeNanos > maxWaitTimeNanos) { - throw createTimeoutException(serverSelector, curDescription); + Long remainingTimeNanos = maxWaitTimeNanos == null ? null : maxWaitTimeNanos - (curTimeNanos - startTimeNanos); + + if (remainingTimeNanos != null && remainingTimeNanos <= 0) { + throw createAndLogTimeoutException(operationContext, serverSelector, curDescription); } - if (!selectionFailureLogged) { - logServerSelectionFailure(serverSelector, curDescription); - selectionFailureLogged = true; + if (!selectionWaitingLogged) { + logServerSelectionWaiting(operationContext, remainingTimeNanos, serverSelector, curDescription); + selectionWaitingLogged = true; } connect(); - currentPhase.await(Math.min(maxWaitTimeNanos - (curTimeNanos - startTimeNanos), getMinWaitTimeNanos()), NANOSECONDS); + currentPhase.await( + remainingTimeNanos == null ? getMinWaitTimeNanos() : Math.min(remainingTimeNanos, getMinWaitTimeNanos()), + NANOSECONDS); curTimeNanos = System.nanoTime(); @@ -151,15 +175,13 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati final SingleResultCallback callback) { isTrue("open", !isClosed()); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace(format("Asynchronously selecting server with selector %s", serverSelector)); - } - ServerSelectionRequest request = new ServerSelectionRequest(serverSelector, getCompositeServerSelector(serverSelector), - getMaxWaitTimeNanos(), callback); - CountDownLatch currentPhase = phase.get(); ClusterDescription currentDescription = description; + logServerSelectionStarted(operationContext, serverSelector, currentDescription); + ServerSelectionRequest request = new ServerSelectionRequest(operationContext, serverSelector, getCompositeServerSelector(serverSelector), + getMaxWaitTimeNanos(), callback); + if (!handleServerSelectionRequest(request, currentPhase, currentDescription)) { notifyWaitQueueHandler(request); } @@ -230,9 +252,10 @@ private void updatePhase() { withLock(() -> phase.getAndSet(new CountDownLatch(1)).countDown()); } - private long getMaxWaitTimeNanos() { + @Nullable + private Long getMaxWaitTimeNanos() { if (settings.getServerSelectionTimeout(NANOSECONDS) < 0) { - return Long.MAX_VALUE; + return null; } return settings.getServerSelectionTimeout(NANOSECONDS); } @@ -248,31 +271,24 @@ private boolean handleServerSelectionRequest(final ServerSelectionRequest reques CountDownLatch prevPhase = request.phase; request.phase = currentPhase; if (!description.isCompatibleWithDriver()) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Asynchronously failed server selection due to driver incompatibility with server"); - } - request.onResult(null, createIncompatibleException(description)); + request.onResult(null, createAndLogIncompatibleException(request.operationContext, request.originalSelector, description)); return true; } ServerTuple serverTuple = selectServer(request.compositeSelector, description); if (serverTuple != null) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace(format("Asynchronously selected server %s", serverTuple.getServerDescription().getAddress())); - } + logServerSelectionSucceeded( + request.operationContext, serverTuple.getServerDescription().getAddress(), request.originalSelector, description); request.onResult(serverTuple, null); return true; } if (prevPhase == null) { - logServerSelectionFailure(request.originalSelector, description); + logServerSelectionWaiting(request.operationContext, request.getRemainingTime(), request.originalSelector, description); } } if (request.timedOut()) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Asynchronously failed server selection after timeout"); - } - request.onResult(null, createTimeoutException(request.originalSelector, description)); + request.onResult(null, createAndLogTimeoutException(request.operationContext, request.originalSelector, description)); return true; } @@ -283,18 +299,6 @@ private boolean handleServerSelectionRequest(final ServerSelectionRequest reques } } - private void logServerSelectionFailure(final ServerSelector serverSelector, final ClusterDescription curDescription) { - if (LOGGER.isInfoEnabled()) { - if (settings.getServerSelectionTimeout(MILLISECONDS) < 0) { - LOGGER.info(format("No server chosen by %s from cluster description %s. Waiting indefinitely.", - serverSelector, curDescription)); - } else { - LOGGER.info(format("No server chosen by %s from cluster description %s. Waiting for %d ms before timing out", - serverSelector, curDescription, settings.getServerSelectionTimeout(MILLISECONDS))); - } - } - } - @Nullable private ServerTuple selectServer(final ServerSelector serverSelector, final ClusterDescription clusterDescription) { @@ -351,10 +355,13 @@ protected ClusterableServer createServer(final ServerAddress serverAddress) { return serverFactory.create(this, serverAddress); } - private void throwIfIncompatible(final ClusterDescription curDescription) { - if (!curDescription.isCompatibleWithDriver()) { - throw createIncompatibleException(curDescription); - } + private MongoIncompatibleDriverException createAndLogIncompatibleException( + final OperationContext operationContext, + final ServerSelector serverSelector, + final ClusterDescription clusterDescription) { + MongoIncompatibleDriverException exception = createIncompatibleException(clusterDescription); + logServerSelectionFailed(operationContext, exception, serverSelector, clusterDescription); + return exception; } private MongoIncompatibleDriverException createIncompatibleException(final ClusterDescription curDescription) { @@ -376,24 +383,31 @@ private MongoIncompatibleDriverException createIncompatibleException(final Clust return new MongoIncompatibleDriverException(message, curDescription); } - private MongoTimeoutException createTimeoutException(final ServerSelector serverSelector, final ClusterDescription curDescription) { - return new MongoTimeoutException(format("Timed out after %d ms while waiting for a server that matches %s. " - + "Client view of cluster state is %s", - settings.getServerSelectionTimeout(MILLISECONDS), serverSelector, - curDescription.getShortDescription())); + private MongoException createAndLogTimeoutException( + final OperationContext operationContext, + final ServerSelector serverSelector, + final ClusterDescription clusterDescription) { + MongoTimeoutException exception = new MongoTimeoutException("Timed out while waiting for a suitable server"); + logServerSelectionFailed(operationContext, exception, serverSelector, clusterDescription); + return exception; } private static final class ServerSelectionRequest { + private final OperationContext operationContext; private final ServerSelector originalSelector; private final ServerSelector compositeSelector; - private final long maxWaitTimeNanos; + @Nullable + private final Long maxWaitTimeNanos; private final SingleResultCallback callback; private final long startTimeNanos = System.nanoTime(); private CountDownLatch phase; - ServerSelectionRequest(final ServerSelector serverSelector, final ServerSelector compositeSelector, - final long maxWaitTimeNanos, + ServerSelectionRequest(final OperationContext operationContext, + final ServerSelector serverSelector, final ServerSelector compositeSelector, + @Nullable + final Long maxWaitTimeNanos, final SingleResultCallback callback) { + this.operationContext = operationContext; this.originalSelector = serverSelector; this.compositeSelector = compositeSelector; this.maxWaitTimeNanos = maxWaitTimeNanos; @@ -409,11 +423,13 @@ void onResult(@Nullable final ServerTuple serverTuple, @Nullable final Throwable } boolean timedOut() { - return System.nanoTime() - startTimeNanos > maxWaitTimeNanos; + Long remainingTimeNanos = getRemainingTime(); + return remainingTimeNanos != null && remainingTimeNanos <= 0; } - long getRemainingTime() { - return startTimeNanos + maxWaitTimeNanos - System.nanoTime(); + @Nullable + Long getRemainingTime() { + return maxWaitTimeNanos == null ? null : maxWaitTimeNanos - (System.nanoTime() - startTimeNanos); } } @@ -455,7 +471,9 @@ public void run() { if (handleServerSelectionRequest(nextRequest, currentPhase, curDescription)) { iter.remove(); } else { - waitTimeNanos = Math.min(nextRequest.getRemainingTime(), Math.min(getMinWaitTimeNanos(), waitTimeNanos)); + Long remainingTimeNanos = nextRequest.getRemainingTime(); + long minWaitTimeNanos = Math.min(getMinWaitTimeNanos(), waitTimeNanos); + waitTimeNanos = remainingTimeNanos == null ? minWaitTimeNanos : Math.min(remainingTimeNanos, minWaitTimeNanos); } } @@ -477,4 +495,78 @@ public void run() { } } } + + private void logServerSelectionStarted( + final OperationContext operationContext, + final ServerSelector serverSelector, + final ClusterDescription clusterDescription) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { + STRUCTURED_LOGGER.log(new LogMessage( + SERVERSELECTION, DEBUG, "Server selection started", clusterId, + asList( + new Entry(OPERATION, "VAKOTODO"), + new Entry(OPERATION_ID, operationContext.getId()), + new Entry(SELECTOR, serverSelector.toString()), + new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), + "Server selection started for operation {} with ID {}. Selector: {}, topology description: {}")); + } + } + + private void logServerSelectionWaiting( + final OperationContext operationContext, + @Nullable + final Long remainingTimeNanos, + final ServerSelector serverSelector, + final ClusterDescription clusterDescription) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { + STRUCTURED_LOGGER.log(new LogMessage( + SERVERSELECTION, DEBUG, "Waiting for suitable server to become available", clusterId, + asList( + new Entry(OPERATION, "VAKOTODO"), + new Entry(OPERATION_ID, operationContext.getId()), + new Entry(REMAINING_TIME_MS, remainingTimeNanos == null ? null : NANOSECONDS.toMillis(remainingTimeNanos)), + new Entry(SELECTOR, serverSelector.toString()), + new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), + "Waiting for server to become available for operation {} with ID {}.[ Remaining time: {} ms.]" + + " Selector: {}, topology description: {}.")); + } + } + + private void logServerSelectionFailed( + final OperationContext operationContext, + final MongoException failure, + final ServerSelector serverSelector, + final ClusterDescription clusterDescription) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { + STRUCTURED_LOGGER.log(new LogMessage( + SERVERSELECTION, DEBUG, "Server selection failed", clusterId, + asList( + new Entry(OPERATION, "VAKOTODO"), + new Entry(OPERATION_ID, operationContext.getId()), + new Entry(FAILURE, failure.toString()), + new Entry(SELECTOR, serverSelector.toString()), + new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), + "Server selection failed for operation {} with ID {}. Failure: {}. Selector: {}, topology description: {}")); + } + } + + private void logServerSelectionSucceeded( + final OperationContext operationContext, + final ServerAddress serverAddress, + final ServerSelector serverSelector, + final ClusterDescription clusterDescription) { + if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { + STRUCTURED_LOGGER.log(new LogMessage( + SERVERSELECTION, DEBUG, "Server selection succeeded", clusterId, + asList( + new Entry(OPERATION, "VAKOTODO"), + new Entry(OPERATION_ID, operationContext.getId()), + new Entry(SERVER_HOST, serverAddress.getHost()), + new Entry(SERVER_PORT, serverAddress instanceof UnixServerAddress ? null : serverAddress.getPort()), + new Entry(SELECTOR, serverSelector.toString()), + new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), + "Server selection succeeded for operation {} with ID {}. Selected server: {}[:{}]." + + " Selector: {}, topology description: {}")); + } + } } diff --git a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java index 6fe787bdc65..554ea6b357e 100644 --- a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java +++ b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java @@ -74,6 +74,7 @@ public enum Name { COMMAND_NAME("commandName"), REQUEST_ID("requestId"), OPERATION_ID("operationId"), + OPERATION("operation"), SERVICE_ID("serviceId"), SERVER_CONNECTION_ID("serverConnectionId"), DRIVER_CONNECTION_ID("driverConnectionId"), @@ -83,11 +84,15 @@ public enum Name { COMMAND_CONTENT("command"), REASON_DESCRIPTION("reason"), ERROR_DESCRIPTION("error"), + FAILURE("failure"), MAX_IDLE_TIME_MS("maxIdleTimeMS"), MIN_POOL_SIZE("minPoolSize"), MAX_POOL_SIZE("maxPoolSize"), MAX_CONNECTING("maxConnecting"), - WAIT_QUEUE_TIMEOUT_MS("waitQueueTimeoutMS"); + WAIT_QUEUE_TIMEOUT_MS("waitQueueTimeoutMS"), + SELECTOR("selector"), + TOPOLOGY_DESCRIPTION("topologyDescription"), + REMAINING_TIME_MS("remainingTimeMS"); private final String value; diff --git a/driver-core/src/main/com/mongodb/internal/session/ServerSessionPool.java b/driver-core/src/main/com/mongodb/internal/session/ServerSessionPool.java index 64114658cc3..35268e68f13 100644 --- a/driver-core/src/main/com/mongodb/internal/session/ServerSessionPool.java +++ b/driver-core/src/main/com/mongodb/internal/session/ServerSessionPool.java @@ -19,6 +19,7 @@ import com.mongodb.MongoException; import com.mongodb.ReadPreference; import com.mongodb.ServerApi; +import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.ServerDescription; import com.mongodb.internal.IgnorableRequestContext; import com.mongodb.internal.binding.StaticBindingContext; @@ -29,6 +30,7 @@ import com.mongodb.internal.selector.ReadPreferenceServerSelector; import com.mongodb.internal.validator.NoOpFieldNameValidator; import com.mongodb.lang.Nullable; +import com.mongodb.selector.ServerSelector; import com.mongodb.session.ServerSession; import org.bson.BsonArray; import org.bson.BsonBinary; @@ -114,9 +116,13 @@ private void endClosedSessions() { return; } - List primaryPreferred = new ReadPreferenceServerSelector(ReadPreference.primaryPreferred()) + ReadPreference primaryPreferred = ReadPreference.primaryPreferred(); + List primaryPreferredServers = new ReadPreferenceServerSelector(primaryPreferred) .select(cluster.getCurrentDescription()); - if (primaryPreferred.isEmpty()) { + if (primaryPreferredServers.isEmpty()) { + // Skip doing server selection if we anticipate that no server is readily selectable. + // This approach is racy, and it is still possible to become blocked selecting a server + // even if `primaryPreferredServers` is not empty. return; } @@ -124,14 +130,26 @@ private void endClosedSessions() { try { StaticBindingContext context = new StaticBindingContext(NoOpSessionContext.INSTANCE, serverApi, IgnorableRequestContext.INSTANCE, new OperationContext()); - connection = cluster.selectServer(clusterDescription -> { - for (ServerDescription cur : clusterDescription.getServerDescriptions()) { - if (cur.getAddress().equals(primaryPreferred.get(0).getAddress())) { - return Collections.singletonList(cur); - } - } - return Collections.emptyList(); - }, context.getOperationContext()).getServer().getConnection(context.getOperationContext()); + connection = cluster.selectServer( + new ServerSelector() { + @Override + public List select(final ClusterDescription clusterDescription) { + for (ServerDescription cur : clusterDescription.getServerDescriptions()) { + if (cur.getAddress().equals(primaryPreferredServers.get(0).getAddress())) { + return Collections.singletonList(cur); + } + } + return Collections.emptyList(); + } + + @Override + public String toString() { + return "ReadPreferenceServerSelector{" + + "readPreference=" + primaryPreferred + + '}'; + } + }, + context.getOperationContext()).getServer().getConnection(context.getOperationContext()); connection.command("admin", new BsonDocument("endSessions", new BsonArray(identifiers)), new NoOpFieldNameValidator(), diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java index b18f7151e51..9c74956758f 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java @@ -58,6 +58,7 @@ void assertLogMessageEquality(final String client, final BsonArray expectedMessa static BsonDocument asDocument(final LogMessage message) { BsonDocument document = new BsonDocument(); + // VAKOTODO map "serverSelection" -> "cluster" here? document.put("component", new BsonString(message.getComponent().name().toLowerCase())); document.put("level", new BsonString(message.getLevel().name().toLowerCase())); document.put("hasFailure", BsonBoolean.valueOf(message.getException() != null)); From f2dc65e077bc21c7987cccdb389579685284ebb2 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Wed, 11 Oct 2023 10:12:50 -0600 Subject: [PATCH 04/10] Do fixes JAVA-4754 --- .../internal/connection/BaseCluster.java | 34 +++++++-------- .../mongodb/internal/logging/LogMessage.java | 34 +++++++++++++-- .../internal/logging/StructuredLogger.java | 42 +++++++++++++------ .../com/mongodb/client/unified/Entities.java | 3 +- .../mongodb/client/unified/LogMatcher.java | 8 +++- 5 files changed, 85 insertions(+), 36 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index def785e30a2..beda584aa12 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -64,7 +64,7 @@ import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; import static com.mongodb.internal.connection.EventHelper.wouldDescriptionsGenerateEquivalentEvents; import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener; -import static com.mongodb.internal.logging.LogMessage.Component.SERVERSELECTION; +import static com.mongodb.internal.logging.LogMessage.Component.SERVER_SELECTION; import static com.mongodb.internal.logging.LogMessage.Entry.Name.FAILURE; import static com.mongodb.internal.logging.LogMessage.Entry.Name.OPERATION; import static com.mongodb.internal.logging.LogMessage.Entry.Name.OPERATION_ID; @@ -74,6 +74,7 @@ import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVER_PORT; import static com.mongodb.internal.logging.LogMessage.Entry.Name.TOPOLOGY_DESCRIPTION; import static com.mongodb.internal.logging.LogMessage.Level.DEBUG; +import static com.mongodb.internal.logging.LogMessage.Level.INFO; import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.lang.String.format; import static java.util.Arrays.asList; @@ -120,7 +121,6 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera try { CountDownLatch currentPhase = phase.get(); ClusterDescription curDescription = description; - // VAKOTODO think of simplifying the toString of driver selectors logServerSelectionStarted(operationContext, serverSelector, curDescription); ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector); ServerTuple serverTuple = selectServer(compositeServerSelector, curDescription); @@ -502,13 +502,13 @@ private void logServerSelectionStarted( final ClusterDescription clusterDescription) { if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { STRUCTURED_LOGGER.log(new LogMessage( - SERVERSELECTION, DEBUG, "Server selection started", clusterId, + SERVER_SELECTION, DEBUG, "Server selection started", clusterId, asList( - new Entry(OPERATION, "VAKOTODO"), + new Entry(OPERATION, null), new Entry(OPERATION_ID, operationContext.getId()), new Entry(SELECTOR, serverSelector.toString()), new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), - "Server selection started for operation {} with ID {}. Selector: {}, topology description: {}")); + "Server selection started for operation[ {}] with ID {}. Selector: {}, topology description: {}")); } } @@ -518,17 +518,17 @@ private void logServerSelectionWaiting( final Long remainingTimeNanos, final ServerSelector serverSelector, final ClusterDescription clusterDescription) { - if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { + if (STRUCTURED_LOGGER.isRequired(INFO, clusterId)) { STRUCTURED_LOGGER.log(new LogMessage( - SERVERSELECTION, DEBUG, "Waiting for suitable server to become available", clusterId, + SERVER_SELECTION, INFO, "Waiting for suitable server to become available", clusterId, asList( - new Entry(OPERATION, "VAKOTODO"), + new Entry(OPERATION, null), new Entry(OPERATION_ID, operationContext.getId()), new Entry(REMAINING_TIME_MS, remainingTimeNanos == null ? null : NANOSECONDS.toMillis(remainingTimeNanos)), new Entry(SELECTOR, serverSelector.toString()), new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), - "Waiting for server to become available for operation {} with ID {}.[ Remaining time: {} ms.]" + - " Selector: {}, topology description: {}.")); + "Waiting for server to become available for operation[ {}] with ID {}.[ Remaining time: {} ms.]" + + " Selector: {}, topology description: {}.")); } } @@ -539,14 +539,14 @@ private void logServerSelectionFailed( final ClusterDescription clusterDescription) { if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { STRUCTURED_LOGGER.log(new LogMessage( - SERVERSELECTION, DEBUG, "Server selection failed", clusterId, + SERVER_SELECTION, DEBUG, "Server selection failed", clusterId, asList( - new Entry(OPERATION, "VAKOTODO"), + new Entry(OPERATION, null), new Entry(OPERATION_ID, operationContext.getId()), new Entry(FAILURE, failure.toString()), new Entry(SELECTOR, serverSelector.toString()), new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), - "Server selection failed for operation {} with ID {}. Failure: {}. Selector: {}, topology description: {}")); + "Server selection failed for operation[ {}] with ID {}. Failure: {}. Selector: {}, topology description: {}")); } } @@ -557,16 +557,16 @@ private void logServerSelectionSucceeded( final ClusterDescription clusterDescription) { if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { STRUCTURED_LOGGER.log(new LogMessage( - SERVERSELECTION, DEBUG, "Server selection succeeded", clusterId, + SERVER_SELECTION, DEBUG, "Server selection succeeded", clusterId, asList( - new Entry(OPERATION, "VAKOTODO"), + new Entry(OPERATION, null), new Entry(OPERATION_ID, operationContext.getId()), new Entry(SERVER_HOST, serverAddress.getHost()), new Entry(SERVER_PORT, serverAddress instanceof UnixServerAddress ? null : serverAddress.getPort()), new Entry(SELECTOR, serverSelector.toString()), new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), - "Server selection succeeded for operation {} with ID {}. Selected server: {}[:{}]." + - " Selector: {}, topology description: {}")); + "Server selection succeeded for operation[ {}] with ID {}. Selected server: {}[:{}]." + + " Selector: {}, topology description: {}")); } } } diff --git a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java index 554ea6b357e..d726c7c78fb 100644 --- a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java +++ b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java @@ -22,10 +22,13 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.mongodb.assertions.Assertions.assertNotNull; +import static java.util.function.Function.identity; /** *

This class is not part of the public API and may be removed or changed at any time

@@ -41,12 +44,34 @@ public final class LogMessage { private final String format; public enum Component { - COMMAND, - CONNECTION, - SERVERSELECTION + COMMAND("command"), + CONNECTION("connection"), + SERVER_SELECTION("serverSelection"); + + private static final Map INDEX; + + static { + INDEX = Stream.of(Component.values()).collect(Collectors.toMap(Component::getValue, identity())); + } + + private final String value; + + Component(final String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public static Component of(final String value) { + Component result = INDEX.get(value); + return assertNotNull(result); + } } public enum Level { + INFO, DEBUG } @@ -74,6 +99,9 @@ public enum Name { COMMAND_NAME("commandName"), REQUEST_ID("requestId"), OPERATION_ID("operationId"), + /** + * Not supported. + */ OPERATION("operation"), SERVICE_ID("serviceId"), SERVER_CONNECTION_ID("serverConnectionId"), diff --git a/driver-core/src/main/com/mongodb/internal/logging/StructuredLogger.java b/driver-core/src/main/com/mongodb/internal/logging/StructuredLogger.java index d6bd8deb5ba..fea60cbecb1 100644 --- a/driver-core/src/main/com/mongodb/internal/logging/StructuredLogger.java +++ b/driver-core/src/main/com/mongodb/internal/logging/StructuredLogger.java @@ -24,6 +24,8 @@ import com.mongodb.lang.Nullable; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; +import java.util.function.Predicate; import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; @@ -68,10 +70,11 @@ public boolean isRequired(final Level level, final ClusterId clusterId) { return true; } - //noinspection SwitchStatementWithTooFewBranches switch (level) { case DEBUG: return logger.isDebugEnabled(); + case INFO: + return logger.isInfoEnabled(); default: throw new UnsupportedOperationException(); } @@ -82,22 +85,37 @@ public void log(final LogMessage logMessage) { if (interceptor != null) { interceptor.intercept(logMessage); } - //noinspection SwitchStatementWithTooFewBranches switch (logMessage.getLevel()) { case DEBUG: - if (logger.isDebugEnabled()) { - LogMessage.UnstructuredLogMessage unstructuredLogMessage = logMessage.toUnstructuredLogMessage(); - String message = unstructuredLogMessage.interpolate(); - Throwable exception = logMessage.getException(); - if (exception == null) { - logger.debug(message); - } else { - logger.debug(message, exception); - } - } + logUnstructured(logMessage, Logger::isDebugEnabled, Logger::debug, Logger::debug); + break; + case INFO: + logUnstructured(logMessage, Logger::isInfoEnabled, Logger::info, Logger::info); break; default: throw new UnsupportedOperationException(); } } + + private void logUnstructured( + final LogMessage logMessage, + final Predicate loggingEnabled, + final BiConsumer doLog, + final TriConsumer doLogWithException) { + if (loggingEnabled.test(logger)) { + LogMessage.UnstructuredLogMessage unstructuredLogMessage = logMessage.toUnstructuredLogMessage(); + String message = unstructuredLogMessage.interpolate(); + Throwable exception = logMessage.getException(); + if (exception == null) { + doLog.accept(logger, message); + } else { + doLogWithException.accept(logger, message, exception); + } + } + } + + @FunctionalInterface + private interface TriConsumer { + void accept(A a, B b, C c); + } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java b/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java index f62157e75b5..ab72c27179a 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java @@ -537,8 +537,7 @@ private void initClient(final BsonDocument entity, final String id, private static LogMessage.Component toComponent(final Map.Entry entry) { String componentName = entry.getKey(); - return LogMessage.Component - .valueOf(componentName.toUpperCase()); + return LogMessage.Component.of(componentName); } private static LogMessage.Level toLevel(final Map.Entry entry) { diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java b/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java index 9c74956758f..de8bfaea452 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/LogMatcher.java @@ -50,6 +50,11 @@ void assertLogMessageEquality(final String client, final BsonArray expectedMessa for (int i = 0; i < expectedMessages.size(); i++) { BsonDocument expectedMessageAsDocument = expectedMessages.get(i).asDocument().clone(); + // `LogMessage.Entry.Name.OPERATION` is not supported, therefore we skip matching its value + BsonValue expectedDataDocument = expectedMessageAsDocument.get("data"); + if (expectedDataDocument != null) { + expectedDataDocument.asDocument().remove(LogMessage.Entry.Name.OPERATION.getValue()); + } valueMatcher.assertValuesMatch(expectedMessageAsDocument, asDocument(actualMessages.get(i))); } @@ -58,8 +63,7 @@ void assertLogMessageEquality(final String client, final BsonArray expectedMessa static BsonDocument asDocument(final LogMessage message) { BsonDocument document = new BsonDocument(); - // VAKOTODO map "serverSelection" -> "cluster" here? - document.put("component", new BsonString(message.getComponent().name().toLowerCase())); + document.put("component", new BsonString(message.getComponent().getValue())); document.put("level", new BsonString(message.getLevel().name().toLowerCase())); document.put("hasFailure", BsonBoolean.valueOf(message.getException() != null)); document.put("failureIsRedacted", From b1292b930dbacc67756e3070d84c87365538909f Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Wed, 11 Oct 2023 14:38:17 -0600 Subject: [PATCH 05/10] Do fixes JAVA-4754 --- .../BaseClusterSpecification.groovy | 8 +--- .../client/unified/UnifiedCrudHelper.java | 43 ++++++++++++++++--- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy index 39c52b23821..aabde9e8c46 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy @@ -171,7 +171,7 @@ class BaseClusterSpecification extends Specification { .serverDescription.address == firstServer } - def 'should timeout with useful message'() { + def 'should timeout with message containing neither the selector nor the topology'() { given: def cluster = new MultiServerCluster(new ClusterId(), builder().mode(MULTIPLE) @@ -191,11 +191,7 @@ class BaseClusterSpecification extends Specification { then: def e = thrown(MongoTimeoutException) - e.getMessage().startsWith("Timed out after ${serverSelectionTimeoutMS} ms while waiting for a server " + - 'that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN') - e.getMessage().contains('{address=localhost:27017, type=UNKNOWN, state=CONNECTING, ' + - 'exception={com.mongodb.MongoInternalException: oops}}') - e.getMessage().contains('{address=localhost:27018, type=UNKNOWN, state=CONNECTING}') + e.getMessage().equals('Timed out while waiting for a suitable server') where: serverSelectionTimeoutMS << [1, 0] diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java index 97f24422a11..8ef8d1c1b2e 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java @@ -22,6 +22,8 @@ import com.mongodb.ReadPreference; import com.mongodb.ServerAddress; import com.mongodb.ServerCursor; +import com.mongodb.Tag; +import com.mongodb.TagSet; import com.mongodb.TransactionOptions; import com.mongodb.WriteConcern; import com.mongodb.bulk.BulkWriteResult; @@ -90,7 +92,6 @@ import org.bson.codecs.configuration.CodecRegistries; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -100,6 +101,8 @@ import java.util.function.Supplier; import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; @@ -136,14 +139,40 @@ static WriteConcern asWriteConcern(final BsonDocument writeConcernDocument) { } public static ReadPreference asReadPreference(final BsonDocument readPreferenceDocument) { + List supportedKeys = asList("mode", "tagSets", "maxStalenessSeconds"); + List unsupportedKeys = readPreferenceDocument.keySet().stream().filter(key -> !supportedKeys.contains(key)).collect(toList()); + if (!unsupportedKeys.isEmpty()) { + throw new UnsupportedOperationException("Unsupported read preference keys: " + unsupportedKeys + " in " + readPreferenceDocument); + } + String mode = readPreferenceDocument.getString("mode").getValue(); if (readPreferenceDocument.size() == 1) { - return ReadPreference.valueOf(readPreferenceDocument.getString("mode").getValue()); - } else if (readPreferenceDocument.size() == 2) { - return ReadPreference.valueOf(readPreferenceDocument.getString("mode").getValue(), - Collections.emptyList(), readPreferenceDocument.getNumber("maxStalenessSeconds").intValue(), TimeUnit.SECONDS); - } else { - throw new UnsupportedOperationException("Unsupported read preference properties: " + readPreferenceDocument.toJson()); + return ReadPreference.valueOf(mode); } + List tagSets = tagSets(readPreferenceDocument.getArray("tagSets", new BsonArray())); + BsonValue maxStalenessSecondsBson = readPreferenceDocument.get("maxStalenessSeconds"); + Integer maxStalenessSeconds = maxStalenessSecondsBson == null ? null : maxStalenessSecondsBson.asInt32().intValue(); + if (maxStalenessSecondsBson == null) { + return ReadPreference.valueOf(mode, tagSets); + } + return ReadPreference.valueOf(mode, tagSets, maxStalenessSeconds, TimeUnit.SECONDS); + } + + private static List tagSets(final BsonArray tagSets) { + List tags = tagSets.stream() + .map(tagBson -> { + if (!tagBson.isDocument()) { + throw new UnsupportedOperationException("Expected a document, but got " + tagBson.getBsonType() + + ": " + tagBson); + } + BsonDocument tagDoc = tagBson.asDocument(); + if (tagDoc.size() != 1) { + throw new UnsupportedOperationException("Expected a document with a single field, but got " + tagDoc); + } + String tagName = tagDoc.getFirstKey(); + return new Tag(tagName, tagDoc.getString(tagName).getValue()); + }) + .collect(toList()); + return tags.isEmpty() ? emptyList() : singletonList(new TagSet(tags)); } private OperationResult resultOf(final Supplier operationResult) { From f4a401326fb6f26b070989fee5b873fea9fa8e97 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Wed, 11 Oct 2023 16:34:31 -0600 Subject: [PATCH 06/10] Do fixes JAVA-4754 --- .../client/unified/UnifiedCrudHelper.java | 24 ++++++------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java index 8ef8d1c1b2e..5ffc4862c7a 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java @@ -101,8 +101,6 @@ import java.util.function.Supplier; import static java.util.Arrays.asList; -import static java.util.Collections.emptyList; -import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; @@ -157,22 +155,14 @@ public static ReadPreference asReadPreference(final BsonDocument readPreferenceD return ReadPreference.valueOf(mode, tagSets, maxStalenessSeconds, TimeUnit.SECONDS); } - private static List tagSets(final BsonArray tagSets) { - List tags = tagSets.stream() - .map(tagBson -> { - if (!tagBson.isDocument()) { - throw new UnsupportedOperationException("Expected a document, but got " + tagBson.getBsonType() - + ": " + tagBson); - } - BsonDocument tagDoc = tagBson.asDocument(); - if (tagDoc.size() != 1) { - throw new UnsupportedOperationException("Expected a document with a single field, but got " + tagDoc); - } - String tagName = tagDoc.getFirstKey(); - return new Tag(tagName, tagDoc.getString(tagName).getValue()); - }) + private static List tagSets(final BsonArray tagSetsBson) { + return tagSetsBson.stream() + .map(tagSetBson -> new TagSet(tagSetBson.asDocument() + .entrySet() + .stream() + .map(entry -> new Tag(entry.getKey(), entry.getValue().asString().getValue())) + .collect(toList()))) .collect(toList()); - return tags.isEmpty() ? emptyList() : singletonList(new TagSet(tags)); } private OperationResult resultOf(final Supplier operationResult) { From ed343864165b948696d9ceec8b34489d1eeee788 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Fri, 13 Oct 2023 10:00:16 -0600 Subject: [PATCH 07/10] Update locking to match the decision made in https://jira.mongodb.org/browse/JAVA-5189 JAVA-4754 --- .../com/mongodb/internal/connection/TestClusterListener.java | 2 +- .../com/mongodb/internal/connection/TestServerListener.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java index b9e56183a55..89ca0088a77 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterListener.java @@ -94,7 +94,7 @@ public void waitForClusterDescriptionChangedEvents( final Predicate matcher, final int count, final Duration duration) throws InterruptedException, TimeoutException { long nanosRemaining = duration.toNanos(); - lock.lockInterruptibly(); + lock.lock(); try { long observedCount = unguardedCount(matcher); while (observedCount < count) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerListener.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerListener.java index dd180d5e176..007074f8cc6 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerListener.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerListener.java @@ -84,7 +84,7 @@ public void waitForServerDescriptionChangedEvents( throw new IllegalArgumentException(); } long nanosRemaining = duration.toNanos(); - lock.lockInterruptibly(); + lock.lock(); try { long observedCount = unguardedCount(matcher); while (observedCount < count) { From e09930f00bfe8b010ee7b364e2d4947a7c93a0d2 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Thu, 19 Oct 2023 11:36:44 -0600 Subject: [PATCH 08/10] Address review concerns JAVA-4754 --- .../mongodb/internal/logging/LogMessage.java | 4 +++ .../internal/logging/StructuredLogger.java | 26 ++++++++----------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java index d726c7c78fb..214e58b9d59 100644 --- a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java +++ b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java @@ -17,6 +17,7 @@ package com.mongodb.internal.logging; import com.mongodb.connection.ClusterId; +import com.mongodb.internal.VisibleForTesting; import com.mongodb.lang.Nullable; import java.util.Collection; @@ -28,6 +29,7 @@ import java.util.stream.Stream; import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; import static java.util.function.Function.identity; /** @@ -60,10 +62,12 @@ public enum Component { this.value = value; } + @VisibleForTesting(otherwise = PRIVATE) public String getValue() { return value; } + @VisibleForTesting(otherwise = PRIVATE) public static Component of(final String value) { Component result = INDEX.get(value); return assertNotNull(result); diff --git a/driver-core/src/main/com/mongodb/internal/logging/StructuredLogger.java b/driver-core/src/main/com/mongodb/internal/logging/StructuredLogger.java index fea60cbecb1..d65a80ef230 100644 --- a/driver-core/src/main/com/mongodb/internal/logging/StructuredLogger.java +++ b/driver-core/src/main/com/mongodb/internal/logging/StructuredLogger.java @@ -25,7 +25,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; -import java.util.function.Predicate; +import java.util.function.Consumer; +import java.util.function.Supplier; import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; @@ -87,35 +88,30 @@ public void log(final LogMessage logMessage) { } switch (logMessage.getLevel()) { case DEBUG: - logUnstructured(logMessage, Logger::isDebugEnabled, Logger::debug, Logger::debug); + logUnstructured(logMessage, logger::isDebugEnabled, logger::debug, logger::debug); break; case INFO: - logUnstructured(logMessage, Logger::isInfoEnabled, Logger::info, Logger::info); + logUnstructured(logMessage, logger::isInfoEnabled, logger::info, logger::info); break; default: throw new UnsupportedOperationException(); } } - private void logUnstructured( + private static void logUnstructured( final LogMessage logMessage, - final Predicate loggingEnabled, - final BiConsumer doLog, - final TriConsumer doLogWithException) { - if (loggingEnabled.test(logger)) { + final Supplier loggingEnabled, + final Consumer doLog, + final BiConsumer doLogWithException) { + if (loggingEnabled.get()) { LogMessage.UnstructuredLogMessage unstructuredLogMessage = logMessage.toUnstructuredLogMessage(); String message = unstructuredLogMessage.interpolate(); Throwable exception = logMessage.getException(); if (exception == null) { - doLog.accept(logger, message); + doLog.accept(message); } else { - doLogWithException.accept(logger, message, exception); + doLogWithException.accept(message, exception); } } } - - @FunctionalInterface - private interface TriConsumer { - void accept(A a, B b, C c); - } } From 7998d1a2f711dc3bb92307b40687b6bd9338aa78 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Wed, 1 Nov 2023 00:28:08 -0600 Subject: [PATCH 09/10] Partially restore the `MongoTimeoutException` message JAVA-4754 --- .../com/mongodb/internal/connection/BaseCluster.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index beda584aa12..7ad6addbd3c 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -387,7 +387,9 @@ private MongoException createAndLogTimeoutException( final OperationContext operationContext, final ServerSelector serverSelector, final ClusterDescription clusterDescription) { - MongoTimeoutException exception = new MongoTimeoutException("Timed out while waiting for a suitable server"); + MongoTimeoutException exception = new MongoTimeoutException(format( + "Timed out while waiting for a server that matches %s. Client view of cluster state is %s", + serverSelector, clusterDescription.getShortDescription())); logServerSelectionFailed(operationContext, exception, serverSelector, clusterDescription); return exception; } @@ -538,12 +540,18 @@ private void logServerSelectionFailed( final ServerSelector serverSelector, final ClusterDescription clusterDescription) { if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { + String failureDescription = failure instanceof MongoTimeoutException + // This hardcoded message guarantees that the `FAILURE` entry for `MongoTimeoutException` does not include + // any information that is specified via other entries, e.g., `SELECTOR` and `TOPOLOGY_DESCRIPTION`. + // The logging spec requires us to avoid such duplication of information. + ? MongoTimeoutException.class.getName() + ": Timed out while waiting for a suitable server" + : failure.toString(); STRUCTURED_LOGGER.log(new LogMessage( SERVER_SELECTION, DEBUG, "Server selection failed", clusterId, asList( new Entry(OPERATION, null), new Entry(OPERATION_ID, operationContext.getId()), - new Entry(FAILURE, failure.toString()), + new Entry(FAILURE, failureDescription), new Entry(SELECTOR, serverSelector.toString()), new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), "Server selection failed for operation[ {}] with ID {}. Failure: {}. Selector: {}, topology description: {}")); From ef5cd3a44e47374bbbbc14e9c44b5f9f524e0f14 Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Wed, 1 Nov 2023 00:54:06 -0600 Subject: [PATCH 10/10] Fix `BaseClusterSpecification` JAVA-4754 --- .../internal/connection/BaseClusterSpecification.groovy | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy index aabde9e8c46..c7428d2f4e7 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy @@ -171,7 +171,7 @@ class BaseClusterSpecification extends Specification { .serverDescription.address == firstServer } - def 'should timeout with message containing neither the selector nor the topology'() { + def 'should timeout with useful message'() { given: def cluster = new MultiServerCluster(new ClusterId(), builder().mode(MULTIPLE) @@ -191,7 +191,11 @@ class BaseClusterSpecification extends Specification { then: def e = thrown(MongoTimeoutException) - e.getMessage().equals('Timed out while waiting for a suitable server') + e.getMessage().startsWith("Timed out while waiting for a server " + + 'that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN') + e.getMessage().contains('{address=localhost:27017, type=UNKNOWN, state=CONNECTING, ' + + 'exception={com.mongodb.MongoInternalException: oops}}') + e.getMessage().contains('{address=localhost:27018, type=UNKNOWN, state=CONNECTING}') where: serverSelectionTimeoutMS << [1, 0]