This class is not part of the public API and may be removed or changed at any time
*/
public interface AsyncAggregateResponseBatchCursor extends AsyncBatchCursor {
+ @Nullable
BsonDocument getPostBatchResumeToken();
+ @Nullable
BsonTimestamp getOperationTime();
boolean isFirstBatchEmpty();
diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java b/driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java
index 14cc3faa71b..89260ac7b52 100644
--- a/driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java
@@ -16,6 +16,8 @@
package com.mongodb.internal.async;
+import com.mongodb.internal.operation.BatchCursor;
+
import java.io.Closeable;
import java.util.List;
@@ -28,9 +30,9 @@
*/
public interface AsyncBatchCursor extends Closeable {
/**
- * Returns the next batch of results. A tailable cursor will block until another batch exists. After the last batch, the next call
- * to this method will execute the callback with a null result to indicate that there are no more batches available and the cursor
- * has been closed.
+ * Returns the next batch of results. A tailable cursor will block until another batch exists.
+ * Unlike the {@link BatchCursor} this method will automatically mark the cursor as closed when there are no more expected results.
+ * Care should be taken to check {@link #isClosed()} between calls.
*
* @param callback callback to receive the next batch of results
* @throws java.util.NoSuchElementException if no next batch exists
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java
index 4379845bdd1..d70c7cbac66 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java
@@ -18,13 +18,11 @@
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Collation;
-import com.mongodb.connection.ConnectionDescription;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.internal.client.model.AggregationLevel;
-import com.mongodb.internal.connection.QueryResult;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
@@ -40,7 +38,6 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
-import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
@@ -48,7 +45,6 @@
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
-import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult;
import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand;
import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead;
@@ -239,25 +235,16 @@ BsonDocument getCommand(final SessionContext sessionContext, final int maxWireVe
return commandDocument;
}
- private QueryResult createQueryResult(final BsonDocument result, final ConnectionDescription description) {
- assertNotNull(result);
- return cursorDocumentToQueryResult(result.getDocument(CURSOR), description.getServerAddress());
- }
-
- private CommandReadTransformer> transformer() {
- return (result, source, connection) -> {
- QueryResult queryResult = createQueryResult(result, connection.getDescription());
- return new QueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder, comment,
- source, connection, result);
- };
+ private CommandReadTransformer> transformer() {
+ return (result, source, connection) ->
+ new CommandBatchCursor<>(connection.getDescription().getServerAddress(), result, 0, batchSize != null ? batchSize : 0,
+ maxAwaitTimeMS, decoder, comment, source, connection);
}
private CommandReadTransformerAsync> asyncTransformer() {
- return (result, source, connection) -> {
- QueryResult queryResult = createQueryResult(result, connection.getDescription());
- return new AsyncQueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
- comment, source, connection, result);
- };
+ return (result, source, connection) ->
+ new AsyncCommandBatchCursor<>(connection.getDescription().getServerAddress(), result, 0, batchSize != null ? batchSize : 0,
+ maxAwaitTimeMS, decoder, comment, source, connection);
}
interface AggregateTarget {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateResponseBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateResponseBatchCursor.java
index 5ec7d00bb26..e12a2249123 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AggregateResponseBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateResponseBatchCursor.java
@@ -17,6 +17,7 @@
package com.mongodb.internal.operation;
import com.mongodb.annotations.NotThreadSafe;
+import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
@@ -27,8 +28,10 @@
*/
@NotThreadSafe
public interface AggregateResponseBatchCursor extends BatchCursor {
+ @Nullable
BsonDocument getPostBatchResumeToken();
+ @Nullable
BsonTimestamp getOperationTime();
boolean isFirstBatchEmpty();
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java
index 9ccd2f17b0a..1d271107a9a 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java
@@ -60,8 +60,7 @@ final class AsyncChangeStreamBatchCursor implements AsyncAggregateResponseBat
final int maxWireVersion) {
this.changeStreamOperation = changeStreamOperation;
this.wrapped = new AtomicReference<>(assertNotNull(wrapped));
- this.binding = binding;
- binding.retain();
+ this.binding = binding.retain();
this.resumeToken = resumeToken;
this.maxWireVersion = maxWireVersion;
isClosed = new AtomicBoolean();
@@ -164,8 +163,8 @@ public int getMaxWireVersion() {
return maxWireVersion;
}
- private void cachePostBatchResumeToken(final AsyncAggregateResponseBatchCursor queryBatchCursor) {
- BsonDocument resumeToken = queryBatchCursor.getPostBatchResumeToken();
+ private void cachePostBatchResumeToken(final AsyncAggregateResponseBatchCursor commandBatchCursor) {
+ BsonDocument resumeToken = commandBatchCursor.getPostBatchResumeToken();
if (resumeToken != null) {
this.resumeToken = resumeToken;
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java
new file mode 100644
index 00000000000..d10a1f9705e
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java
@@ -0,0 +1,339 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.internal.operation;
+
+import com.mongodb.MongoCommandException;
+import com.mongodb.MongoException;
+import com.mongodb.MongoNamespace;
+import com.mongodb.MongoSocketException;
+import com.mongodb.ReadPreference;
+import com.mongodb.ServerAddress;
+import com.mongodb.ServerCursor;
+import com.mongodb.annotations.ThreadSafe;
+import com.mongodb.connection.ServerType;
+import com.mongodb.internal.VisibleForTesting;
+import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
+import com.mongodb.internal.async.SingleResultCallback;
+import com.mongodb.internal.async.function.AsyncCallbackSupplier;
+import com.mongodb.internal.binding.AsyncConnectionSource;
+import com.mongodb.internal.connection.AsyncConnection;
+import com.mongodb.internal.connection.Connection;
+import com.mongodb.lang.Nullable;
+import org.bson.BsonDocument;
+import org.bson.BsonTimestamp;
+import org.bson.BsonValue;
+import org.bson.codecs.BsonDocumentCodec;
+import org.bson.codecs.Decoder;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import static com.mongodb.assertions.Assertions.assertNotNull;
+import static com.mongodb.assertions.Assertions.assertTrue;
+import static com.mongodb.assertions.Assertions.isTrueArgument;
+import static com.mongodb.assertions.Assertions.notNull;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CONCURRENT_OPERATION;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.NEXT_BATCH;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.NO_OP_FIELD_NAME_VALIDATOR;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.getMoreCommandDocument;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.translateCommandException;
+import static com.mongodb.internal.operation.OperationHelper.LOGGER;
+import static java.lang.String.format;
+import static java.util.Collections.emptyList;
+
+class AsyncCommandBatchCursor implements AsyncAggregateResponseBatchCursor {
+
+ private final MongoNamespace namespace;
+ private final int limit;
+ private final Decoder decoder;
+ private final long maxTimeMS;
+ @Nullable
+ private final BsonValue comment;
+
+ private final AtomicInteger count = new AtomicInteger();
+ private final AtomicBoolean processedInitial = new AtomicBoolean();
+ private final int maxWireVersion;
+ private final boolean firstBatchEmpty;
+ private final ResourceManager resourceManager;
+ private volatile CommandCursorResult commandCursorResult;
+ private int batchSize;
+
+ AsyncCommandBatchCursor(
+ final ServerAddress serverAddress,
+ final BsonDocument commandCursorDocument,
+ final int limit, final int batchSize, final long maxTimeMS,
+ final Decoder decoder,
+ @Nullable final BsonValue comment,
+ final AsyncConnectionSource connectionSource,
+ final AsyncConnection connection) {
+ isTrueArgument("maxTimeMS >= 0", maxTimeMS >= 0);
+ CommandCursorResult commandCursor = initFromCommandCursorDocument(serverAddress, FIRST_BATCH, commandCursorDocument);
+ this.namespace = commandCursor.getNamespace();
+ this.limit = limit;
+ this.batchSize = batchSize;
+ this.maxTimeMS = maxTimeMS;
+ this.decoder = notNull("decoder", decoder);
+ this.comment = comment;
+ this.maxWireVersion = connection.getDescription().getMaxWireVersion();
+ this.firstBatchEmpty = commandCursor.getResults().isEmpty();
+
+ AsyncConnection connectionToPin = null;
+ boolean releaseServerAndResources = false;
+ if (limitReached()) {
+ releaseServerAndResources = true;
+ } else if (connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER) {
+ connectionToPin = connection;
+ }
+
+ resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursor.getServerCursor());
+ if (releaseServerAndResources) {
+ resourceManager.releaseServerAndClientResources(connection);
+ }
+ }
+
+ @Override
+ public void next(final SingleResultCallback> callback) {
+ if (isClosed()) {
+ callback.onResult(null, new MongoException(MESSAGE_IF_CLOSED_AS_CURSOR));
+ return;
+ } else if (!resourceManager.tryStartOperation()) {
+ callback.onResult(null, new MongoException(MESSAGE_IF_CONCURRENT_OPERATION));
+ return;
+ }
+
+ ServerCursor localServerCursor = resourceManager.getServerCursor();
+ boolean cursorClosed = localServerCursor == null;
+ List batchResults = emptyList();
+ if (!processedInitial.getAndSet(true) && !firstBatchEmpty) {
+ batchResults = commandCursorResult.getResults();
+ }
+
+ if (cursorClosed || !batchResults.isEmpty()) {
+ resourceManager.endOperation();
+ if (cursorClosed) {
+ close();
+ }
+ callback.onResult(batchResults, null);
+ } else {
+ getMore(localServerCursor, callback);
+ }
+ }
+
+ @Override
+ public void setBatchSize(final int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return !resourceManager.operable();
+ }
+
+ @Override
+ public void close() {
+ resourceManager.close();
+ }
+
+ @Override
+ public BsonDocument getPostBatchResumeToken() {
+ return commandCursorResult.getPostBatchResumeToken();
+ }
+
+ @Override
+ public BsonTimestamp getOperationTime() {
+ return commandCursorResult.getOperationTime();
+ }
+
+ @Override
+ public boolean isFirstBatchEmpty() {
+ return firstBatchEmpty;
+ }
+
+ @Override
+ public int getMaxWireVersion() {
+ return maxWireVersion;
+ }
+
+ @Nullable
+ @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
+ ServerCursor getServerCursor() {
+ return resourceManager.getServerCursor();
+ }
+
+ private void getMore(final ServerCursor cursor, final SingleResultCallback> callback) {
+ resourceManager.executeWithConnection(connection -> getMore(connection, cursor, callback), callback);
+ }
+
+ private void getMore(final AsyncConnection connection, final ServerCursor cursor, final SingleResultCallback> callback) {
+ connection.commandAsync(namespace.getDatabaseName(),
+ getMoreCommandDocument(cursor.getId(), connection.getDescription(), namespace,
+ limit, batchSize, count.get(), maxTimeMS, comment),
+ NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(),
+ CommandResultDocumentCodec.create(decoder, NEXT_BATCH), assertNotNull(resourceManager.getConnectionSource()),
+ (commandResult, t) -> {
+ if (t != null) {
+ Throwable translatedException =
+ t instanceof MongoCommandException
+ ? translateCommandException((MongoCommandException) t, cursor)
+ : t;
+
+ connection.release();
+ resourceManager.endOperation();
+ callback.onResult(null, translatedException);
+ return;
+ }
+
+ CommandCursorResult commandCursor =
+ initFromCommandCursorDocument(connection.getDescription().getServerAddress(), NEXT_BATCH, assertNotNull(commandResult));
+ resourceManager.setServerCursor(commandCursor.getServerCursor());
+
+ if (!resourceManager.operable()) {
+ resourceManager.releaseServerAndClientResources(connection);
+ connection.release();
+ callback.onResult(emptyList(), null);
+ return;
+ }
+
+ if (commandCursor.getResults().isEmpty() && commandCursor.getServerCursor() != null) {
+ connection.release();
+ getMore(commandCursor.getServerCursor(), callback);
+ } else {
+ resourceManager.endOperation();
+ if (limitReached()) {
+ resourceManager.releaseServerAndClientResources(connection);
+ close();
+ }
+ connection.release();
+ callback.onResult(commandCursor.getResults(), null);
+ }
+ });
+ }
+
+ private CommandCursorResult initFromCommandCursorDocument(
+ final ServerAddress serverAddress,
+ final String fieldNameContainingBatch,
+ final BsonDocument commandCursorDocument) {
+ CommandCursorResult cursorResult = new CommandCursorResult<>(serverAddress, fieldNameContainingBatch, commandCursorDocument);
+ count.addAndGet(cursorResult.getResults().size());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(format("Received batch of %d documents with cursorId %d from server %s", cursorResult.getResults().size(),
+ cursorResult.getCursorId(), cursorResult.getServerAddress()));
+ }
+ this.commandCursorResult = cursorResult;
+ return cursorResult;
+ }
+
+ private boolean limitReached() {
+ return Math.abs(limit) != 0 && count.get() >= Math.abs(limit);
+ }
+
+ @ThreadSafe
+ private static final class ResourceManager extends CursorResourceManager {
+
+ ResourceManager(
+ final MongoNamespace namespace,
+ final AsyncConnectionSource connectionSource,
+ @Nullable final AsyncConnection connectionToPin,
+ @Nullable final ServerCursor serverCursor) {
+ super(namespace, connectionSource, connectionToPin, serverCursor);
+ }
+
+ @Override
+ void markAsPinned(final AsyncConnection connectionToPin, final Connection.PinningMode pinningMode) {
+ connectionToPin.markAsPinned(pinningMode);
+ }
+
+ @Override
+ void executeWithConnection(final Consumer action) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ void executeWithConnection(final Consumer action, final SingleResultCallback callback) {
+ assertTrue(getState() != State.IDLE);
+ AsyncConnection pinnedConnection = getPinnedConnection();
+ if (pinnedConnection != null) {
+ executeWithConnection(assertNotNull(pinnedConnection).retain(), null, action, callback);
+ } else {
+ assertNotNull(getConnectionSource()).getConnection((conn, t) -> executeWithConnection(conn, t, action, callback));
+ }
+ }
+
+ void executeWithConnection(
+ @Nullable final AsyncConnection connection,
+ @Nullable final Throwable t,
+ final Consumer function,
+ final SingleResultCallback callback) {
+ assertTrue(connection != null || t != null);
+ if (t != null) {
+ callback.onResult(null, t);
+ } else {
+ AsyncCallbackSupplier curriedFunction = c -> function.accept(connection);
+ curriedFunction.whenComplete(connection::release).get((result, error) -> {
+ if (error instanceof MongoSocketException) {
+ onCorruptedConnection(connection);
+ }
+ connection.release();
+ callback.onResult(result, error);
+ });
+ }
+ }
+
+ @Override
+ void doClose() {
+ if (isSkipReleasingServerResourcesOnClose()) {
+ unsetServerCursor();
+ }
+
+ if (getServerCursor() != null) {
+ executeWithConnection(c -> {
+ releaseServerResources(c);
+ c.release();
+ }, (r, t) -> {
+ // guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
+ unsetServerCursor();
+ releaseClientResources();
+ });
+ } else {
+ releaseClientResources();
+ }
+ }
+
+ @Override
+ void killServerCursor(final MongoNamespace namespace, final ServerCursor serverCursor, final AsyncConnection connection) {
+ connection.retain();
+ AsyncConnectionSource connectionSource = assertNotNull(getConnectionSource()).retain();
+ connection.commandAsync(namespace.getDatabaseName(), getKillCursorsCommand(namespace, serverCursor),
+ NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(),
+ connectionSource, (r, t) -> {
+ connectionSource.release();
+ connection.release();
+ releaseClientResources();
+ });
+ }
+ }
+
+}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
index 21b10cdff08..b670083be78 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
@@ -18,9 +18,7 @@
import com.mongodb.Function;
import com.mongodb.MongoException;
-import com.mongodb.MongoNamespace;
import com.mongodb.ReadPreference;
-import com.mongodb.ServerAddress;
import com.mongodb.assertions.Assertions;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
@@ -35,7 +33,6 @@
import com.mongodb.internal.binding.ReferenceCounted;
import com.mongodb.internal.connection.AsyncConnection;
import com.mongodb.internal.connection.OperationContext;
-import com.mongodb.internal.connection.QueryResult;
import com.mongodb.internal.operation.retry.AttachmentKeys;
import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.lang.Nullable;
@@ -56,7 +53,6 @@
import static com.mongodb.internal.operation.CommandOperationHelper.isRetryWritesEnabled;
import static com.mongodb.internal.operation.CommandOperationHelper.logRetryExecute;
import static com.mongodb.internal.operation.CommandOperationHelper.transformWriteException;
-import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult;
import static com.mongodb.internal.operation.WriteConcernHelper.throwOnWriteConcernError;
final class AsyncOperationHelper {
@@ -309,15 +305,15 @@ static CommandWriteTransformerAsync writeConcernErrorTransfo
};
}
- static AsyncBatchCursor createEmptyAsyncBatchCursor(final MongoNamespace namespace, final ServerAddress serverAddress) {
- return new AsyncSingleBatchQueryCursor<>(new QueryResult<>(namespace, Collections.emptyList(), 0L, serverAddress));
+ static CommandReadTransformerAsync> asyncSingleBatchCursorTransformer(final String resultsFieldName) {
+ return (result, source, connection) ->
+ new AsyncSingleBatchCursor<>(BsonDocumentWrapperHelper.toList(result, resultsFieldName), 0);
}
static AsyncBatchCursor cursorDocumentToAsyncBatchCursor(final BsonDocument cursorDocument, final Decoder decoder,
final BsonValue comment, final AsyncConnectionSource source, final AsyncConnection connection, final int batchSize) {
- return new AsyncQueryBatchCursor<>(cursorDocumentToQueryResult(cursorDocument,
- source.getServerDescription().getAddress()),
- 0, batchSize, 0, decoder, comment, source, connection, cursorDocument);
+ return new AsyncCommandBatchCursor<>(connection.getDescription().getServerAddress(), cursorDocument,
+ 0, batchSize, 0, decoder, comment, source, connection);
}
static SingleResultCallback releasingCallback(final SingleResultCallback wrapped, final AsyncConnection connection) {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java
deleted file mode 100644
index be079abec36..00000000000
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Copyright 2008-present MongoDB, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.mongodb.internal.operation;
-
-import com.mongodb.MongoCommandException;
-import com.mongodb.MongoException;
-import com.mongodb.MongoNamespace;
-import com.mongodb.ReadPreference;
-import com.mongodb.ServerCursor;
-import com.mongodb.connection.ConnectionDescription;
-import com.mongodb.connection.ServerType;
-import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
-import com.mongodb.internal.async.SingleResultCallback;
-import com.mongodb.internal.binding.AsyncConnectionSource;
-import com.mongodb.internal.connection.AsyncConnection;
-import com.mongodb.internal.connection.Connection;
-import com.mongodb.internal.connection.QueryResult;
-import com.mongodb.internal.diagnostics.logging.Logger;
-import com.mongodb.internal.diagnostics.logging.Loggers;
-import com.mongodb.internal.validator.NoOpFieldNameValidator;
-import com.mongodb.lang.Nullable;
-import org.bson.BsonArray;
-import org.bson.BsonDocument;
-import org.bson.BsonInt32;
-import org.bson.BsonInt64;
-import org.bson.BsonString;
-import org.bson.BsonTimestamp;
-import org.bson.BsonValue;
-import org.bson.FieldNameValidator;
-import org.bson.codecs.BsonDocumentCodec;
-import org.bson.codecs.Decoder;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static com.mongodb.assertions.Assertions.assertFalse;
-import static com.mongodb.assertions.Assertions.assertNotNull;
-import static com.mongodb.assertions.Assertions.isTrueArgument;
-import static com.mongodb.assertions.Assertions.notNull;
-import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
-import static com.mongodb.internal.operation.CursorHelper.getNumberToReturn;
-import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
-import static com.mongodb.internal.operation.SyncOperationHelper.getMoreCursorDocumentToQueryResult;
-import static com.mongodb.internal.operation.QueryHelper.translateCommandException;
-import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionFourDotFour;
-import static java.lang.String.format;
-import static java.util.Collections.singletonList;
-
-class AsyncQueryBatchCursor implements AsyncAggregateResponseBatchCursor {
- private static final Logger LOGGER = Loggers.getLogger("operation");
- private static final FieldNameValidator NO_OP_FIELD_NAME_VALIDATOR = new NoOpFieldNameValidator();
- private static final String CURSOR = "cursor";
- private static final String POST_BATCH_RESUME_TOKEN = "postBatchResumeToken";
- private static final String OPERATION_TIME = "operationTime";
-
- private final MongoNamespace namespace;
- private final int limit;
- private final Decoder decoder;
- private final long maxTimeMS;
- private volatile AsyncConnectionSource connectionSource;
- private volatile AsyncConnection pinnedConnection;
- private final AtomicReference cursor;
- private volatile QueryResult firstBatch;
- private volatile int batchSize;
- private final AtomicInteger count = new AtomicInteger();
- private volatile BsonDocument postBatchResumeToken;
- private final BsonTimestamp operationTime;
- private final BsonValue comment;
- private final boolean firstBatchEmpty;
- private final int maxWireVersion;
-
- private final Lock lock = new ReentrantLock();
- /* protected by `lock` */
- private boolean isOperationInProgress = false;
- private boolean isClosed = false;
- /* protected by `lock` */
- private volatile boolean isClosePending = false;
-
- AsyncQueryBatchCursor(final QueryResult firstBatch, final int limit, final int batchSize, final long maxTimeMS,
- final Decoder decoder, final BsonValue comment, final AsyncConnectionSource connectionSource,
- final AsyncConnection connection) {
- this(firstBatch, limit, batchSize, maxTimeMS, decoder, comment, connectionSource, connection, null);
- }
-
- AsyncQueryBatchCursor(final QueryResult firstBatch, final int limit, final int batchSize, final long maxTimeMS,
- final Decoder decoder, final BsonValue comment, final AsyncConnectionSource connectionSource,
- @Nullable final AsyncConnection connection, @Nullable final BsonDocument result) {
- isTrueArgument("maxTimeMS >= 0", maxTimeMS >= 0);
- this.maxTimeMS = maxTimeMS;
- this.namespace = firstBatch.getNamespace();
- this.firstBatch = firstBatch;
- this.limit = limit;
- this.batchSize = batchSize;
- this.decoder = decoder;
- this.comment = comment;
- this.cursor = new AtomicReference<>(firstBatch.getCursor());
- this.count.addAndGet(firstBatch.getResults().size());
- if (result != null) {
- this.operationTime = result.getTimestamp(OPERATION_TIME, null);
- this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
- } else {
- this.operationTime = null;
- }
-
- firstBatchEmpty = firstBatch.getResults().isEmpty();
- if (cursor.get() != null) {
- this.connectionSource = notNull("connectionSource", connectionSource).retain();
- assertNotNull(connection);
- if (limitReached()) {
- killCursor(connection);
- } else {
- if (connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER) {
- this.pinnedConnection = connection.retain();
- this.pinnedConnection.markAsPinned(Connection.PinningMode.CURSOR);
- }
- }
- }
- this.maxWireVersion = connection == null ? 0 : connection.getDescription().getMaxWireVersion();
- logQueryResult(firstBatch);
- }
-
- /**
- * {@inheritDoc}
- *
- * From the perspective of the code external to this class, this method is idempotent as required by its specification.
- * However, if this method sets {@link #isClosePending},
- * then it must be called by {@code this} again to release resources.
- * This behavior does not violate externally observable idempotence because this method is allowed to release resources "eventually".
- */
- @Override
- public void close() {
- boolean doClose = false;
-
- lock.lock();
- try {
- if (isOperationInProgress) {
- isClosePending = true;
- } else if (!isClosed) {
- isClosed = true;
- isClosePending = false;
- doClose = true;
- }
- } finally {
- lock.unlock();
- }
-
- if (doClose) {
- killCursorOnClose();
- }
- }
-
- @Override
- public void next(final SingleResultCallback> callback) {
- if (isClosed()) {
- callback.onResult(null, new MongoException("next() called after the cursor was closed."));
- } else if (firstBatch != null && (!firstBatch.getResults().isEmpty())) {
- // May be empty for a tailable cursor
- List results = firstBatch.getResults();
- firstBatch = null;
- if (getServerCursor() == null) {
- close();
- }
- callback.onResult(results, null);
- } else {
- ServerCursor localCursor = getServerCursor();
- if (localCursor == null) {
- close();
- callback.onResult(null, null);
- } else {
- lock.lock();
- try {
- if (isClosed()) {
- callback.onResult(null, new MongoException("next() called after the cursor was closed."));
- return;
- }
- isOperationInProgress = true;
- } finally {
- lock.unlock();
- }
- getMore(localCursor, callback);
- }
- }
- }
-
- @Override
- public void setBatchSize(final int batchSize) {
- assertFalse(isClosed());
- this.batchSize = batchSize;
- }
-
- @Override
- public int getBatchSize() {
- assertFalse(isClosed());
- return batchSize;
- }
-
- @Override
- public boolean isClosed() {
- lock.lock();
- try {
- return isClosed || isClosePending;
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public BsonDocument getPostBatchResumeToken() {
- return postBatchResumeToken;
- }
-
- @Override
- public BsonTimestamp getOperationTime() {
- return operationTime;
- }
-
- @Override
- public boolean isFirstBatchEmpty() {
- return firstBatchEmpty;
- }
-
- @Override
- public int getMaxWireVersion() {
- return maxWireVersion;
- }
-
- private boolean limitReached() {
- return Math.abs(limit) != 0 && count.get() >= Math.abs(limit);
- }
-
- private void getMore(final ServerCursor cursor, final SingleResultCallback> callback) {
- if (pinnedConnection != null) {
- getMore(pinnedConnection.retain(), cursor, callback);
- } else {
- connectionSource.getConnection((connection, t) -> {
- if (t != null) {
- endOperationInProgress();
- callback.onResult(null, t);
- } else {
- getMore(assertNotNull(connection), cursor, callback);
- }
- });
- }
- }
-
- private void getMore(final AsyncConnection connection, final ServerCursor cursor, final SingleResultCallback> callback) {
- connection.commandAsync(namespace.getDatabaseName(), asGetMoreCommandDocument(cursor.getId(), connection.getDescription()),
- NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), CommandResultDocumentCodec.create(decoder, "nextBatch"),
- connectionSource, new CommandResultSingleResultCallback(connection, cursor, callback));
- }
-
- private BsonDocument asGetMoreCommandDocument(final long cursorId, final ConnectionDescription connectionDescription) {
- BsonDocument document = new BsonDocument("getMore", new BsonInt64(cursorId))
- .append("collection", new BsonString(namespace.getCollectionName()));
-
- int batchSizeForGetMoreCommand = Math.abs(getNumberToReturn(limit, this.batchSize, count.get()));
- if (batchSizeForGetMoreCommand != 0) {
- document.append("batchSize", new BsonInt32(batchSizeForGetMoreCommand));
- }
- if (maxTimeMS != 0) {
- document.append("maxTimeMS", new BsonInt64(maxTimeMS));
- }
- if (serverIsAtLeastVersionFourDotFour(connectionDescription)) {
- putIfNotNull(document, "comment", comment);
- }
- return document;
- }
-
- private void killCursorOnClose() {
- ServerCursor localCursor = getServerCursor();
- if (localCursor != null) {
- if (pinnedConnection != null) {
- killCursorAsynchronouslyAndReleaseConnectionAndSource(pinnedConnection, localCursor);
- } else {
- connectionSource.getConnection((connection, t) -> {
- if (t != null) {
- connectionSource.release();
- } else {
- killCursorAsynchronouslyAndReleaseConnectionAndSource(assertNotNull(connection), localCursor);
- }
- });
- }
- } else if (pinnedConnection != null) {
- pinnedConnection.release();
- }
- }
-
- private void killCursor(final AsyncConnection connection) {
- ServerCursor localCursor = cursor.getAndSet(null);
- if (localCursor != null) {
- killCursorAsynchronouslyAndReleaseConnectionAndSource(connection.retain(), localCursor);
- } else {
- connectionSource.release();
- }
- }
-
- private void killCursorAsynchronouslyAndReleaseConnectionAndSource(final AsyncConnection connection, final ServerCursor localCursor) {
- connection.commandAsync(namespace.getDatabaseName(), asKillCursorsCommandDocument(localCursor), NO_OP_FIELD_NAME_VALIDATOR,
- ReadPreference.primary(), new BsonDocumentCodec(), connectionSource, (result, t) -> {
- connection.release();
- connectionSource.release();
- });
- }
-
- private BsonDocument asKillCursorsCommandDocument(final ServerCursor localCursor) {
- return new BsonDocument("killCursors", new BsonString(namespace.getCollectionName()))
- .append("cursors", new BsonArray(singletonList(new BsonInt64(localCursor.getId()))));
- }
-
- private void endOperationInProgress() {
- boolean closePending;
- lock.lock();
- try {
- isOperationInProgress = false;
- closePending = this.isClosePending;
- } finally {
- lock.unlock();
- }
- if (closePending) {
- close();
- }
- }
-
-
- private void handleGetMoreQueryResult(final AsyncConnection connection, final SingleResultCallback> callback,
- final QueryResult result) {
- logQueryResult(result);
- cursor.set(result.getCursor());
- if (isClosePending) {
- try {
- connection.release();
- if (result.getCursor() == null) {
- connectionSource.release();
- }
- endOperationInProgress();
- } finally {
- callback.onResult(null, null);
- }
- } else if (result.getResults().isEmpty() && result.getCursor() != null) {
- getMore(connection, assertNotNull(result.getCursor()), callback);
- } else {
- count.addAndGet(result.getResults().size());
- if (limitReached()) {
- killCursor(connection);
- connection.release();
- } else {
- connection.release();
- if (result.getCursor() == null) {
- connectionSource.release();
- }
- }
- endOperationInProgress();
-
- if (result.getResults().isEmpty()) {
- callback.onResult(null, null);
- } else {
- callback.onResult(result.getResults(), null);
- }
- }
- }
-
- private void logQueryResult(final QueryResult result) {
- LOGGER.debug(format("Received batch of %d documents with cursorId %d from server %s", result.getResults().size(),
- result.getCursorId(), result.getAddress()));
- }
-
- private class CommandResultSingleResultCallback implements SingleResultCallback {
- private final AsyncConnection connection;
- private final ServerCursor cursor;
- private final SingleResultCallback> callback;
-
- CommandResultSingleResultCallback(final AsyncConnection connection, final ServerCursor cursor,
- final SingleResultCallback> callback) {
- this.connection = connection;
- this.cursor = cursor;
- this.callback = errorHandlingCallback(callback, LOGGER);
- }
-
- @Override
- public void onResult(@Nullable final BsonDocument result, @Nullable final Throwable t) {
- if (t != null) {
- Throwable translatedException = t instanceof MongoCommandException
- ? translateCommandException((MongoCommandException) t, cursor)
- : t;
- connection.release();
- endOperationInProgress();
- callback.onResult(null, translatedException);
- } else {
- assertNotNull(result);
- QueryResult queryResult = getMoreCursorDocumentToQueryResult(result.getDocument(CURSOR),
- connection.getDescription().getServerAddress());
- postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
- handleGetMoreQueryResult(connection, callback, queryResult);
- }
- }
- }
-
- @Nullable
- ServerCursor getServerCursor() {
- return cursor.get();
- }
-
- @Nullable
- private BsonDocument getPostBatchResumeTokenFromResponse(final BsonDocument result) {
- BsonDocument cursor = result.getDocument(CURSOR, null);
- if (cursor != null) {
- return cursor.getDocument(POST_BATCH_RESUME_TOKEN, null);
- }
- return null;
- }
-}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncSingleBatchQueryCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncSingleBatchCursor.java
similarity index 63%
rename from driver-core/src/main/com/mongodb/internal/operation/AsyncSingleBatchQueryCursor.java
rename to driver-core/src/main/com/mongodb/internal/operation/AsyncSingleBatchCursor.java
index f29cda04dae..57b20ff1711 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncSingleBatchQueryCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncSingleBatchCursor.java
@@ -19,19 +19,26 @@
import com.mongodb.MongoException;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
-import com.mongodb.internal.connection.QueryResult;
import java.util.List;
-import static com.mongodb.assertions.Assertions.isTrue;
+import static java.util.Collections.emptyList;
-class AsyncSingleBatchQueryCursor implements AsyncBatchCursor {
- private volatile QueryResult firstBatch;
- private volatile boolean closed;
+class AsyncSingleBatchCursor implements AsyncBatchCursor {
- AsyncSingleBatchQueryCursor(final QueryResult firstBatch) {
- this.firstBatch = firstBatch;
- isTrue("Empty Cursor", firstBatch.getCursor() == null);
+ static AsyncSingleBatchCursor createEmptyAsyncSingleBatchCursor(final int batchSize) {
+ return new AsyncSingleBatchCursor<>(emptyList(), batchSize);
+ }
+
+ private final List batch;
+ private final int batchSize;
+
+ private volatile boolean hasNext = true;
+ private volatile boolean closed = false;
+
+ AsyncSingleBatchCursor(final List batch, final int batchSize) {
+ this.batch = batch;
+ this.batchSize = batchSize;
}
@Override
@@ -43,13 +50,12 @@ public void close() {
public void next(final SingleResultCallback> callback) {
if (closed) {
callback.onResult(null, new MongoException("next() called after the cursor was closed."));
- } else if (firstBatch != null && !firstBatch.getResults().isEmpty()) {
- List results = firstBatch.getResults();
- firstBatch = null;
- callback.onResult(results, null);
+ } else if (hasNext && !batch.isEmpty()) {
+ hasNext = false;
+ callback.onResult(batch, null);
} else {
closed = true;
- callback.onResult(null, null);
+ callback.onResult(emptyList(), null);
}
}
@@ -60,7 +66,7 @@ public void setBatchSize(final int batchSize) {
@Override
public int getBatchSize() {
- return 0;
+ return batchSize;
}
@Override
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java
index acf70090457..a5d36f86f55 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java
@@ -64,23 +64,23 @@ AggregateResponseBatchCursor getWrapped() {
@Override
public boolean hasNext() {
- return resumeableOperation(queryBatchCursor -> {
+ return resumeableOperation(commandBatchCursor -> {
try {
- return queryBatchCursor.hasNext();
+ return commandBatchCursor.hasNext();
} finally {
- cachePostBatchResumeToken(queryBatchCursor);
+ cachePostBatchResumeToken(commandBatchCursor);
}
});
}
@Override
public List next() {
- return resumeableOperation(queryBatchCursor -> {
+ return resumeableOperation(commandBatchCursor -> {
try {
- return convertAndProduceLastId(queryBatchCursor.next(), changeStreamOperation.getDecoder(),
+ return convertAndProduceLastId(commandBatchCursor.next(), changeStreamOperation.getDecoder(),
lastId -> resumeToken = lastId);
} finally {
- cachePostBatchResumeToken(queryBatchCursor);
+ cachePostBatchResumeToken(commandBatchCursor);
}
});
}
@@ -92,12 +92,12 @@ public int available() {
@Override
public List tryNext() {
- return resumeableOperation(queryBatchCursor -> {
+ return resumeableOperation(commandBatchCursor -> {
try {
- return convertAndProduceLastId(queryBatchCursor.tryNext(), changeStreamOperation.getDecoder(),
+ return convertAndProduceLastId(commandBatchCursor.tryNext(), changeStreamOperation.getDecoder(),
lastId -> resumeToken = lastId);
} finally {
- cachePostBatchResumeToken(queryBatchCursor);
+ cachePostBatchResumeToken(commandBatchCursor);
}
});
}
@@ -155,9 +155,9 @@ public int getMaxWireVersion() {
return maxWireVersion;
}
- private void cachePostBatchResumeToken(final AggregateResponseBatchCursor queryBatchCursor) {
- if (queryBatchCursor.getPostBatchResumeToken() != null) {
- resumeToken = queryBatchCursor.getPostBatchResumeToken();
+ private void cachePostBatchResumeToken(final AggregateResponseBatchCursor commandBatchCursor) {
+ if (commandBatchCursor.getPostBatchResumeToken() != null) {
+ resumeToken = commandBatchCursor.getPostBatchResumeToken();
}
}
@@ -170,7 +170,7 @@ static List convertAndProduceLastId(@Nullable final List
final Decoder decoder,
final Consumer lastIdConsumer) {
List results = null;
- if (rawDocuments != null) {
+ if (rawDocuments != null && !rawDocuments.isEmpty()) {
results = new ArrayList<>();
for (RawBsonDocument rawDocument : rawDocuments) {
if (!rawDocument.containsKey("_id")) {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java
new file mode 100644
index 00000000000..d48cc9906c7
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java
@@ -0,0 +1,383 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.internal.operation;
+
+import com.mongodb.MongoCommandException;
+import com.mongodb.MongoException;
+import com.mongodb.MongoNamespace;
+import com.mongodb.MongoSocketException;
+import com.mongodb.ReadPreference;
+import com.mongodb.ServerAddress;
+import com.mongodb.ServerCursor;
+import com.mongodb.annotations.ThreadSafe;
+import com.mongodb.connection.ServerType;
+import com.mongodb.internal.VisibleForTesting;
+import com.mongodb.internal.async.SingleResultCallback;
+import com.mongodb.internal.binding.ConnectionSource;
+import com.mongodb.internal.connection.Connection;
+import com.mongodb.lang.Nullable;
+import org.bson.BsonDocument;
+import org.bson.BsonTimestamp;
+import org.bson.BsonValue;
+import org.bson.codecs.BsonDocumentCodec;
+import org.bson.codecs.Decoder;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static com.mongodb.assertions.Assertions.assertNotNull;
+import static com.mongodb.assertions.Assertions.assertTrue;
+import static com.mongodb.assertions.Assertions.notNull;
+import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_ITERATOR;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.NEXT_BATCH;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.NO_OP_FIELD_NAME_VALIDATOR;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.getMoreCommandDocument;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.translateCommandException;
+import static com.mongodb.internal.operation.OperationHelper.LOGGER;
+import static java.lang.String.format;
+
+class CommandBatchCursor implements AggregateResponseBatchCursor {
+
+ private final MongoNamespace namespace;
+ private final int limit;
+ private final Decoder decoder;
+ private final long maxTimeMS;
+ @Nullable
+ private final BsonValue comment;
+
+ private CommandCursorResult commandCursor;
+ private int batchSize;
+ @Nullable
+ private List nextBatch;
+ private int count = 0;
+
+ private final boolean firstBatchEmpty;
+ private final int maxWireVersion;
+ private final ResourceManager resourceManager;
+
+ CommandBatchCursor(
+ final ServerAddress serverAddress,
+ final BsonDocument commandCursorDocument,
+ final int limit, final int batchSize, final long maxTimeMS,
+ final Decoder decoder,
+ @Nullable final BsonValue comment,
+ final ConnectionSource connectionSource,
+ final Connection connection) {
+ this.commandCursor = initFromCommandCursorDocument(serverAddress, FIRST_BATCH, commandCursorDocument);
+ this.namespace = commandCursor.getNamespace();
+ this.limit = limit;
+ this.batchSize = batchSize;
+ this.maxTimeMS = maxTimeMS;
+ this.decoder = notNull("decoder", decoder);
+ this.comment = comment;
+ this.maxWireVersion = connection.getDescription().getMaxWireVersion();
+ this.firstBatchEmpty = commandCursor.getResults().isEmpty();
+
+ Connection connectionToPin = null;
+ boolean releaseServerAndResources = false;
+ if (limitReached()) {
+ releaseServerAndResources = true;
+ } else if (connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER) {
+ connectionToPin = connection;
+ }
+
+ resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursor.getServerCursor());
+ if (releaseServerAndResources) {
+ resourceManager.releaseServerAndClientResources(connection);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return assertNotNull(resourceManager.execute(MESSAGE_IF_CLOSED_AS_CURSOR, this::doHasNext));
+ }
+
+ private boolean doHasNext() {
+ if (nextBatch != null) {
+ return true;
+ }
+
+ if (limitReached()) {
+ return false;
+ }
+
+ while (resourceManager.getServerCursor() != null) {
+ getMore();
+ if (!resourceManager.operable()) {
+ throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_CURSOR);
+ }
+ if (nextBatch != null) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public List next() {
+ return assertNotNull(resourceManager.execute(MESSAGE_IF_CLOSED_AS_ITERATOR, this::doNext));
+ }
+
+ @Override
+ public int available() {
+ return !resourceManager.operable() || nextBatch == null ? 0 : nextBatch.size();
+ }
+
+ @Nullable
+ private List doNext() {
+ if (!doHasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ List retVal = nextBatch;
+ nextBatch = null;
+ return retVal;
+ }
+
+
+ @VisibleForTesting(otherwise = PRIVATE)
+ boolean isClosed() {
+ return !resourceManager.operable();
+ }
+
+ @Override
+ public void setBatchSize(final int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Not implemented yet!");
+ }
+
+ @Override
+ public void close() {
+ resourceManager.close();
+ }
+
+ @Nullable
+ @Override
+ public List tryNext() {
+ return resourceManager.execute(MESSAGE_IF_CLOSED_AS_CURSOR, () -> {
+ if (!tryHasNext()) {
+ return null;
+ }
+ return doNext();
+ });
+ }
+
+ private boolean tryHasNext() {
+ if (nextBatch != null) {
+ return true;
+ }
+
+ if (limitReached()) {
+ return false;
+ }
+
+ if (resourceManager.getServerCursor() != null) {
+ getMore();
+ }
+
+ return nextBatch != null;
+ }
+
+ @Override
+ @Nullable
+ public ServerCursor getServerCursor() {
+ if (!resourceManager.operable()) {
+ throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_ITERATOR);
+ }
+ return resourceManager.getServerCursor();
+ }
+
+ @Override
+ public ServerAddress getServerAddress() {
+ if (!resourceManager.operable()) {
+ throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_ITERATOR);
+ }
+
+ return commandCursor.getServerAddress();
+ }
+
+ @Override
+ public BsonDocument getPostBatchResumeToken() {
+ return commandCursor.getPostBatchResumeToken();
+ }
+
+ @Override
+ public BsonTimestamp getOperationTime() {
+ return commandCursor.getOperationTime();
+ }
+
+ @Override
+ public boolean isFirstBatchEmpty() {
+ return firstBatchEmpty;
+ }
+
+ @Override
+ public int getMaxWireVersion() {
+ return maxWireVersion;
+ }
+
+ private void getMore() {
+ ServerCursor serverCursor = assertNotNull(resourceManager.getServerCursor());
+ resourceManager.executeWithConnection(connection -> {
+ ServerCursor nextServerCursor;
+ try {
+ commandCursor = initFromCommandCursorDocument(connection.getDescription().getServerAddress(), NEXT_BATCH,
+ assertNotNull(
+ connection.command(namespace.getDatabaseName(),
+ getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace,
+ limit, batchSize, count, maxTimeMS, comment),
+ NO_OP_FIELD_NAME_VALIDATOR,
+ ReadPreference.primary(),
+ CommandResultDocumentCodec.create(decoder, NEXT_BATCH),
+ assertNotNull(resourceManager.getConnectionSource()))));
+ nextServerCursor = commandCursor.getServerCursor();
+ } catch (MongoCommandException e) {
+ throw translateCommandException(e, serverCursor);
+ }
+
+ resourceManager.setServerCursor(nextServerCursor);
+ if (!resourceManager.operable() || limitReached() || nextServerCursor == null) {
+ resourceManager.releaseServerAndClientResources(connection);
+ }
+ });
+ }
+
+ private CommandCursorResult initFromCommandCursorDocument(final ServerAddress serverAddress, final String fieldNameContainingBatch,
+ final BsonDocument commandCursorDocument) {
+ CommandCursorResult commandCursorResult = new CommandCursorResult<>(serverAddress, fieldNameContainingBatch,
+ commandCursorDocument);
+ this.nextBatch = commandCursorResult.getResults().isEmpty() ? null : commandCursorResult.getResults();
+ this.count += commandCursorResult.getResults().size();
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(format("Received batch of %d documents with cursorId %d from server %s", commandCursorResult.getResults().size(),
+ commandCursorResult.getCursorId(), commandCursorResult.getServerAddress()));
+ }
+ return commandCursorResult;
+ }
+
+ private boolean limitReached() {
+ return Math.abs(limit) != 0 && count >= Math.abs(limit);
+ }
+
+ @ThreadSafe
+ private static final class ResourceManager extends CursorResourceManager {
+
+ ResourceManager(
+ final MongoNamespace namespace,
+ final ConnectionSource connectionSource,
+ @Nullable final Connection connectionToPin,
+ @Nullable final ServerCursor serverCursor) {
+ super(namespace, connectionSource, connectionToPin, serverCursor);
+ }
+
+ /**
+ * Thread-safe.
+ * Executes {@code operation} within the {@link #tryStartOperation()}/{@link #endOperation()} bounds.
+ *
+ * @throws IllegalStateException If {@linkplain CommandBatchCursor#close() closed}.
+ */
+ @Nullable
+ R execute(final String exceptionMessageIfClosed, final Supplier operation) throws IllegalStateException {
+ if (!tryStartOperation()) {
+ throw new IllegalStateException(exceptionMessageIfClosed);
+ }
+ try {
+ return operation.get();
+ } finally {
+ endOperation();
+ }
+ }
+
+ private Connection connection() {
+ assertTrue(getState() != State.IDLE);
+ Connection pinnedConnection = getPinnedConnection();
+ if (pinnedConnection != null) {
+ return assertNotNull(pinnedConnection).retain();
+ } else {
+ return assertNotNull(getConnectionSource()).getConnection();
+ }
+ }
+
+ @Override
+ void markAsPinned(final Connection connectionToPin, final Connection.PinningMode pinningMode) {
+ connectionToPin.markAsPinned(pinningMode);
+ }
+
+ @Override
+ void executeWithConnection(final Consumer action) {
+ Connection connection = connection();
+ try {
+ action.accept(connection);
+ } catch (MongoSocketException e) {
+ try {
+ onCorruptedConnection(connection);
+ } catch (Exception suppressed) {
+ e.addSuppressed(suppressed);
+ }
+ throw e;
+ } finally {
+ connection.release();
+ }
+ }
+
+ @Override
+ void executeWithConnection(final Consumer action, final SingleResultCallback callback) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ void doClose() {
+ if (isSkipReleasingServerResourcesOnClose()) {
+ unsetServerCursor();
+ }
+
+ try {
+ if (getServerCursor() != null) {
+ executeWithConnection(this::releaseServerResources);
+ }
+ } catch (MongoException e) {
+ // ignore exceptions when releasing server resources
+ } finally {
+ // guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
+ unsetServerCursor();
+ releaseClientResources();
+ }
+ }
+
+ @Override
+ void killServerCursor(final MongoNamespace namespace, final ServerCursor localServerCursor, final Connection localConnection) {
+ localConnection.command(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor),
+ NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(), assertNotNull(getConnectionSource()));
+ releaseClientResources();
+ }
+ }
+}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursorHelper.java b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursorHelper.java
new file mode 100644
index 00000000000..d7e125b5eb8
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursorHelper.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.internal.operation;
+
+import com.mongodb.MongoCommandException;
+import com.mongodb.MongoCursorNotFoundException;
+import com.mongodb.MongoNamespace;
+import com.mongodb.MongoQueryException;
+import com.mongodb.ServerCursor;
+import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.validator.NoOpFieldNameValidator;
+import com.mongodb.lang.Nullable;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
+import org.bson.BsonValue;
+import org.bson.FieldNameValidator;
+
+import static com.mongodb.internal.operation.CursorHelper.getNumberToReturn;
+import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
+import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionFourDotFour;
+
+final class CommandBatchCursorHelper {
+
+ static final String FIRST_BATCH = "firstBatch";
+ static final String NEXT_BATCH = "nextBatch";
+ static final FieldNameValidator NO_OP_FIELD_NAME_VALIDATOR = new NoOpFieldNameValidator();
+ static final String MESSAGE_IF_CLOSED_AS_CURSOR = "Cursor has been closed";
+ static final String MESSAGE_IF_CLOSED_AS_ITERATOR = "Iterator has been closed";
+
+ static final String MESSAGE_IF_CONCURRENT_OPERATION = "Another operation is currently in progress, concurrent operations are not "
+ + "supported";
+
+ static BsonDocument getMoreCommandDocument(
+ final long cursorId, final ConnectionDescription connectionDescription, final MongoNamespace namespace, final int limit,
+ final int batchSize, final int count, final long maxTimeMS, @Nullable final BsonValue comment) {
+ BsonDocument document = new BsonDocument("getMore", new BsonInt64(cursorId))
+ .append("collection", new BsonString(namespace.getCollectionName()));
+
+ int batchSizeForGetMoreCommand = Math.abs(getNumberToReturn(limit, batchSize, count));
+ if (batchSizeForGetMoreCommand != 0) {
+ document.append("batchSize", new BsonInt32(batchSizeForGetMoreCommand));
+ }
+ if (maxTimeMS != 0) {
+ document.append("maxTimeMS", new BsonInt64(maxTimeMS));
+ }
+ if (serverIsAtLeastVersionFourDotFour(connectionDescription)) {
+ putIfNotNull(document, "comment", comment);
+ }
+ return document;
+ }
+
+ static MongoQueryException translateCommandException(final MongoCommandException commandException, final ServerCursor cursor) {
+ if (commandException.getErrorCode() == 43) {
+ return new MongoCursorNotFoundException(cursor.getId(), commandException.getResponse(), cursor.getAddress());
+ } else {
+ return new MongoQueryException(commandException.getResponse(), commandException.getServerAddress());
+ }
+ }
+
+ private CommandBatchCursorHelper() {
+ }
+}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/QueryResult.java b/driver-core/src/main/com/mongodb/internal/operation/CommandCursorResult.java
similarity index 52%
rename from driver-core/src/main/com/mongodb/internal/connection/QueryResult.java
rename to driver-core/src/main/com/mongodb/internal/operation/CommandCursorResult.java
index 52970ba7b94..8fa6c1bdde3 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/QueryResult.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CommandCursorResult.java
@@ -14,40 +14,52 @@
* limitations under the License.
*/
-package com.mongodb.internal.connection;
+package com.mongodb.internal.operation;
import com.mongodb.MongoNamespace;
import com.mongodb.ServerAddress;
import com.mongodb.ServerCursor;
import com.mongodb.lang.Nullable;
+import org.bson.BsonDocument;
+import org.bson.BsonTimestamp;
import java.util.List;
+import static com.mongodb.assertions.Assertions.isTrue;
+
/**
- * A batch of query results.
+ * The command cursor result
*
*
This class is not part of the public API and may be removed or changed at any time
*/
-public class QueryResult {
- private final MongoNamespace namespace;
+public class CommandCursorResult {
+
+ private static final String CURSOR = "cursor";
+ private static final String POST_BATCH_RESUME_TOKEN = "postBatchResumeToken";
+ private static final String OPERATION_TIME = "operationTime";
+
+ private final BsonDocument cursorDocument;
+ private final ServerAddress serverAddress;
private final List results;
+ private final MongoNamespace namespace;
private final long cursorId;
- private final ServerAddress serverAddress;
+ @Nullable
+ private final BsonTimestamp operationTime;
+ @Nullable
+ private final BsonDocument postBatchResumeToken;
- /**
- * Construct an instance.
- *
- * @param namespace the namespace
- * @param results the query results
- * @param cursorId the cursor id
- * @param serverAddress the server address
- */
- public QueryResult(@Nullable final MongoNamespace namespace, final List results, final long cursorId,
- final ServerAddress serverAddress) {
- this.namespace = namespace;
- this.results = results;
- this.cursorId = cursorId;
+ public CommandCursorResult(
+ final ServerAddress serverAddress,
+ final String fieldNameContainingBatch,
+ final BsonDocument commandCursorDocument) {
+ isTrue("Contains cursor", commandCursorDocument.isDocument(CURSOR));
this.serverAddress = serverAddress;
+ this.cursorDocument = commandCursorDocument.getDocument(CURSOR);
+ this.results = BsonDocumentWrapperHelper.toList(cursorDocument, fieldNameContainingBatch);
+ this.namespace = new MongoNamespace(cursorDocument.getString("ns").getValue());
+ this.cursorId = cursorDocument.getNumber("id").longValue();
+ this.operationTime = cursorDocument.getTimestamp(OPERATION_TIME, null);
+ this.postBatchResumeToken = cursorDocument.getDocument(POST_BATCH_RESUME_TOKEN, null);
}
/**
@@ -55,7 +67,6 @@ public QueryResult(@Nullable final MongoNamespace namespace, final List resul
*
* @return the namespace
*/
- @Nullable
public MongoNamespace getNamespace() {
return namespace;
}
@@ -66,7 +77,7 @@ public MongoNamespace getNamespace() {
* @return the cursor, which may be null if it's been exhausted
*/
@Nullable
- public ServerCursor getCursor() {
+ public ServerCursor getServerCursor() {
return cursorId == 0 ? null : new ServerCursor(cursorId, serverAddress);
}
@@ -84,11 +95,21 @@ public List getResults() {
*
* @return the server address
*/
- public ServerAddress getAddress() {
+ public ServerAddress getServerAddress() {
return serverAddress;
}
public long getCursorId() {
return cursorId;
}
+
+ @Nullable
+ public BsonDocument getPostBatchResumeToken() {
+ return postBatchResumeToken;
+ }
+
+ @Nullable
+ public BsonTimestamp getOperationTime() {
+ return operationTime;
+ }
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java b/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java
new file mode 100644
index 00000000000..dcf0a232658
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.internal.operation;
+
+import com.mongodb.MongoNamespace;
+import com.mongodb.ServerCursor;
+import com.mongodb.annotations.ThreadSafe;
+import com.mongodb.internal.async.SingleResultCallback;
+import com.mongodb.internal.binding.ReferenceCounted;
+import com.mongodb.internal.connection.Connection;
+import com.mongodb.lang.Nullable;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static com.mongodb.assertions.Assertions.assertNotNull;
+import static com.mongodb.assertions.Assertions.assertNull;
+import static com.mongodb.assertions.Assertions.assertTrue;
+import static com.mongodb.assertions.Assertions.fail;
+import static com.mongodb.assertions.Assertions.notNull;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CONCURRENT_OPERATION;
+import static java.util.Collections.singletonList;
+
+/**
+ * This is the resource manager for {@link CommandBatchCursor} or {@link AsyncCommandBatchCursor} implementations.
+ *
+ * This class maintains all resources that must be released in {@link CommandBatchCursor#close()} /
+ * {@link AsyncCommandBatchCursor#close()}. The abstract {@linkplain #doClose() deferred close action} is such that it is totally
+ * ordered with other operations of {@link CommandBatchCursor} / {@link AsyncCommandBatchCursor} (methods {@link #tryStartOperation()}/
+ * {@link #endOperation()} must be used properly to enforce the order) despite the method {@link CommandBatchCursor#close()} /
+ * {@link AsyncCommandBatchCursor#close()} being called concurrently with those operations.
+ *
+ * This total order induces the happens-before order.
+ *
+ * The deferred close action does not violate externally observable idempotence of {@link CommandBatchCursor#close()} /
+ * {@link AsyncCommandBatchCursor#close()}, because the close method is allowed to release resources "eventually".
+ *
diff --git a/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java
deleted file mode 100644
index 139c3e6fd27..00000000000
--- a/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java
+++ /dev/null
@@ -1,632 +0,0 @@
-/*
- * Copyright 2008-present MongoDB, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.mongodb.internal.operation;
-
-import com.mongodb.MongoCommandException;
-import com.mongodb.MongoException;
-import com.mongodb.MongoNamespace;
-import com.mongodb.MongoSocketException;
-import com.mongodb.ReadPreference;
-import com.mongodb.ServerAddress;
-import com.mongodb.ServerCursor;
-import com.mongodb.annotations.ThreadSafe;
-import com.mongodb.connection.ConnectionDescription;
-import com.mongodb.connection.ServerType;
-import com.mongodb.internal.binding.ConnectionSource;
-import com.mongodb.internal.connection.Connection;
-import com.mongodb.internal.connection.QueryResult;
-import com.mongodb.internal.diagnostics.logging.Logger;
-import com.mongodb.internal.diagnostics.logging.Loggers;
-import com.mongodb.internal.validator.NoOpFieldNameValidator;
-import com.mongodb.lang.Nullable;
-import org.bson.BsonArray;
-import org.bson.BsonDocument;
-import org.bson.BsonInt32;
-import org.bson.BsonInt64;
-import org.bson.BsonString;
-import org.bson.BsonTimestamp;
-import org.bson.BsonValue;
-import org.bson.FieldNameValidator;
-import org.bson.codecs.BsonDocumentCodec;
-import org.bson.codecs.Decoder;
-
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.StampedLock;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-import static com.mongodb.assertions.Assertions.assertNotNull;
-import static com.mongodb.assertions.Assertions.assertNull;
-import static com.mongodb.assertions.Assertions.assertTrue;
-import static com.mongodb.assertions.Assertions.fail;
-import static com.mongodb.assertions.Assertions.isTrueArgument;
-import static com.mongodb.assertions.Assertions.notNull;
-import static com.mongodb.internal.operation.CursorHelper.getNumberToReturn;
-import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
-import static com.mongodb.internal.operation.SyncOperationHelper.getMoreCursorDocumentToQueryResult;
-import static com.mongodb.internal.operation.QueryHelper.translateCommandException;
-import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionFourDotFour;
-import static java.lang.String.format;
-import static java.util.Collections.singletonList;
-
-class QueryBatchCursor implements AggregateResponseBatchCursor {
- private static final Logger LOGGER = Loggers.getLogger("operation");
- private static final FieldNameValidator NO_OP_FIELD_NAME_VALIDATOR = new NoOpFieldNameValidator();
- private static final String CURSOR = "cursor";
- private static final String POST_BATCH_RESUME_TOKEN = "postBatchResumeToken";
- private static final String OPERATION_TIME = "operationTime";
- private static final String MESSAGE_IF_CLOSED_AS_CURSOR = "Cursor has been closed";
- private static final String MESSAGE_IF_CLOSED_AS_ITERATOR = "Iterator has been closed";
-
- private final MongoNamespace namespace;
- private final ServerAddress serverAddress;
- private final int limit;
- private final Decoder decoder;
- private final long maxTimeMS;
- private int batchSize;
- private final BsonValue comment;
- private List nextBatch;
- private int count;
- private BsonDocument postBatchResumeToken;
- private BsonTimestamp operationTime;
- private final boolean firstBatchEmpty;
- private int maxWireVersion = 0;
- private final ResourceManager resourceManager;
-
- QueryBatchCursor(final QueryResult firstQueryResult, final int limit, final int batchSize, final Decoder decoder) {
- this(firstQueryResult, limit, batchSize, decoder, null, null);
- }
-
- QueryBatchCursor(final QueryResult firstQueryResult, final int limit, final int batchSize, final Decoder decoder,
- @Nullable final BsonValue comment, @Nullable final ConnectionSource connectionSource) {
- this(firstQueryResult, limit, batchSize, 0, decoder, comment, connectionSource, null, null);
- }
-
- QueryBatchCursor(final QueryResult firstQueryResult, final int limit, final int batchSize, final long maxTimeMS,
- final Decoder decoder, @Nullable final BsonValue comment, @Nullable final ConnectionSource connectionSource,
- @Nullable final Connection connection) {
- this(firstQueryResult, limit, batchSize, maxTimeMS, decoder, comment, connectionSource, connection, null);
- }
-
- QueryBatchCursor(final QueryResult firstQueryResult, final int limit, final int batchSize, final long maxTimeMS,
- final Decoder decoder, @Nullable final BsonValue comment, @Nullable final ConnectionSource connectionSource,
- @Nullable final Connection connection, @Nullable final BsonDocument result) {
- isTrueArgument("maxTimeMS >= 0", maxTimeMS >= 0);
- this.maxTimeMS = maxTimeMS;
- this.namespace = firstQueryResult.getNamespace();
- this.serverAddress = firstQueryResult.getAddress();
- this.limit = limit;
- this.comment = comment;
- this.batchSize = batchSize;
- this.decoder = notNull("decoder", decoder);
- if (result != null) {
- this.operationTime = result.getTimestamp(OPERATION_TIME, null);
- this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
- }
- ServerCursor serverCursor = initFromQueryResult(firstQueryResult);
- if (serverCursor != null) {
- notNull("connectionSource", connectionSource);
- }
- firstBatchEmpty = firstQueryResult.getResults().isEmpty();
- Connection connectionToPin = null;
- boolean releaseServerAndResources = false;
- if (connection != null) {
- this.maxWireVersion = connection.getDescription().getMaxWireVersion();
- if (limitReached()) {
- releaseServerAndResources = true;
- } else {
- assertNotNull(connectionSource);
- if (connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER) {
- connectionToPin = connection;
- }
- }
- }
- resourceManager = new ResourceManager(connectionSource, connectionToPin, serverCursor);
- if (releaseServerAndResources) {
- resourceManager.releaseServerAndClientResources(assertNotNull(connection));
- }
- }
-
- @Override
- public boolean hasNext() {
- return assertNotNull(resourceManager.execute(MESSAGE_IF_CLOSED_AS_CURSOR, this::doHasNext));
- }
-
- private boolean doHasNext() {
- if (nextBatch != null) {
- return true;
- }
-
- if (limitReached()) {
- return false;
- }
-
- while (resourceManager.serverCursor() != null) {
- getMore();
- if (!resourceManager.operable()) {
- throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_CURSOR);
- }
- if (nextBatch != null) {
- return true;
- }
- }
-
- return false;
- }
-
- @Override
- public List next() {
- return assertNotNull(resourceManager.execute(MESSAGE_IF_CLOSED_AS_ITERATOR, this::doNext));
- }
-
- @Override
- public int available() {
- return !resourceManager.operable() || nextBatch == null ? 0 : nextBatch.size();
- }
-
- private List doNext() {
- if (!doHasNext()) {
- throw new NoSuchElementException();
- }
-
- List retVal = nextBatch;
- nextBatch = null;
- return retVal;
- }
-
- @Override
- public void setBatchSize(final int batchSize) {
- this.batchSize = batchSize;
- }
-
- @Override
- public int getBatchSize() {
- return batchSize;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Not implemented yet!");
- }
-
- @Override
- public void close() {
- resourceManager.close();
- }
-
- @Nullable
- @Override
- public List tryNext() {
- return resourceManager.execute(MESSAGE_IF_CLOSED_AS_CURSOR, () -> {
- if (!tryHasNext()) {
- return null;
- }
- return doNext();
- });
- }
-
- private boolean tryHasNext() {
- if (nextBatch != null) {
- return true;
- }
-
- if (limitReached()) {
- return false;
- }
-
- if (resourceManager.serverCursor() != null) {
- getMore();
- }
-
- return nextBatch != null;
- }
-
- @Override
- @Nullable
- public ServerCursor getServerCursor() {
- if (!resourceManager.operable()) {
- throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_ITERATOR);
- }
-
- return resourceManager.serverCursor();
- }
-
- @Override
- public ServerAddress getServerAddress() {
- if (!resourceManager.operable()) {
- throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_ITERATOR);
- }
-
- return serverAddress;
- }
-
- @Override
- public BsonDocument getPostBatchResumeToken() {
- return postBatchResumeToken;
- }
-
- @Override
- public BsonTimestamp getOperationTime() {
- return operationTime;
- }
-
- @Override
- public boolean isFirstBatchEmpty() {
- return firstBatchEmpty;
- }
-
- @Override
- public int getMaxWireVersion() {
- return maxWireVersion;
- }
-
- private void getMore() {
- ServerCursor serverCursor = assertNotNull(resourceManager.serverCursor());
- resourceManager.executeWithConnection(connection -> {
- ServerCursor nextServerCursor;
- try {
- nextServerCursor = initFromCommandResult(connection.command(namespace.getDatabaseName(),
- asGetMoreCommandDocument(serverCursor.getId(), connection.getDescription()),
- NO_OP_FIELD_NAME_VALIDATOR,
- ReadPreference.primary(),
- CommandResultDocumentCodec.create(decoder, "nextBatch"),
- assertNotNull(resourceManager.connectionSource)));
- } catch (MongoCommandException e) {
- throw translateCommandException(e, serverCursor);
- }
- resourceManager.setServerCursor(nextServerCursor);
- if (limitReached()) {
- resourceManager.releaseServerAndClientResources(connection);
- }
- });
- }
-
- private BsonDocument asGetMoreCommandDocument(final long cursorId, final ConnectionDescription connectionDescription) {
- BsonDocument document = new BsonDocument("getMore", new BsonInt64(cursorId))
- .append("collection", new BsonString(namespace.getCollectionName()));
-
- int batchSizeForGetMoreCommand = Math.abs(getNumberToReturn(limit, this.batchSize, count));
- if (batchSizeForGetMoreCommand != 0) {
- document.append("batchSize", new BsonInt32(batchSizeForGetMoreCommand));
- }
- if (maxTimeMS != 0) {
- document.append("maxTimeMS", new BsonInt64(maxTimeMS));
- }
- if (serverIsAtLeastVersionFourDotFour(connectionDescription)) {
- putIfNotNull(document, "comment", comment);
- }
- return document;
- }
-
- @Nullable
- private ServerCursor initFromQueryResult(final QueryResult queryResult) {
- nextBatch = queryResult.getResults().isEmpty() ? null : queryResult.getResults();
- count += queryResult.getResults().size();
- LOGGER.debug(format("Received batch of %d documents with cursorId %d from server %s", queryResult.getResults().size(),
- queryResult.getCursorId(), queryResult.getAddress()));
- return queryResult.getCursor();
- }
-
- @Nullable
- private ServerCursor initFromCommandResult(final BsonDocument getMoreCommandResultDocument) {
- QueryResult queryResult = getMoreCursorDocumentToQueryResult(getMoreCommandResultDocument.getDocument(CURSOR), serverAddress);
- postBatchResumeToken = getPostBatchResumeTokenFromResponse(getMoreCommandResultDocument);
- operationTime = getMoreCommandResultDocument.getTimestamp(OPERATION_TIME, null);
- return initFromQueryResult(queryResult);
- }
-
- private boolean limitReached() {
- return Math.abs(limit) != 0 && count >= Math.abs(limit);
- }
-
- @Nullable
- private BsonDocument getPostBatchResumeTokenFromResponse(final BsonDocument result) {
- BsonDocument cursor = result.getDocument(CURSOR, null);
- if (cursor != null) {
- return cursor.getDocument(POST_BATCH_RESUME_TOKEN, null);
- }
- return null;
- }
-
- /**
- * This class maintains all resources that must be released in {@link QueryBatchCursor#close()}.
- * It also implements a {@linkplain #doClose() deferred close action} such that it is totally ordered with other operations of
- * {@link QueryBatchCursor} (methods {@link #tryStartOperation()}/{@link #endOperation()} must be used properly to enforce the order)
- * despite the method {@link QueryBatchCursor#close()} being called concurrently with those operations.
- * This total order induces the happens-before order.
- *
- * The deferred close action does not violate externally observable idempotence of {@link QueryBatchCursor#close()},
- * because {@link QueryBatchCursor#close()} is allowed to release resources "eventually".
- *
- * Only methods explicitly documented as thread-safe are thread-safe,
- * others are not and rely on the total order mentioned above.
- */
- @ThreadSafe
- private final class ResourceManager {
- private final Lock lock;
- private volatile State state;
- @Nullable
- private volatile ConnectionSource connectionSource;
- @Nullable
- private volatile Connection pinnedConnection;
- @Nullable
- private volatile ServerCursor serverCursor;
- private volatile boolean skipReleasingServerResourcesOnClose;
-
- ResourceManager(@Nullable final ConnectionSource connectionSource,
- @Nullable final Connection connectionToPin, @Nullable final ServerCursor serverCursor) {
- lock = new StampedLock().asWriteLock();
- state = State.IDLE;
- if (serverCursor != null) {
- this.connectionSource = (assertNotNull(connectionSource)).retain();
- if (connectionToPin != null) {
- this.pinnedConnection = connectionToPin.retain();
- connectionToPin.markAsPinned(Connection.PinningMode.CURSOR);
- }
- }
- skipReleasingServerResourcesOnClose = false;
- this.serverCursor = serverCursor;
- }
-
- /**
- * Thread-safe.
- */
- boolean operable() {
- return state.operable();
- }
-
- /**
- * Thread-safe.
- * Executes {@code operation} within the {@link #tryStartOperation()}/{@link #endOperation()} bounds.
- *
- * @throws IllegalStateException If {@linkplain QueryBatchCursor#close() closed}.
- */
- @Nullable
- R execute(final String exceptionMessageIfClosed, final Supplier operation) throws IllegalStateException {
- if (!tryStartOperation()) {
- throw new IllegalStateException(exceptionMessageIfClosed);
- }
- try {
- return operation.get();
- } finally {
- endOperation();
- }
- }
-
- /**
- * Thread-safe.
- * Returns {@code true} iff started an operation.
- * If {@linkplain #operable() closed}, then returns false, otherwise completes abruptly.
- * @throws IllegalStateException Iff another operation is in progress.
- */
- private boolean tryStartOperation() throws IllegalStateException {
- lock.lock();
- try {
- State localState = state;
- if (!localState.operable()) {
- return false;
- } else if (localState == State.IDLE) {
- state = State.OPERATION_IN_PROGRESS;
- return true;
- } else if (localState == State.OPERATION_IN_PROGRESS) {
- throw new IllegalStateException("Another operation is currently in progress, concurrent operations are not supported");
- } else {
- throw fail(state.toString());
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Thread-safe.
- */
- private void endOperation() {
- boolean doClose = false;
- lock.lock();
- try {
- State localState = state;
- if (localState == State.OPERATION_IN_PROGRESS) {
- state = State.IDLE;
- } else if (localState == State.CLOSE_PENDING) {
- state = State.CLOSED;
- doClose = true;
- } else {
- fail(localState.toString());
- }
- } finally {
- lock.unlock();
- }
- if (doClose) {
- doClose();
- }
- }
-
- /**
- * Thread-safe.
- */
- void close() {
- boolean doClose = false;
- lock.lock();
- try {
- State localState = state;
- if (localState == State.OPERATION_IN_PROGRESS) {
- state = State.CLOSE_PENDING;
- } else if (localState != State.CLOSED) {
- state = State.CLOSED;
- doClose = true;
- }
- } finally {
- lock.unlock();
- }
- if (doClose) {
- doClose();
- }
- }
-
- /**
- * This method is never executed concurrently with either itself or other operations
- * demarcated by {@link #tryStartOperation()}/{@link #endOperation()}.
- */
- private void doClose() {
- try {
- if (skipReleasingServerResourcesOnClose) {
- serverCursor = null;
- } else if (serverCursor != null) {
- Connection connection = connection();
- try {
- releaseServerResources(connection);
- } finally {
- connection.release();
- }
- }
- } catch (MongoException e) {
- // ignore exceptions when releasing server resources
- } finally {
- // guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
- serverCursor = null;
- releaseClientResources();
- }
- }
-
- void onCorruptedConnection(final Connection corruptedConnection) {
- assertTrue(state.inProgress());
- // if `pinnedConnection` is corrupted, then we cannot kill `serverCursor` via such a connection
- Connection localPinnedConnection = pinnedConnection;
- if (localPinnedConnection != null) {
- assertTrue(corruptedConnection == localPinnedConnection);
- skipReleasingServerResourcesOnClose = true;
- }
- }
-
- void executeWithConnection(final Consumer action) {
- Connection connection = connection();
- try {
- action.accept(connection);
- } catch (MongoSocketException e) {
- try {
- onCorruptedConnection(connection);
- } catch (Exception suppressed) {
- e.addSuppressed(suppressed);
- }
- throw e;
- } finally {
- connection.release();
- }
- }
-
- private Connection connection() {
- assertTrue(state != State.IDLE);
- if (pinnedConnection == null) {
- return assertNotNull(connectionSource).getConnection();
- } else {
- return assertNotNull(pinnedConnection).retain();
- }
- }
-
- /**
- * Thread-safe.
- */
- @Nullable
- ServerCursor serverCursor() {
- return serverCursor;
- }
-
- void setServerCursor(@Nullable final ServerCursor serverCursor) {
- assertTrue(state.inProgress());
- assertNotNull(this.serverCursor);
- // without `connectionSource` we will not be able to kill `serverCursor` later
- assertNotNull(connectionSource);
- this.serverCursor = serverCursor;
- if (serverCursor == null) {
- releaseClientResources();
- }
- }
-
-
- void releaseServerAndClientResources(final Connection connection) {
- try {
- releaseServerResources(assertNotNull(connection));
- } finally {
- releaseClientResources();
- }
- }
-
- private void releaseServerResources(final Connection connection) {
- try {
- ServerCursor localServerCursor = serverCursor;
- if (localServerCursor != null) {
- killServerCursor(namespace, localServerCursor, assertNotNull(connection));
- }
- } finally {
- serverCursor = null;
- }
- }
-
- private void killServerCursor(final MongoNamespace namespace, final ServerCursor serverCursor, final Connection connection) {
- connection.command(namespace.getDatabaseName(), asKillCursorsCommandDocument(namespace, serverCursor),
- NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(), assertNotNull(connectionSource));
- }
-
- private BsonDocument asKillCursorsCommandDocument(final MongoNamespace namespace, final ServerCursor serverCursor) {
- return new BsonDocument("killCursors", new BsonString(namespace.getCollectionName()))
- .append("cursors", new BsonArray(singletonList(new BsonInt64(serverCursor.getId()))));
- }
-
- private void releaseClientResources() {
- assertNull(serverCursor);
- ConnectionSource localConnectionSource = connectionSource;
- if (localConnectionSource != null) {
- localConnectionSource.release();
- connectionSource = null;
- }
- Connection localPinnedConnection = pinnedConnection;
- if (localPinnedConnection != null) {
- localPinnedConnection.release();
- pinnedConnection = null;
- }
- }
- }
-
- private enum State {
- IDLE(true, false),
- OPERATION_IN_PROGRESS(true, true),
- /**
- * Implies {@link #OPERATION_IN_PROGRESS}.
- */
- CLOSE_PENDING(false, true),
- CLOSED(false, false);
-
- private final boolean operable;
- private final boolean inProgress;
-
- State(final boolean operable, final boolean inProgress) {
- this.operable = operable;
- this.inProgress = inProgress;
- }
-
- boolean operable() {
- return operable;
- }
-
- boolean inProgress() {
- return inProgress;
- }
- }
-}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/QueryHelper.java b/driver-core/src/main/com/mongodb/internal/operation/QueryHelper.java
deleted file mode 100644
index 053e4fc8817..00000000000
--- a/driver-core/src/main/com/mongodb/internal/operation/QueryHelper.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2008-present MongoDB, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.mongodb.internal.operation;
-
-import com.mongodb.MongoCommandException;
-import com.mongodb.MongoCursorNotFoundException;
-import com.mongodb.MongoQueryException;
-import com.mongodb.ServerCursor;
-
-final class QueryHelper {
- static MongoQueryException translateCommandException(final MongoCommandException commandException, final ServerCursor cursor) {
- if (commandException.getErrorCode() == 43) {
- return new MongoCursorNotFoundException(cursor.getId(), commandException.getResponse(), cursor.getAddress());
- } else {
- return new MongoQueryException(commandException.getResponse(), commandException.getServerAddress());
- }
- }
-
- private QueryHelper() {
- }
-}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/SingleBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/SingleBatchCursor.java
new file mode 100644
index 00000000000..8a673ee93d9
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/operation/SingleBatchCursor.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb.internal.operation;
+
+import com.mongodb.ServerAddress;
+import com.mongodb.ServerCursor;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static java.util.Collections.emptyList;
+
+class SingleBatchCursor implements BatchCursor {
+
+ static SingleBatchCursor createEmptySingleBatchCursor(final ServerAddress serverAddress, final int batchSize) {
+ return new SingleBatchCursor<>(emptyList(), batchSize, serverAddress);
+ }
+
+ private final List batch;
+ private final ServerAddress serverAddress;
+ private final int batchSize;
+ private boolean hasNext;
+
+ SingleBatchCursor(final List batch, final int batchSize, final ServerAddress serverAddress) {
+ this.batch = batch;
+ this.serverAddress = serverAddress;
+ this.batchSize = batchSize;
+ this.hasNext = !batch.isEmpty();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return hasNext;
+ }
+
+ @Override
+ public List next() {
+ if (hasNext) {
+ hasNext = false;
+ return batch;
+ }
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public int available() {
+ return hasNext ? 1 : 0;
+ }
+
+ @Override
+ public void setBatchSize(final int batchSize) {
+ // NOOP
+ }
+
+ @Override
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ @Override
+ public List tryNext() {
+ return hasNext ? next() : null;
+ }
+
+ @Override
+ public ServerCursor getServerCursor() {
+ return null;
+ }
+
+ @Override
+ public ServerAddress getServerAddress() {
+ return serverAddress;
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java
index 67d5acf9c37..476a2676b8e 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java
@@ -18,7 +18,6 @@
import com.mongodb.MongoException;
import com.mongodb.ReadPreference;
-import com.mongodb.ServerAddress;
import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.async.function.AsyncCallbackBiFunction;
@@ -32,7 +31,6 @@
import com.mongodb.internal.binding.WriteBinding;
import com.mongodb.internal.connection.Connection;
import com.mongodb.internal.connection.OperationContext;
-import com.mongodb.internal.connection.QueryResult;
import com.mongodb.internal.operation.retry.AttachmentKeys;
import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.lang.Nullable;
@@ -56,7 +54,6 @@
import static com.mongodb.internal.operation.OperationHelper.ResourceSupplierInternalException;
import static com.mongodb.internal.operation.OperationHelper.canRetryRead;
import static com.mongodb.internal.operation.OperationHelper.canRetryWrite;
-import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult;
import static com.mongodb.internal.operation.WriteConcernHelper.throwOnWriteConcernError;
final class SyncOperationHelper {
@@ -303,14 +300,16 @@ static CommandWriteTransformer writeConcernErrorTransformer(
};
}
- static BatchCursor cursorDocumentToBatchCursor(final BsonDocument cursorDocument, final Decoder decoder,
- final BsonValue comment, final ConnectionSource source, final Connection connection, final int batchSize) {
- return new QueryBatchCursor<>(cursorDocumentToQueryResult(cursorDocument, source.getServerDescription().getAddress()),
- 0, batchSize, 0, decoder, comment, source, connection);
+ static CommandReadTransformer> singleBatchCursorTransformer(final String resultsFieldName) {
+ return (result, source, connection) ->
+ new SingleBatchCursor<>(BsonDocumentWrapperHelper.toList(result, resultsFieldName), 0,
+ connection.getDescription().getServerAddress());
}
- static QueryResult getMoreCursorDocumentToQueryResult(final BsonDocument cursorDocument, final ServerAddress serverAddress) {
- return cursorDocumentToQueryResult(cursorDocument, serverAddress, "nextBatch");
+ static BatchCursor cursorDocumentToBatchCursor(final BsonDocument cursorDocument, final Decoder decoder,
+ final BsonValue comment, final ConnectionSource source, final Connection connection, final int batchSize) {
+ return new CommandBatchCursor<>(connection.getDescription().getServerAddress(), cursorDocument, 0, batchSize,
+ 0, decoder, comment, source, connection);
}
private SyncOperationHelper() {
diff --git a/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy b/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy
index ddbb9f29a0d..372fdd4b82d 100644
--- a/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy
+++ b/driver-core/src/test/functional/com/mongodb/OperationFunctionalSpecification.groovy
@@ -202,13 +202,6 @@ class OperationFunctionalSpecification extends Specification {
}
}
- def consumeAsyncResults(cursor) {
- def batch = next(cursor, true)
- while (batch != null) {
- batch = next(cursor, true)
- }
- }
-
void testOperation(Map params) {
params.async = params.async != null ? params.async : false
params.result = params.result != null ? params.result : null
diff --git a/driver-core/src/test/functional/com/mongodb/client/model/OperationTest.java b/driver-core/src/test/functional/com/mongodb/client/model/OperationTest.java
index 9a215c7260c..75ec2c60e2b 100644
--- a/driver-core/src/test/functional/com/mongodb/client/model/OperationTest.java
+++ b/driver-core/src/test/functional/com/mongodb/client/model/OperationTest.java
@@ -18,14 +18,17 @@
import com.mongodb.ClusterFixture;
import com.mongodb.MongoNamespace;
+import com.mongodb.async.FutureResultCallback;
import com.mongodb.client.test.CollectionHelper;
import com.mongodb.internal.connection.ServerHelper;
+import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonDouble;
import org.bson.BsonValue;
import org.bson.Document;
+import org.bson.FieldNameValidator;
import org.bson.codecs.BsonDocumentCodec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.DocumentCodec;
@@ -39,8 +42,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static com.mongodb.ClusterFixture.TIMEOUT;
import static com.mongodb.ClusterFixture.checkReferenceCountReachesTarget;
import static com.mongodb.ClusterFixture.getAsyncBinding;
import static com.mongodb.ClusterFixture.getBinding;
@@ -54,6 +60,7 @@
public abstract class OperationTest {
protected static final DocumentCodec DOCUMENT_DECODER = new DocumentCodec();
+ protected static final FieldNameValidator NO_OP_FIELD_NAME_VALIDATOR = new NoOpFieldNameValidator();
@BeforeEach
public void beforeEach() {
@@ -77,15 +84,15 @@ private CollectionHelper getCollectionHelper(final MongoNamespace
return new CollectionHelper<>(new BsonDocumentCodec(), namespace);
}
- private String getDatabaseName() {
+ protected String getDatabaseName() {
return ClusterFixture.getDefaultDatabaseName();
}
- private String getCollectionName() {
+ protected String getCollectionName() {
return "test";
}
- MongoNamespace getNamespace() {
+ protected MongoNamespace getNamespace() {
return new MongoNamespace(getDatabaseName(), getCollectionName());
}
@@ -97,7 +104,6 @@ public static BsonDocument toBsonDocument(final BsonDocument bsonDocument) {
return getDefaultCodecRegistry().get(BsonDocument.class).decode(bsonDocument.asBsonReader(), DecoderContext.builder().build());
}
-
protected List assertPipeline(final String stageAsString, final Bson stage) {
List pipeline = Collections.singletonList(stage);
return assertPipeline(stageAsString, pipeline);
@@ -159,4 +165,25 @@ protected List