decoder, final BsonValue comment, final AsyncConnectionSource connectionSource,
- @Nullable final AsyncConnection connection, @Nullable final BsonDocument result) {
- isTrueArgument("maxTimeMS >= 0", maxTimeMS >= 0);
- this.maxTimeMS = maxTimeMS;
- this.namespace = firstBatch.getNamespace();
- this.firstBatch = firstBatch;
- this.limit = limit;
- this.batchSize = batchSize;
- this.decoder = decoder;
- this.comment = comment;
- this.cursor = new AtomicReference<>(firstBatch.getCursor());
- this.count.addAndGet(firstBatch.getResults().size());
- if (result != null) {
- this.operationTime = result.getTimestamp(OPERATION_TIME, null);
- this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
- } else {
- this.operationTime = null;
- }
-
- firstBatchEmpty = firstBatch.getResults().isEmpty();
- if (cursor.get() != null) {
- this.connectionSource = notNull("connectionSource", connectionSource).retain();
- assertNotNull(connection);
- if (limitReached()) {
- killCursor(connection);
- } else {
- if (connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER) {
- this.pinnedConnection = connection.retain();
- this.pinnedConnection.markAsPinned(Connection.PinningMode.CURSOR);
- }
- }
- }
- this.maxWireVersion = connection == null ? 0 : connection.getDescription().getMaxWireVersion();
- logQueryResult(firstBatch);
- }
-
- /**
- * {@inheritDoc}
- *
- * From the perspective of the code external to this class, this method is idempotent as required by its specification.
- * However, if this method sets {@link #isClosePending},
- * then it must be called by {@code this} again to release resources.
- * This behavior does not violate externally observable idempotence because this method is allowed to release resources "eventually".
- */
- @Override
- public void close() {
- boolean doClose = withLock(lock, () -> {
- if (isOperationInProgress) {
- isClosePending = true;
- return false;
- } else if (!isClosed) {
- isClosed = true;
- isClosePending = false;
- return true;
- }
- return false;
- });
-
- if (doClose) {
- killCursorOnClose();
- }
- }
-
- @Override
- public void next(final SingleResultCallback> callback) {
- if (isClosed()) {
- callback.onResult(null, new MongoException("next() called after the cursor was closed."));
- } else if (firstBatch != null && (!firstBatch.getResults().isEmpty())) {
- // May be empty for a tailable cursor
- List results = firstBatch.getResults();
- firstBatch = null;
- if (getServerCursor() == null) {
- close();
- }
- callback.onResult(results, null);
- } else {
- ServerCursor localCursor = getServerCursor();
- if (localCursor == null) {
- close();
- callback.onResult(null, null);
- } else {
- boolean doGetMore = withLock(lock, () -> {
- if (isClosed()) {
- callback.onResult(null, new MongoException("next() called after the cursor was closed."));
- return false;
- }
- isOperationInProgress = true;
- return true;
- });
- if (doGetMore) {
- getMore(localCursor, callback);
- }
- }
- }
- }
-
- @Override
- public void setBatchSize(final int batchSize) {
- assertFalse(isClosed());
- this.batchSize = batchSize;
- }
-
- @Override
- public int getBatchSize() {
- assertFalse(isClosed());
- return batchSize;
- }
-
- @Override
- public boolean isClosed() {
- return withLock(lock, () -> isClosed || isClosePending);
- }
-
- @Override
- public BsonDocument getPostBatchResumeToken() {
- return postBatchResumeToken;
- }
-
- @Override
- public BsonTimestamp getOperationTime() {
- return operationTime;
- }
-
- @Override
- public boolean isFirstBatchEmpty() {
- return firstBatchEmpty;
- }
-
- @Override
- public int getMaxWireVersion() {
- return maxWireVersion;
- }
-
- private boolean limitReached() {
- return Math.abs(limit) != 0 && count.get() >= Math.abs(limit);
- }
-
- private void getMore(final ServerCursor cursor, final SingleResultCallback> callback) {
- if (pinnedConnection != null) {
- getMore(pinnedConnection.retain(), cursor, callback);
- } else {
- connectionSource.getConnection((connection, t) -> {
- if (t != null) {
- endOperationInProgress();
- callback.onResult(null, t);
- } else {
- getMore(assertNotNull(connection), cursor, callback);
- }
- });
- }
- }
-
- private void getMore(final AsyncConnection connection, final ServerCursor cursor, final SingleResultCallback> callback) {
- connection.commandAsync(namespace.getDatabaseName(), asGetMoreCommandDocument(cursor.getId(), connection.getDescription()),
- NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), CommandResultDocumentCodec.create(decoder, "nextBatch"),
- connectionSource, new CommandResultSingleResultCallback(connection, cursor, callback));
- }
-
- private BsonDocument asGetMoreCommandDocument(final long cursorId, final ConnectionDescription connectionDescription) {
- BsonDocument document = new BsonDocument("getMore", new BsonInt64(cursorId))
- .append("collection", new BsonString(namespace.getCollectionName()));
-
- int batchSizeForGetMoreCommand = Math.abs(getNumberToReturn(limit, this.batchSize, count.get()));
- if (batchSizeForGetMoreCommand != 0) {
- document.append("batchSize", new BsonInt32(batchSizeForGetMoreCommand));
- }
- if (maxTimeMS != 0) {
- document.append("maxTimeMS", new BsonInt64(maxTimeMS));
- }
- if (serverIsAtLeastVersionFourDotFour(connectionDescription)) {
- putIfNotNull(document, "comment", comment);
- }
- return document;
- }
-
- private void killCursorOnClose() {
- ServerCursor localCursor = getServerCursor();
- if (localCursor != null) {
- if (pinnedConnection != null) {
- killCursorAsynchronouslyAndReleaseConnectionAndSource(pinnedConnection, localCursor);
- } else {
- connectionSource.getConnection((connection, t) -> {
- if (t != null) {
- connectionSource.release();
- } else {
- killCursorAsynchronouslyAndReleaseConnectionAndSource(assertNotNull(connection), localCursor);
- }
- });
- }
- } else if (pinnedConnection != null) {
- pinnedConnection.release();
- }
- }
-
- private void killCursor(final AsyncConnection connection) {
- ServerCursor localCursor = cursor.getAndSet(null);
- if (localCursor != null) {
- killCursorAsynchronouslyAndReleaseConnectionAndSource(connection.retain(), localCursor);
- } else {
- connectionSource.release();
- }
- }
-
- private void killCursorAsynchronouslyAndReleaseConnectionAndSource(final AsyncConnection connection, final ServerCursor localCursor) {
- connection.commandAsync(namespace.getDatabaseName(), asKillCursorsCommandDocument(localCursor), NO_OP_FIELD_NAME_VALIDATOR,
- ReadPreference.primary(), new BsonDocumentCodec(), connectionSource, (result, t) -> {
- connection.release();
- connectionSource.release();
- });
- }
-
- private BsonDocument asKillCursorsCommandDocument(final ServerCursor localCursor) {
- return new BsonDocument("killCursors", new BsonString(namespace.getCollectionName()))
- .append("cursors", new BsonArray(singletonList(new BsonInt64(localCursor.getId()))));
- }
-
- private void endOperationInProgress() {
- boolean closePending = withLock(lock, () -> {
- isOperationInProgress = false;
- return this.isClosePending;
- });
- if (closePending) {
- close();
- }
- }
-
-
- private void handleGetMoreQueryResult(final AsyncConnection connection, final SingleResultCallback> callback,
- final QueryResult result) {
- logQueryResult(result);
- cursor.set(result.getCursor());
- if (isClosePending) {
- try {
- connection.release();
- if (result.getCursor() == null) {
- connectionSource.release();
- }
- endOperationInProgress();
- } finally {
- callback.onResult(null, null);
- }
- } else if (result.getResults().isEmpty() && result.getCursor() != null) {
- getMore(connection, assertNotNull(result.getCursor()), callback);
- } else {
- count.addAndGet(result.getResults().size());
- if (limitReached()) {
- killCursor(connection);
- connection.release();
- } else {
- connection.release();
- if (result.getCursor() == null) {
- connectionSource.release();
- }
- }
- endOperationInProgress();
-
- if (result.getResults().isEmpty()) {
- callback.onResult(null, null);
- } else {
- callback.onResult(result.getResults(), null);
- }
- }
- }
-
- private void logQueryResult(final QueryResult result) {
- LOGGER.debug(format("Received batch of %d documents with cursorId %d from server %s", result.getResults().size(),
- result.getCursorId(), result.getAddress()));
- }
-
- private class CommandResultSingleResultCallback implements SingleResultCallback {
- private final AsyncConnection connection;
- private final ServerCursor cursor;
- private final SingleResultCallback> callback;
-
- CommandResultSingleResultCallback(final AsyncConnection connection, final ServerCursor cursor,
- final SingleResultCallback> callback) {
- this.connection = connection;
- this.cursor = cursor;
- this.callback = errorHandlingCallback(callback, LOGGER);
- }
-
- @Override
- public void onResult(@Nullable final BsonDocument result, @Nullable final Throwable t) {
- if (t != null) {
- Throwable translatedException = t instanceof MongoCommandException
- ? translateCommandException((MongoCommandException) t, cursor)
- : t;
- connection.release();
- endOperationInProgress();
- callback.onResult(null, translatedException);
- } else {
- assertNotNull(result);
- QueryResult queryResult = getMoreCursorDocumentToQueryResult(result.getDocument(CURSOR),
- connection.getDescription().getServerAddress());
- postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
- handleGetMoreQueryResult(connection, callback, queryResult);
- }
- }
- }
-
- @Nullable
- ServerCursor getServerCursor() {
- return cursor.get();
- }
-
- @Nullable
- private BsonDocument getPostBatchResumeTokenFromResponse(final BsonDocument result) {
- BsonDocument cursor = result.getDocument(CURSOR, null);
- if (cursor != null) {
- return cursor.getDocument(POST_BATCH_RESUME_TOKEN, null);
- }
- return null;
- }
-}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncSingleBatchQueryCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncSingleBatchCursor.java
similarity index 63%
rename from driver-core/src/main/com/mongodb/internal/operation/AsyncSingleBatchQueryCursor.java
rename to driver-core/src/main/com/mongodb/internal/operation/AsyncSingleBatchCursor.java
index f29cda04dae..57b20ff1711 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncSingleBatchQueryCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncSingleBatchCursor.java
@@ -19,19 +19,26 @@
import com.mongodb.MongoException;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
-import com.mongodb.internal.connection.QueryResult;
import java.util.List;
-import static com.mongodb.assertions.Assertions.isTrue;
+import static java.util.Collections.emptyList;
-class AsyncSingleBatchQueryCursor implements AsyncBatchCursor {
- private volatile QueryResult firstBatch;
- private volatile boolean closed;
+class AsyncSingleBatchCursor implements AsyncBatchCursor {
- AsyncSingleBatchQueryCursor(final QueryResult firstBatch) {
- this.firstBatch = firstBatch;
- isTrue("Empty Cursor", firstBatch.getCursor() == null);
+ static AsyncSingleBatchCursor createEmptyAsyncSingleBatchCursor(final int batchSize) {
+ return new AsyncSingleBatchCursor<>(emptyList(), batchSize);
+ }
+
+ private final List batch;
+ private final int batchSize;
+
+ private volatile boolean hasNext = true;
+ private volatile boolean closed = false;
+
+ AsyncSingleBatchCursor(final List batch, final int batchSize) {
+ this.batch = batch;
+ this.batchSize = batchSize;
}
@Override
@@ -43,13 +50,12 @@ public void close() {
public void next(final SingleResultCallback> callback) {
if (closed) {
callback.onResult(null, new MongoException("next() called after the cursor was closed."));
- } else if (firstBatch != null && !firstBatch.getResults().isEmpty()) {
- List results = firstBatch.getResults();
- firstBatch = null;
- callback.onResult(results, null);
+ } else if (hasNext && !batch.isEmpty()) {
+ hasNext = false;
+ callback.onResult(batch, null);
} else {
closed = true;
- callback.onResult(null, null);
+ callback.onResult(emptyList(), null);
}
}
@@ -60,7 +66,7 @@ public void setBatchSize(final int batchSize) {
@Override
public int getBatchSize() {
- return 0;
+ return batchSize;
}
@Override
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java
index acf70090457..a3c134b720c 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java
@@ -33,6 +33,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
+import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.internal.operation.ChangeStreamBatchCursorHelper.isResumableError;
import static com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource;
@@ -41,12 +42,12 @@ final class ChangeStreamBatchCursor implements AggregateResponseBatchCursor changeStreamOperation;
private final int maxWireVersion;
- private AggregateResponseBatchCursor wrapped;
+ private CommandBatchCursor wrapped;
private BsonDocument resumeToken;
private final AtomicBoolean closed;
ChangeStreamBatchCursor(final ChangeStreamOperation changeStreamOperation,
- final AggregateResponseBatchCursor wrapped,
+ final CommandBatchCursor wrapped,
final ReadBinding binding,
@Nullable final BsonDocument resumeToken,
final int maxWireVersion) {
@@ -58,29 +59,29 @@ final class ChangeStreamBatchCursor implements AggregateResponseBatchCursor getWrapped() {
+ CommandBatchCursor getWrapped() {
return wrapped;
}
@Override
public boolean hasNext() {
- return resumeableOperation(queryBatchCursor -> {
+ return resumeableOperation(commandBatchCursor -> {
try {
- return queryBatchCursor.hasNext();
+ return commandBatchCursor.hasNext();
} finally {
- cachePostBatchResumeToken(queryBatchCursor);
+ cachePostBatchResumeToken(commandBatchCursor);
}
});
}
@Override
public List next() {
- return resumeableOperation(queryBatchCursor -> {
+ return resumeableOperation(commandBatchCursor -> {
try {
- return convertAndProduceLastId(queryBatchCursor.next(), changeStreamOperation.getDecoder(),
+ return convertAndProduceLastId(commandBatchCursor.next(), changeStreamOperation.getDecoder(),
lastId -> resumeToken = lastId);
} finally {
- cachePostBatchResumeToken(queryBatchCursor);
+ cachePostBatchResumeToken(commandBatchCursor);
}
});
}
@@ -92,12 +93,13 @@ public int available() {
@Override
public List tryNext() {
- return resumeableOperation(queryBatchCursor -> {
+ return resumeableOperation(commandBatchCursor -> {
try {
- return convertAndProduceLastId(queryBatchCursor.tryNext(), changeStreamOperation.getDecoder(),
- lastId -> resumeToken = lastId);
+ List tryNext = commandBatchCursor.tryNext();
+ return tryNext == null ? null
+ : convertAndProduceLastId(tryNext, changeStreamOperation.getDecoder(), lastId -> resumeToken = lastId);
} finally {
- cachePostBatchResumeToken(queryBatchCursor);
+ cachePostBatchResumeToken(commandBatchCursor);
}
});
}
@@ -155,9 +157,9 @@ public int getMaxWireVersion() {
return maxWireVersion;
}
- private void cachePostBatchResumeToken(final AggregateResponseBatchCursor queryBatchCursor) {
- if (queryBatchCursor.getPostBatchResumeToken() != null) {
- resumeToken = queryBatchCursor.getPostBatchResumeToken();
+ private void cachePostBatchResumeToken(final AggregateResponseBatchCursor commandBatchCursor) {
+ if (commandBatchCursor.getPostBatchResumeToken() != null) {
+ resumeToken = commandBatchCursor.getPostBatchResumeToken();
}
}
@@ -165,19 +167,17 @@ private void cachePostBatchResumeToken(final AggregateResponseBatchCursor List convertAndProduceLastId(@Nullable final List rawDocuments,
+ static List convertAndProduceLastId(final List rawDocuments,
final Decoder decoder,
final Consumer lastIdConsumer) {
- List results = null;
- if (rawDocuments != null) {
- results = new ArrayList<>();
- for (RawBsonDocument rawDocument : rawDocuments) {
- if (!rawDocument.containsKey("_id")) {
- throw new MongoChangeStreamException("Cannot provide resume functionality when the resume token is missing.");
- }
- results.add(rawDocument.decode(decoder));
+ List results = new ArrayList<>();
+ for (RawBsonDocument rawDocument : assertNotNull(rawDocuments)) {
+ if (!rawDocument.containsKey("_id")) {
+ throw new MongoChangeStreamException("Cannot provide resume functionality when the resume token is missing.");
}
+ results.add(rawDocument.decode(decoder));
+ }
+ if (!rawDocuments.isEmpty()) {
lastIdConsumer.accept(rawDocuments.get(rawDocuments.size() - 1).getDocument("_id"));
}
return results;
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java
index a2ba029eb56..8df093a6e9a 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java
@@ -20,7 +20,6 @@
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
-import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
@@ -42,9 +41,8 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.notNull;
-import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncReadConnectionSource;
-import static com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource;
/**
* An operation that executes an {@code $changeStream} aggregation.
@@ -179,16 +177,12 @@ public ChangeStreamOperation showExpandedEvents(final boolean showExpandedEve
return this;
}
-
@Override
public BatchCursor execute(final ReadBinding binding) {
- return withReadConnectionSource(binding, source -> {
- AggregateResponseBatchCursor cursor =
- (AggregateResponseBatchCursor) wrapped.execute(binding);
+ CommandBatchCursor cursor = (CommandBatchCursor) wrapped.execute(binding);
return new ChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding,
setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(),
cursor.getMaxWireVersion(), cursor.isFirstBatchEmpty()), cursor.getMaxWireVersion());
- });
}
@Override
@@ -197,25 +191,17 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
if (t != null) {
callback.onResult(null, t);
} else {
- AsyncAggregateResponseBatchCursor cursor =
- (AsyncAggregateResponseBatchCursor) result;
- withAsyncReadConnectionSource(binding, (source, t1) -> {
- if (t1 != null) {
- callback.onResult(null, t1);
- } else {
- callback.onResult(new AsyncChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding,
- setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(),
- cursor.getMaxWireVersion(), cursor.isFirstBatchEmpty()), cursor.getMaxWireVersion()), null);
- }
- source.release(); // TODO: can this be null?
- });
+ AsyncCommandBatchCursor cursor = (AsyncCommandBatchCursor) assertNotNull(result);
+ callback.onResult(new AsyncChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding,
+ setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(),
+ cursor.getMaxWireVersion(), cursor.isFirstBatchEmpty()), cursor.getMaxWireVersion()), null);
}
});
}
@Nullable
- private BsonDocument setChangeStreamOptions(@Nullable final BsonDocument postBatchResumeToken, final BsonTimestamp operationTime,
- final int maxWireVersion, final boolean firstBatchEmpty) {
+ private BsonDocument setChangeStreamOptions(@Nullable final BsonDocument postBatchResumeToken,
+ @Nullable final BsonTimestamp operationTime, final int maxWireVersion, final boolean firstBatchEmpty) {
BsonDocument resumeToken = null;
if (startAfter != null) {
resumeToken = startAfter;
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java
new file mode 100644
index 00000000000..f71cce0527b
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.internal.operation;
+
+import com.mongodb.MongoCommandException;
+import com.mongodb.MongoException;
+import com.mongodb.MongoNamespace;
+import com.mongodb.MongoSocketException;
+import com.mongodb.ReadPreference;
+import com.mongodb.ServerAddress;
+import com.mongodb.ServerCursor;
+import com.mongodb.annotations.ThreadSafe;
+import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.connection.ServerType;
+import com.mongodb.internal.VisibleForTesting;
+import com.mongodb.internal.binding.ConnectionSource;
+import com.mongodb.internal.connection.Connection;
+import com.mongodb.lang.Nullable;
+import org.bson.BsonDocument;
+import org.bson.BsonTimestamp;
+import org.bson.BsonValue;
+import org.bson.codecs.BsonDocumentCodec;
+import org.bson.codecs.Decoder;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static com.mongodb.assertions.Assertions.assertNotNull;
+import static com.mongodb.assertions.Assertions.assertTrue;
+import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_ITERATOR;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.NEXT_BATCH;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.NO_OP_FIELD_NAME_VALIDATOR;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.getKillCursorsCommand;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.getMoreCommandDocument;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.logCommandCursorResult;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.translateCommandException;
+
+class CommandBatchCursor implements AggregateResponseBatchCursor {
+
+ private final MongoNamespace namespace;
+ private final long maxTimeMS;
+ private final Decoder decoder;
+ @Nullable
+ private final BsonValue comment;
+ private final int maxWireVersion;
+ private final boolean firstBatchEmpty;
+ private final ResourceManager resourceManager;
+
+ private int batchSize;
+ private CommandCursorResult commandCursorResult;
+ @Nullable
+ private List nextBatch;
+
+ CommandBatchCursor(
+ final BsonDocument commandCursorDocument,
+ final int batchSize, final long maxTimeMS,
+ final Decoder decoder,
+ @Nullable final BsonValue comment,
+ final ConnectionSource connectionSource,
+ final Connection connection) {
+ ConnectionDescription connectionDescription = connection.getDescription();
+ this.commandCursorResult = toCommandCursorResult(connectionDescription.getServerAddress(), FIRST_BATCH, commandCursorDocument);
+ this.namespace = commandCursorResult.getNamespace();
+ this.batchSize = batchSize;
+ this.maxTimeMS = maxTimeMS;
+ this.decoder = decoder;
+ this.comment = comment;
+ this.maxWireVersion = connectionDescription.getMaxWireVersion();
+ this.firstBatchEmpty = commandCursorResult.getResults().isEmpty();
+
+ Connection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER ? connection : null;
+ resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
+ }
+
+ @Override
+ public boolean hasNext() {
+ return assertNotNull(resourceManager.execute(MESSAGE_IF_CLOSED_AS_CURSOR, this::doHasNext));
+ }
+
+ private boolean doHasNext() {
+ if (nextBatch != null) {
+ return true;
+ }
+
+ while (resourceManager.getServerCursor() != null) {
+ getMore();
+ if (!resourceManager.operable()) {
+ throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_CURSOR);
+ }
+ if (nextBatch != null) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public List next() {
+ return assertNotNull(resourceManager.execute(MESSAGE_IF_CLOSED_AS_ITERATOR, this::doNext));
+ }
+
+ @Override
+ public int available() {
+ return !resourceManager.operable() || nextBatch == null ? 0 : nextBatch.size();
+ }
+
+ @Nullable
+ private List doNext() {
+ if (!doHasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ List retVal = nextBatch;
+ nextBatch = null;
+ return retVal;
+ }
+
+ @VisibleForTesting(otherwise = PRIVATE)
+ boolean isClosed() {
+ return !resourceManager.operable();
+ }
+
+ @Override
+ public void setBatchSize(final int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Not implemented yet!");
+ }
+
+ @Override
+ public void close() {
+ resourceManager.close();
+ }
+
+ @Nullable
+ @Override
+ public List tryNext() {
+ return resourceManager.execute(MESSAGE_IF_CLOSED_AS_CURSOR, () -> {
+ if (!tryHasNext()) {
+ return null;
+ }
+ return doNext();
+ });
+ }
+
+ private boolean tryHasNext() {
+ if (nextBatch != null) {
+ return true;
+ }
+
+ if (resourceManager.getServerCursor() != null) {
+ getMore();
+ }
+
+ return nextBatch != null;
+ }
+
+ @Override
+ @Nullable
+ public ServerCursor getServerCursor() {
+ if (!resourceManager.operable()) {
+ throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_ITERATOR);
+ }
+ return resourceManager.getServerCursor();
+ }
+
+ @Override
+ public ServerAddress getServerAddress() {
+ if (!resourceManager.operable()) {
+ throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_ITERATOR);
+ }
+
+ return commandCursorResult.getServerAddress();
+ }
+
+ @Override
+ public BsonDocument getPostBatchResumeToken() {
+ return commandCursorResult.getPostBatchResumeToken();
+ }
+
+ @Override
+ public BsonTimestamp getOperationTime() {
+ return commandCursorResult.getOperationTime();
+ }
+
+ @Override
+ public boolean isFirstBatchEmpty() {
+ return firstBatchEmpty;
+ }
+
+ @Override
+ public int getMaxWireVersion() {
+ return maxWireVersion;
+ }
+
+ private void getMore() {
+ ServerCursor serverCursor = assertNotNull(resourceManager.getServerCursor());
+ resourceManager.executeWithConnection(connection -> {
+ ServerCursor nextServerCursor;
+ try {
+ this.commandCursorResult = toCommandCursorResult(connection.getDescription().getServerAddress(), NEXT_BATCH,
+ assertNotNull(
+ connection.command(namespace.getDatabaseName(),
+ getMoreCommandDocument(serverCursor.getId(), connection.getDescription(), namespace, batchSize,
+ maxTimeMS, comment),
+ NO_OP_FIELD_NAME_VALIDATOR,
+ ReadPreference.primary(),
+ CommandResultDocumentCodec.create(decoder, NEXT_BATCH),
+ assertNotNull(resourceManager.getConnectionSource()))));
+ nextServerCursor = commandCursorResult.getServerCursor();
+ } catch (MongoCommandException e) {
+ throw translateCommandException(e, serverCursor);
+ }
+ resourceManager.setServerCursor(nextServerCursor);
+ });
+ }
+
+ private CommandCursorResult toCommandCursorResult(final ServerAddress serverAddress, final String fieldNameContainingBatch,
+ final BsonDocument commandCursorDocument) {
+ CommandCursorResult commandCursorResult = new CommandCursorResult<>(serverAddress, fieldNameContainingBatch,
+ commandCursorDocument);
+ logCommandCursorResult(commandCursorResult);
+ this.nextBatch = commandCursorResult.getResults().isEmpty() ? null : commandCursorResult.getResults();
+ return commandCursorResult;
+ }
+
+ @ThreadSafe
+ private static final class ResourceManager extends CursorResourceManager {
+
+ ResourceManager(
+ final MongoNamespace namespace,
+ final ConnectionSource connectionSource,
+ @Nullable final Connection connectionToPin,
+ @Nullable final ServerCursor serverCursor) {
+ super(namespace, connectionSource, connectionToPin, serverCursor);
+ }
+
+ /**
+ * Thread-safe.
+ * Executes {@code operation} within the {@link #tryStartOperation()}/{@link #endOperation()} bounds.
+ *
+ * @throws IllegalStateException If {@linkplain CommandBatchCursor#close() closed}.
+ */
+ @Nullable
+ R execute(final String exceptionMessageIfClosed, final Supplier operation) throws IllegalStateException {
+ if (!tryStartOperation()) {
+ throw new IllegalStateException(exceptionMessageIfClosed);
+ }
+ try {
+ return operation.get();
+ } finally {
+ endOperation();
+ }
+ }
+
+ @Override
+ void markAsPinned(final Connection connectionToPin, final Connection.PinningMode pinningMode) {
+ connectionToPin.markAsPinned(pinningMode);
+ }
+
+ @Override
+ void doClose() {
+ if (isSkipReleasingServerResourcesOnClose()) {
+ unsetServerCursor();
+ }
+ try {
+ if (getServerCursor() != null) {
+ Connection connection = getConnection();
+ try {
+ releaseServerResources(connection);
+ } finally {
+ connection.release();
+ }
+ }
+ } catch (MongoException e) {
+ // ignore exceptions when releasing server resources
+ } finally {
+ // guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
+ unsetServerCursor();
+ releaseClientResources();
+ }
+ }
+
+ void executeWithConnection(final Consumer action) {
+ Connection connection = getConnection();
+ try {
+ action.accept(connection);
+ } catch (MongoSocketException e) {
+ onCorruptedConnection(connection, e);
+ throw e;
+ } finally {
+ connection.release();
+ }
+ }
+
+ private Connection getConnection() {
+ assertTrue(getState() != State.IDLE);
+ Connection pinnedConnection = getPinnedConnection();
+ if (pinnedConnection == null) {
+ return assertNotNull(getConnectionSource()).getConnection();
+ } else {
+ return pinnedConnection.retain();
+ }
+ }
+
+ private void releaseServerResources(final Connection connection) {
+ try {
+ ServerCursor localServerCursor = getServerCursor();
+ if (localServerCursor != null) {
+ killServerCursor(getNamespace(), localServerCursor, connection);
+ }
+ } finally {
+ unsetServerCursor();
+ }
+ }
+
+ private void killServerCursor(final MongoNamespace namespace, final ServerCursor localServerCursor,
+ final Connection localConnection) {
+ localConnection.command(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor),
+ NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(),
+ assertNotNull(getConnectionSource()));
+ }
+ }
+}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursorHelper.java b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursorHelper.java
new file mode 100644
index 00000000000..eaf03c68ec3
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursorHelper.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.internal.operation;
+
+import com.mongodb.MongoCommandException;
+import com.mongodb.MongoCursorNotFoundException;
+import com.mongodb.MongoNamespace;
+import com.mongodb.MongoQueryException;
+import com.mongodb.ServerCursor;
+import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.validator.NoOpFieldNameValidator;
+import com.mongodb.lang.Nullable;
+import org.bson.BsonArray;
+import org.bson.BsonDocument;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
+import org.bson.BsonValue;
+import org.bson.FieldNameValidator;
+
+import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
+import static com.mongodb.internal.operation.OperationHelper.LOGGER;
+import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionFourDotFour;
+import static java.lang.String.format;
+import static java.util.Collections.singletonList;
+
+final class CommandBatchCursorHelper {
+
+ static final String FIRST_BATCH = "firstBatch";
+ static final String NEXT_BATCH = "nextBatch";
+ static final FieldNameValidator NO_OP_FIELD_NAME_VALIDATOR = new NoOpFieldNameValidator();
+ static final String MESSAGE_IF_CLOSED_AS_CURSOR = "Cursor has been closed";
+ static final String MESSAGE_IF_CLOSED_AS_ITERATOR = "Iterator has been closed";
+
+ static final String MESSAGE_IF_CONCURRENT_OPERATION = "Another operation is currently in progress, concurrent operations are not "
+ + "supported";
+
+ static BsonDocument getMoreCommandDocument(
+ final long cursorId, final ConnectionDescription connectionDescription, final MongoNamespace namespace, final int batchSize,
+ final long maxTimeMS, @Nullable final BsonValue comment) {
+ BsonDocument document = new BsonDocument("getMore", new BsonInt64(cursorId))
+ .append("collection", new BsonString(namespace.getCollectionName()));
+
+ if (batchSize != 0) {
+ document.append("batchSize", new BsonInt32(batchSize));
+ }
+ if (maxTimeMS != 0) {
+ document.append("maxTimeMS", new BsonInt64(maxTimeMS));
+ }
+ if (serverIsAtLeastVersionFourDotFour(connectionDescription)) {
+ putIfNotNull(document, "comment", comment);
+ }
+ return document;
+ }
+
+ static CommandCursorResult logCommandCursorResult(final CommandCursorResult commandCursorResult) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(format("Received batch of %d documents with cursorId %d from server %s", commandCursorResult.getResults().size(),
+ commandCursorResult.getCursorId(), commandCursorResult.getServerAddress()));
+ }
+ return commandCursorResult;
+ }
+
+ static BsonDocument getKillCursorsCommand(final MongoNamespace namespace, final ServerCursor serverCursor) {
+ return new BsonDocument("killCursors", new BsonString(namespace.getCollectionName()))
+ .append("cursors", new BsonArray(singletonList(new BsonInt64(serverCursor.getId()))));
+ }
+
+
+ static MongoQueryException translateCommandException(final MongoCommandException commandException, final ServerCursor cursor) {
+ if (commandException.getErrorCode() == 43) {
+ return new MongoCursorNotFoundException(cursor.getId(), commandException.getResponse(), cursor.getAddress());
+ } else {
+ return new MongoQueryException(commandException.getResponse(), commandException.getServerAddress());
+ }
+ }
+
+ private CommandBatchCursorHelper() {
+ }
+}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/QueryResult.java b/driver-core/src/main/com/mongodb/internal/operation/CommandCursorResult.java
similarity index 52%
rename from driver-core/src/main/com/mongodb/internal/connection/QueryResult.java
rename to driver-core/src/main/com/mongodb/internal/operation/CommandCursorResult.java
index 52970ba7b94..7bfbfb33cbe 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/QueryResult.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CommandCursorResult.java
@@ -14,40 +14,50 @@
* limitations under the License.
*/
-package com.mongodb.internal.connection;
+package com.mongodb.internal.operation;
import com.mongodb.MongoNamespace;
import com.mongodb.ServerAddress;
import com.mongodb.ServerCursor;
import com.mongodb.lang.Nullable;
+import org.bson.BsonDocument;
+import org.bson.BsonTimestamp;
import java.util.List;
+import static com.mongodb.assertions.Assertions.isTrue;
+
/**
- * A batch of query results.
+ * The command cursor result
*
* This class is not part of the public API and may be removed or changed at any time
*/
-public class QueryResult {
- private final MongoNamespace namespace;
+public class CommandCursorResult {
+
+ private static final String CURSOR = "cursor";
+ private static final String POST_BATCH_RESUME_TOKEN = "postBatchResumeToken";
+ private static final String OPERATION_TIME = "operationTime";
+ private final ServerAddress serverAddress;
private final List results;
+ private final MongoNamespace namespace;
private final long cursorId;
- private final ServerAddress serverAddress;
+ @Nullable
+ private final BsonTimestamp operationTime;
+ @Nullable
+ private final BsonDocument postBatchResumeToken;
- /**
- * Construct an instance.
- *
- * @param namespace the namespace
- * @param results the query results
- * @param cursorId the cursor id
- * @param serverAddress the server address
- */
- public QueryResult(@Nullable final MongoNamespace namespace, final List results, final long cursorId,
- final ServerAddress serverAddress) {
- this.namespace = namespace;
- this.results = results;
- this.cursorId = cursorId;
+ public CommandCursorResult(
+ final ServerAddress serverAddress,
+ final String fieldNameContainingBatch,
+ final BsonDocument commandCursorDocument) {
+ isTrue("Contains cursor", commandCursorDocument.isDocument(CURSOR));
this.serverAddress = serverAddress;
+ BsonDocument cursorDocument = commandCursorDocument.getDocument(CURSOR);
+ this.results = BsonDocumentWrapperHelper.toList(cursorDocument, fieldNameContainingBatch);
+ this.namespace = new MongoNamespace(cursorDocument.getString("ns").getValue());
+ this.cursorId = cursorDocument.getNumber("id").longValue();
+ this.operationTime = cursorDocument.getTimestamp(OPERATION_TIME, null);
+ this.postBatchResumeToken = cursorDocument.getDocument(POST_BATCH_RESUME_TOKEN, null);
}
/**
@@ -55,7 +65,6 @@ public QueryResult(@Nullable final MongoNamespace namespace, final List resul
*
* @return the namespace
*/
- @Nullable
public MongoNamespace getNamespace() {
return namespace;
}
@@ -66,7 +75,7 @@ public MongoNamespace getNamespace() {
* @return the cursor, which may be null if it's been exhausted
*/
@Nullable
- public ServerCursor getCursor() {
+ public ServerCursor getServerCursor() {
return cursorId == 0 ? null : new ServerCursor(cursorId, serverAddress);
}
@@ -84,11 +93,21 @@ public List getResults() {
*
* @return the server address
*/
- public ServerAddress getAddress() {
+ public ServerAddress getServerAddress() {
return serverAddress;
}
public long getCursorId() {
return cursorId;
}
+
+ @Nullable
+ public BsonDocument getPostBatchResumeToken() {
+ return postBatchResumeToken;
+ }
+
+ @Nullable
+ public BsonTimestamp getOperationTime() {
+ return operationTime;
+ }
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CursorHelper.java b/driver-core/src/main/com/mongodb/internal/operation/CursorHelper.java
index aea2d2df213..26511c86885 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/CursorHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CursorHelper.java
@@ -22,34 +22,6 @@
final class CursorHelper {
- /**
- * Gets the limit of the number of documents in the OP_REPLY response to the get more request. A value of zero tells the server to
- * use the default size. A negative value tells the server to return no more than that number and immediately close the cursor.
- * Otherwise, the server will return no more than that number and return the same cursorId to allow the rest of the documents to be
- * fetched, if it turns out there are more documents.
- *
- * The value returned by this method is based on the limit, the batch size, both of which can be positive, negative, or zero, and the
- * number of documents fetched so far.
- *
- * @return the value for numberToReturn in the OP_GET_MORE wire protocol message.
- * @mongodb.driver.manual ../meta-driver/latest/legacy/mongodb-wire-protocol/#op-get-more OP_GET_MORE
- * @param limit the user-specified limit on the number of results returned
- * @param batchSize the user-specified batch size
- * @param numReturnedSoFar the number of results returned so far
- */
- static int getNumberToReturn(final int limit, final int batchSize, final int numReturnedSoFar) {
- int numberToReturn;
- if (Math.abs(limit) != 0) {
- numberToReturn = Math.abs(limit) - numReturnedSoFar;
- if (batchSize != 0 && numberToReturn > Math.abs(batchSize)) {
- numberToReturn = batchSize;
- }
- } else {
- numberToReturn = batchSize;
- }
- return numberToReturn;
- }
-
static BsonDocument getCursorDocumentFromBatchSize(@Nullable final Integer batchSize) {
return batchSize == null ? new BsonDocument() : new BsonDocument("batchSize", new BsonInt32(batchSize));
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java b/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java
new file mode 100644
index 00000000000..cb2e5c58e84
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.internal.operation;
+
+import com.mongodb.MongoNamespace;
+import com.mongodb.MongoSocketException;
+import com.mongodb.ServerCursor;
+import com.mongodb.annotations.ThreadSafe;
+import com.mongodb.internal.binding.ReferenceCounted;
+import com.mongodb.internal.connection.Connection;
+import com.mongodb.lang.Nullable;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static com.mongodb.assertions.Assertions.assertNotNull;
+import static com.mongodb.assertions.Assertions.assertNull;
+import static com.mongodb.assertions.Assertions.assertTrue;
+import static com.mongodb.assertions.Assertions.fail;
+import static com.mongodb.internal.Locks.withLock;
+import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CONCURRENT_OPERATION;
+
+/**
+ * This is the resource manager for {@link CommandBatchCursor} or {@link AsyncCommandBatchCursor} implementations.
+ *
+ * This class maintains all resources that must be released in {@link CommandBatchCursor#close()} /
+ * {@link AsyncCommandBatchCursor#close()}. The abstract {@linkplain #doClose() deferred close action} is such that it is totally
+ * ordered with other operations of {@link CommandBatchCursor} / {@link AsyncCommandBatchCursor} (methods {@link #tryStartOperation()}/
+ * {@link #endOperation()} must be used properly to enforce the order) despite the method {@link CommandBatchCursor#close()} /
+ * {@link AsyncCommandBatchCursor#close()} being called concurrently with those operations.
+ *
+ * This total order induces the happens-before order.
+ *
+ * The deferred close action does not violate externally observable idempotence of {@link CommandBatchCursor#close()} /
+ * {@link AsyncCommandBatchCursor#close()}, because the close method is allowed to release resources "eventually".
+ *
+ * Only methods explicitly documented as thread-safe are thread-safe,
+ * others are not and rely on the total order mentioned above.
+ */
+@ThreadSafe
+abstract class CursorResourceManager {
+ private final Lock lock;
+ private final MongoNamespace namespace;
+ private volatile State state;
+ @Nullable
+ private volatile CS connectionSource;
+ @Nullable
+ private volatile C pinnedConnection;
+ @Nullable
+ private volatile ServerCursor serverCursor;
+ private volatile boolean skipReleasingServerResourcesOnClose;
+
+ CursorResourceManager(
+ final MongoNamespace namespace,
+ final CS connectionSource,
+ @Nullable final C connectionToPin,
+ @Nullable final ServerCursor serverCursor) {
+ this.lock = new ReentrantLock();
+ this.namespace = namespace;
+ this.state = State.IDLE;
+ if (serverCursor != null) {
+ connectionSource.retain();
+ this.connectionSource = connectionSource;
+ if (connectionToPin != null) {
+ connectionToPin.retain();
+ markAsPinned(connectionToPin, Connection.PinningMode.CURSOR);
+ this.pinnedConnection = connectionToPin;
+ }
+ }
+ this.skipReleasingServerResourcesOnClose = false;
+ this.serverCursor = serverCursor;
+ }
+
+ /**
+ * Thread-safe.
+ */
+ MongoNamespace getNamespace() {
+ return namespace;
+ }
+
+ /**
+ * Thread-safe.
+ */
+ State getState() {
+ return state;
+ }
+
+ /**
+ * Thread-safe.
+ */
+ @Nullable
+ CS getConnectionSource() {
+ return connectionSource;
+ }
+
+ /**
+ * Thread-safe.
+ */
+ @Nullable
+ C getPinnedConnection() {
+ return pinnedConnection;
+ }
+
+ /**
+ * Thread-safe.
+ */
+ boolean isSkipReleasingServerResourcesOnClose() {
+ return skipReleasingServerResourcesOnClose;
+ }
+
+ @SuppressWarnings("SameParameterValue")
+ abstract void markAsPinned(C connectionToPin, Connection.PinningMode pinningMode);
+
+ /**
+ * Thread-safe.
+ */
+ boolean operable() {
+ return state.operable();
+ }
+
+ /**
+ * Thread-safe.
+ * Returns {@code true} iff started an operation.
+ * If {@linkplain #operable() closed}, then returns false, otherwise completes abruptly.
+ *
+ * @throws IllegalStateException Iff another operation is in progress.
+ */
+ boolean tryStartOperation() throws IllegalStateException {
+ return withLock(lock, () -> {
+ State localState = state;
+ if (!localState.operable()) {
+ return false;
+ } else if (localState == State.IDLE) {
+ state = State.OPERATION_IN_PROGRESS;
+ return true;
+ } else if (localState == State.OPERATION_IN_PROGRESS) {
+ throw new IllegalStateException(MESSAGE_IF_CONCURRENT_OPERATION);
+ } else {
+ throw fail(state.toString());
+ }
+ });
+ }
+
+ /**
+ * Thread-safe.
+ */
+ void endOperation() {
+ boolean doClose = withLock(lock, () -> {
+ State localState = state;
+ if (localState == State.OPERATION_IN_PROGRESS) {
+ state = State.IDLE;
+ } else if (localState == State.CLOSE_PENDING) {
+ state = State.CLOSED;
+ return true;
+ } else if (localState != State.CLOSED) {
+ throw fail(localState.toString());
+ }
+ return false;
+ });
+ if (doClose) {
+ doClose();
+ }
+ }
+
+ /**
+ * Thread-safe.
+ */
+ void close() {
+ boolean doClose = withLock(lock, () -> {
+ State localState = state;
+ if (localState == State.OPERATION_IN_PROGRESS) {
+ state = State.CLOSE_PENDING;
+ } else if (localState != State.CLOSED) {
+ state = State.CLOSED;
+ return true;
+ }
+ return false;
+ });
+ if (doClose) {
+ doClose();
+ }
+ }
+
+ /**
+ * This method is never executed concurrently with either itself or other operations
+ * demarcated by {@link #tryStartOperation()}/{@link #endOperation()}.
+ */
+ abstract void doClose();
+
+ void onCorruptedConnection(@Nullable final C corruptedConnection, final MongoSocketException e) {
+ // if `pinnedConnection` is corrupted, then we cannot kill `serverCursor` via such a connection
+ C localPinnedConnection = pinnedConnection;
+ if (localPinnedConnection != null) {
+ if (corruptedConnection != localPinnedConnection) {
+ e.addSuppressed(new AssertionError("Corrupted connection does not equal the pinned connection."));
+ }
+ skipReleasingServerResourcesOnClose = true;
+ }
+ }
+
+ /**
+ * Thread-safe.
+ */
+ @Nullable
+ ServerCursor getServerCursor() {
+ return serverCursor;
+ }
+
+ void setServerCursor(@Nullable final ServerCursor serverCursor) {
+ assertTrue(state.inProgress());
+ assertNotNull(this.serverCursor);
+ // without `connectionSource` we will not be able to kill `serverCursor` later
+ assertNotNull(connectionSource);
+ this.serverCursor = serverCursor;
+ if (serverCursor == null) {
+ releaseClientResources();
+ }
+ }
+
+ void unsetServerCursor() {
+ this.serverCursor = null;
+ }
+
+ void releaseClientResources() {
+ assertNull(serverCursor);
+ CS localConnectionSource = connectionSource;
+ if (localConnectionSource != null) {
+ localConnectionSource.release();
+ connectionSource = null;
+ }
+ C localPinnedConnection = pinnedConnection;
+ if (localPinnedConnection != null) {
+ localPinnedConnection.release();
+ pinnedConnection = null;
+ }
+ }
+
+ enum State {
+ IDLE(true, false),
+ OPERATION_IN_PROGRESS(true, true),
+ /**
+ * Implies {@link #OPERATION_IN_PROGRESS}.
+ */
+ CLOSE_PENDING(false, true),
+ CLOSED(false, false);
+
+ private final boolean operable;
+ private final boolean inProgress;
+
+ State(final boolean operable, final boolean inProgress) {
+ this.operable = operable;
+ this.inProgress = inProgress;
+ }
+
+ boolean operable() {
+ return operable;
+ }
+
+ boolean inProgress() {
+ return inProgress;
+ }
+ }
+}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java
index a64c4cbfadd..d9fa0cfd72e 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java
@@ -23,7 +23,6 @@
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
-import com.mongodb.internal.connection.QueryResult;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
@@ -36,15 +35,15 @@
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
-import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
+import static com.mongodb.internal.operation.AsyncOperationHelper.asyncSingleBatchCursorTransformer;
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand;
-import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead;
+import static com.mongodb.internal.operation.SyncOperationHelper.singleBatchCursorTransformer;
/**
* Finds the distinct values for a specified field across a single collection.
@@ -116,42 +115,22 @@ public DistinctOperation comment(final BsonValue comment) {
return this;
}
-
@Override
public BatchCursor execute(final ReadBinding binding) {
return executeRetryableRead(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()),
- createCommandDecoder(), transformer(), retryReads);
+ createCommandDecoder(), singleBatchCursorTransformer(VALUES), retryReads);
}
@Override
public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) {
executeRetryableReadAsync(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()),
- createCommandDecoder(), asyncTransformer(), retryReads, errorHandlingCallback(callback, LOGGER));
+ createCommandDecoder(), asyncSingleBatchCursorTransformer(VALUES), retryReads, errorHandlingCallback(callback, LOGGER));
}
private Codec createCommandDecoder() {
return CommandResultDocumentCodec.create(decoder, VALUES);
}
- private QueryResult createQueryResult(final BsonDocument result, final ConnectionDescription description) {
- return new QueryResult<>(namespace, BsonDocumentWrapperHelper.toList(result, VALUES), 0L,
- description.getServerAddress());
- }
-
- private CommandReadTransformer> transformer() {
- return (result, source, connection) -> {
- QueryResult queryResult = createQueryResult(result, connection.getDescription());
- return new QueryBatchCursor<>(queryResult, 0, 0, decoder, comment, source);
- };
- }
-
- private CommandReadTransformerAsync> asyncTransformer() {
- return (result, source, connection) -> {
- QueryResult queryResult = createQueryResult(result, connection.getDescription());
- return new AsyncSingleBatchQueryCursor<>(queryResult);
- };
- }
-
private CommandCreator getCommandCreator(final SessionContext sessionContext) {
return (serverDescription, connectionDescription) -> getCommand(sessionContext, connectionDescription);
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java
index dcb94211fcf..72d20835aa1 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java
@@ -29,7 +29,6 @@
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.internal.connection.NoOpSessionContext;
-import com.mongodb.internal.connection.QueryResult;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonBoolean;
@@ -57,7 +56,6 @@
import static com.mongodb.internal.operation.ExplainHelper.asExplainCommand;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationHelper.canRetryRead;
-import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult;
import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand;
import static com.mongodb.internal.operation.ServerVersionHelper.MIN_WIRE_VERSION;
import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
@@ -471,13 +469,9 @@ private boolean isAwaitData() {
return cursorType == CursorType.TailableAwait;
}
- private CommandReadTransformer> transformer() {
- return (result, source, connection) -> {
- QueryResult queryResult = cursorDocumentToQueryResult(result.getDocument("cursor"),
- connection.getDescription().getServerAddress());
- return new QueryBatchCursor<>(queryResult, limit, batchSize, getMaxTimeForCursor(), decoder, comment, source, connection,
- result);
- };
+ private CommandReadTransformer> transformer() {
+ return (result, source, connection) ->
+ new CommandBatchCursor<>(result, batchSize, getMaxTimeForCursor(), decoder, comment, source, connection);
}
private long getMaxTimeForCursor() {
@@ -485,11 +479,7 @@ private long getMaxTimeForCursor() {
}
private CommandReadTransformerAsync> asyncTransformer() {
- return (result, source, connection) -> {
- QueryResult queryResult = cursorDocumentToQueryResult(result.getDocument("cursor"),
- connection.getDescription().getServerAddress());
- return new AsyncQueryBatchCursor<>(queryResult, limit, batchSize, getMaxTimeForCursor(), decoder, comment, source,
- connection, result);
- };
+ return (result, source, connection) ->
+ new AsyncCommandBatchCursor<>(result, batchSize, getMaxTimeForCursor(), decoder, comment, source, connection);
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java
index fa2a5dcd995..f8ef462b5d2 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java
@@ -22,7 +22,6 @@
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.async.function.AsyncCallbackSupplier;
import com.mongodb.internal.async.function.RetryState;
-import com.mongodb.internal.binding.AsyncConnectionSource;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.lang.Nullable;
@@ -40,11 +39,11 @@
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
-import static com.mongodb.internal.operation.AsyncOperationHelper.createEmptyAsyncBatchCursor;
import static com.mongodb.internal.operation.AsyncOperationHelper.createReadCommandAndExecuteAsync;
import static com.mongodb.internal.operation.AsyncOperationHelper.cursorDocumentToAsyncBatchCursor;
import static com.mongodb.internal.operation.AsyncOperationHelper.decorateReadWithRetriesAsync;
import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncSourceAndConnection;
+import static com.mongodb.internal.operation.AsyncSingleBatchCursor.createEmptyAsyncSingleBatchCursor;
import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState;
import static com.mongodb.internal.operation.CommandOperationHelper.isNamespaceError;
import static com.mongodb.internal.operation.CommandOperationHelper.rethrowIfNotNamespaceError;
@@ -52,7 +51,7 @@
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationHelper.canRetryRead;
-import static com.mongodb.internal.operation.OperationHelper.createEmptyBatchCursor;
+import static com.mongodb.internal.operation.SingleBatchCursor.createEmptySingleBatchCursor;
import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
import static com.mongodb.internal.operation.SyncOperationHelper.createReadCommandAndExecute;
import static com.mongodb.internal.operation.SyncOperationHelper.cursorDocumentToBatchCursor;
@@ -148,8 +147,8 @@ public BatchCursor execute(final ReadBinding binding) {
return createReadCommandAndExecute(retryState, binding, source, databaseName, getCommandCreator(),
createCommandDecoder(), commandTransformer(), connection);
} catch (MongoCommandException e) {
- return rethrowIfNotNamespaceError(e, createEmptyBatchCursor(createNamespace(), decoder,
- source.getServerDescription().getAddress(), batchSize));
+ return rethrowIfNotNamespaceError(e,
+ createEmptySingleBatchCursor(source.getServerDescription().getAddress(), batchSize));
}
})
);
@@ -173,7 +172,8 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
if (t != null && !isNamespaceError(t)) {
releasingCallback.onResult(null, t);
} else {
- releasingCallback.onResult(result != null ? result : emptyAsyncCursor(source), null);
+ releasingCallback.onResult(result != null
+ ? result : createEmptyAsyncSingleBatchCursor(getBatchSize()), null);
}
});
})
@@ -181,20 +181,16 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
asyncRead.get(errorHandlingCallback(callback, LOGGER));
}
- private AsyncBatchCursor emptyAsyncCursor(final AsyncConnectionSource source) {
- return createEmptyAsyncBatchCursor(createNamespace(), source.getServerDescription().getAddress());
- }
-
private MongoNamespace createNamespace() {
return new MongoNamespace(databaseName, "$cmd.listCollections");
}
private CommandReadTransformerAsync> asyncTransformer() {
- return (result, source, connection) -> cursorDocumentToAsyncBatchCursor(result.getDocument("cursor"), decoder, comment, source, connection, batchSize);
+ return (result, source, connection) -> cursorDocumentToAsyncBatchCursor(result, decoder, comment, source, connection, batchSize);
}
private CommandReadTransformer> commandTransformer() {
- return (result, source, connection) -> cursorDocumentToBatchCursor(result.getDocument("cursor"), decoder, comment, source, connection, batchSize);
+ return (result, source, connection) -> cursorDocumentToBatchCursor(result, decoder, comment, source, connection, batchSize);
}
private CommandOperationHelper.CommandCreator getCommandCreator() {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java
index bacf64601c9..fec689c938f 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java
@@ -16,12 +16,11 @@
package com.mongodb.internal.operation;
-import com.mongodb.connection.ConnectionDescription;
+
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
-import com.mongodb.internal.connection.QueryResult;
import com.mongodb.lang.Nullable;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
@@ -34,13 +33,13 @@
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
-import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
+import static com.mongodb.internal.operation.AsyncOperationHelper.asyncSingleBatchCursorTransformer;
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
-import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead;
+import static com.mongodb.internal.operation.SyncOperationHelper.singleBatchCursorTransformer;
/**
@@ -49,6 +48,9 @@
* This class is not part of the public API and may be removed or changed at any time
*/
public class ListDatabasesOperation implements AsyncReadOperation>, ReadOperation> {
+
+ private static final String DATABASES = "databases";
+
private final Decoder decoder;
private boolean retryReads;
@@ -122,28 +124,16 @@ public ListDatabasesOperation comment(@Nullable final BsonValue comment) {
@Override
public BatchCursor execute(final ReadBinding binding) {
return executeRetryableRead(binding, "admin", getCommandCreator(),
- CommandResultDocumentCodec.create(decoder, "databases"), transformer(), retryReads);
+ CommandResultDocumentCodec.create(decoder, DATABASES),
+ singleBatchCursorTransformer(DATABASES), retryReads);
}
@Override
public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) {
executeRetryableReadAsync(binding, "admin", getCommandCreator(),
- CommandResultDocumentCodec.create(decoder, "databases"), asyncTransformer(),
- retryReads, errorHandlingCallback(callback, LOGGER));
- }
-
- private CommandReadTransformer> transformer() {
- return (result, source, connection) -> new QueryBatchCursor<>(createQueryResult(result, connection.getDescription()), 0, 0, decoder, comment, source);
- }
-
- private CommandReadTransformerAsync> asyncTransformer() {
- return (result, source, connection) -> new AsyncQueryBatchCursor<>(createQueryResult(result, connection.getDescription()), 0, 0, 0, decoder,
- comment, source, connection, result);
- }
-
- private QueryResult createQueryResult(final BsonDocument result, final ConnectionDescription description) {
- return new QueryResult<>(null, BsonDocumentWrapperHelper.toList(result, "databases"), 0,
- description.getServerAddress());
+ CommandResultDocumentCodec.create(decoder, DATABASES),
+ asyncSingleBatchCursorTransformer(DATABASES), retryReads,
+ errorHandlingCallback(callback, LOGGER));
}
private CommandCreator getCommandCreator() {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java
index 62ecdc953bd..e4d0138121d 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java
@@ -22,7 +22,6 @@
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.async.function.AsyncCallbackSupplier;
import com.mongodb.internal.async.function.RetryState;
-import com.mongodb.internal.binding.AsyncConnectionSource;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.lang.Nullable;
@@ -39,11 +38,11 @@
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
-import static com.mongodb.internal.operation.AsyncOperationHelper.createEmptyAsyncBatchCursor;
import static com.mongodb.internal.operation.AsyncOperationHelper.createReadCommandAndExecuteAsync;
import static com.mongodb.internal.operation.AsyncOperationHelper.cursorDocumentToAsyncBatchCursor;
import static com.mongodb.internal.operation.AsyncOperationHelper.decorateReadWithRetriesAsync;
import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncSourceAndConnection;
+import static com.mongodb.internal.operation.AsyncSingleBatchCursor.createEmptyAsyncSingleBatchCursor;
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState;
import static com.mongodb.internal.operation.CommandOperationHelper.isNamespaceError;
@@ -52,7 +51,7 @@
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationHelper.canRetryRead;
-import static com.mongodb.internal.operation.OperationHelper.createEmptyBatchCursor;
+import static com.mongodb.internal.operation.SingleBatchCursor.createEmptySingleBatchCursor;
import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
import static com.mongodb.internal.operation.SyncOperationHelper.createReadCommandAndExecute;
import static com.mongodb.internal.operation.SyncOperationHelper.cursorDocumentToBatchCursor;
@@ -127,8 +126,8 @@ public BatchCursor execute(final ReadBinding binding) {
return createReadCommandAndExecute(retryState, binding, source, namespace.getDatabaseName(), getCommandCreator(),
createCommandDecoder(), transformer(), connection);
} catch (MongoCommandException e) {
- return rethrowIfNotNamespaceError(e, createEmptyBatchCursor(namespace, decoder,
- source.getServerDescription().getAddress(), batchSize));
+ return rethrowIfNotNamespaceError(e,
+ createEmptySingleBatchCursor(source.getServerDescription().getAddress(), batchSize));
}
})
);
@@ -152,7 +151,8 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
if (t != null && !isNamespaceError(t)) {
releasingCallback.onResult(null, t);
} else {
- releasingCallback.onResult(result != null ? result : emptyAsyncCursor(source), null);
+ releasingCallback.onResult(result != null
+ ? result : createEmptyAsyncSingleBatchCursor(getBatchSize()), null);
}
});
})
@@ -160,9 +160,6 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
asyncRead.get(errorHandlingCallback(callback, LOGGER));
}
- private AsyncBatchCursor emptyAsyncCursor(final AsyncConnectionSource source) {
- return createEmptyAsyncBatchCursor(namespace, source.getServerDescription().getAddress());
- }
private CommandCreator getCommandCreator() {
return (serverDescription, connectionDescription) -> getCommand();
@@ -179,11 +176,11 @@ private BsonDocument getCommand() {
}
private CommandReadTransformer> transformer() {
- return (result, source, connection) -> cursorDocumentToBatchCursor(result.getDocument("cursor"), decoder, comment, source, connection, batchSize);
+ return (result, source, connection) -> cursorDocumentToBatchCursor(result, decoder, comment, source, connection, batchSize);
}
private CommandReadTransformerAsync> asyncTransformer() {
- return (result, source, connection) -> cursorDocumentToAsyncBatchCursor(result.getDocument("cursor"), decoder, comment, source, connection, batchSize);
+ return (result, source, connection) -> cursorDocumentToAsyncBatchCursor(result, decoder, comment, source, connection, batchSize);
}
private Codec createCommandDecoder() {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java
index 4c471a16bd4..74313059099 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java
@@ -34,9 +34,9 @@
import java.util.Collections;
import java.util.concurrent.TimeUnit;
-import static com.mongodb.internal.operation.AsyncOperationHelper.createEmptyAsyncBatchCursor;
+import static com.mongodb.internal.operation.AsyncSingleBatchCursor.createEmptyAsyncSingleBatchCursor;
import static com.mongodb.internal.operation.CommandOperationHelper.isNamespaceError;
-import static com.mongodb.internal.operation.OperationHelper.createEmptyBatchCursor;
+import static com.mongodb.internal.operation.SingleBatchCursor.createEmptySingleBatchCursor;
/**
* An operation that lists Alas Search indexes with the help of {@value #STAGE_LIST_SEARCH_INDEXES} pipeline stage.
@@ -90,7 +90,7 @@ public BatchCursor execute(final ReadBinding binding) {
if (!isNamespaceError(exception)) {
throw exception;
} else {
- return createEmptyBatchCursor(namespace, decoder, exception.getServerAddress(), cursorBatchSize);
+ return createEmptySingleBatchCursor(exception.getServerAddress(), cursorBatchSize);
}
}
}
@@ -101,9 +101,7 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
if (exception != null && !isNamespaceError(exception)) {
callback.onResult(null, exception);
} else if (exception != null) {
- MongoCommandException commandException = (MongoCommandException) exception;
- AsyncBatchCursor emptyAsyncBatchCursor = createEmptyAsyncBatchCursor(namespace, commandException.getServerAddress());
- callback.onResult(emptyAsyncBatchCursor, null);
+ callback.onResult(createEmptyAsyncSingleBatchCursor(batchSize == null ? 0 : batchSize), null);
} else {
callback.onResult(cursor, null);
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/MapReduceInlineResultsAsyncCursor.java b/driver-core/src/main/com/mongodb/internal/operation/MapReduceInlineResultsAsyncCursor.java
index 1da84755100..ebf331fe47b 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/MapReduceInlineResultsAsyncCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/MapReduceInlineResultsAsyncCursor.java
@@ -16,18 +16,21 @@
package com.mongodb.internal.operation;
-import com.mongodb.internal.connection.QueryResult;
+import com.mongodb.internal.async.SingleResultCallback;
+
+import java.util.List;
/**
* Cursor representation of the results of an inline map-reduce operation. This allows users to iterate over the results that were returned
* from the operation, and also provides access to the statistics returned in the results.
*/
-class MapReduceInlineResultsAsyncCursor extends AsyncSingleBatchQueryCursor implements MapReduceAsyncBatchCursor {
+class MapReduceInlineResultsAsyncCursor implements MapReduceAsyncBatchCursor {
+ private final AsyncSingleBatchCursor delegate;
private final MapReduceStatistics statistics;
- MapReduceInlineResultsAsyncCursor(final QueryResult queryResult, final MapReduceStatistics statistics) {
- super(queryResult);
+ MapReduceInlineResultsAsyncCursor(final AsyncSingleBatchCursor delegate, final MapReduceStatistics statistics) {
+ this.delegate = delegate;
this.statistics = statistics;
}
@@ -35,4 +38,29 @@ class MapReduceInlineResultsAsyncCursor extends AsyncSingleBatchQueryCursor> callback) {
+ delegate.next(callback);
+ }
+
+ @Override
+ public void setBatchSize(final int batchSize) {
+ delegate.setBatchSize(batchSize);
+ }
+
+ @Override
+ public int getBatchSize() {
+ return delegate.getBatchSize();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return delegate.isClosed();
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/MapReduceInlineResultsCursor.java b/driver-core/src/main/com/mongodb/internal/operation/MapReduceInlineResultsCursor.java
index caa2f7fd355..564eac4a8f0 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/MapReduceInlineResultsCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/MapReduceInlineResultsCursor.java
@@ -16,20 +16,21 @@
package com.mongodb.internal.operation;
-import com.mongodb.internal.binding.ConnectionSource;
-import com.mongodb.internal.connection.QueryResult;
-import org.bson.codecs.Decoder;
+import com.mongodb.ServerAddress;
+import com.mongodb.ServerCursor;
+
+import java.util.List;
/**
* Cursor representation of the results of an inline map-reduce operation. This allows users to iterate over the results that were returned
* from the operation, and also provides access to the statistics returned in the results.
*/
-class MapReduceInlineResultsCursor extends QueryBatchCursor implements MapReduceBatchCursor {
+class MapReduceInlineResultsCursor implements MapReduceBatchCursor {
+ private final BatchCursor delegate;
private final MapReduceStatistics statistics;
- MapReduceInlineResultsCursor(final QueryResult queryResult, final Decoder decoder, final ConnectionSource connectionSource,
- final MapReduceStatistics statistics) {
- super(queryResult, 0, 0, decoder, null, connectionSource);
+ MapReduceInlineResultsCursor(final BatchCursor delegate, final MapReduceStatistics statistics) {
+ this.delegate = delegate;
this.statistics = statistics;
}
@@ -37,4 +38,49 @@ class MapReduceInlineResultsCursor extends QueryBatchCursor implements Map
public MapReduceStatistics getStatistics() {
return statistics;
}
+
+ @Override
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public List next() {
+ return delegate.next();
+ }
+
+ @Override
+ public int available() {
+ return delegate.available();
+ }
+
+ @Override
+ public void setBatchSize(final int batchSize) {
+ delegate.setBatchSize(batchSize);
+ }
+
+ @Override
+ public int getBatchSize() {
+ return delegate.getBatchSize();
+ }
+
+ @Override
+ public List tryNext() {
+ return delegate.tryNext();
+ }
+
+ @Override
+ public ServerCursor getServerCursor() {
+ return delegate.getServerCursor();
+ }
+
+ @Override
+ public ServerAddress getServerAddress() {
+ return delegate.getServerAddress();
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java
index 131591dd6e2..7205a09dad6 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java
@@ -19,12 +19,10 @@
import com.mongodb.ExplainVerbosity;
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Collation;
-import com.mongodb.connection.ConnectionDescription;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.internal.connection.NoOpSessionContext;
-import com.mongodb.internal.connection.QueryResult;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
@@ -215,12 +213,16 @@ private CommandReadOperation createExplainableOperation(final Expl
}
private CommandReadTransformer> transformer() {
- return (result, source, connection) -> new MapReduceInlineResultsCursor<>(createQueryResult(result, connection.getDescription()), decoder, source,
- MapReduceHelper.createStatistics(result));
+ return (result, source, connection) ->
+ new MapReduceInlineResultsCursor<>(
+ new SingleBatchCursor<>(BsonDocumentWrapperHelper.toList(result, "results"), 0,
+ connection.getDescription().getServerAddress()),
+ MapReduceHelper.createStatistics(result));
}
private CommandReadTransformerAsync> asyncTransformer() {
- return (result, source, connection) -> new MapReduceInlineResultsAsyncCursor<>(createQueryResult(result, connection.getDescription()),
+ return (result, source, connection) -> new MapReduceInlineResultsAsyncCursor<>(
+ new AsyncSingleBatchCursor<>(BsonDocumentWrapperHelper.toList(result, "results"), 0),
MapReduceHelper.createStatistics(result));
}
@@ -248,9 +250,4 @@ private BsonDocument getCommand(final SessionContext sessionContext, final int m
}
return commandDocument;
}
-
- private QueryResult createQueryResult(final BsonDocument result, final ConnectionDescription description) {
- return new QueryResult<>(namespace, BsonDocumentWrapperHelper.toList(result, "results"), 0,
- description.getServerAddress());
- }
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/OperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/OperationHelper.java
index 387bb2f5da6..bfa1adbd97e 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/OperationHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/OperationHelper.java
@@ -17,8 +17,6 @@
package com.mongodb.internal.operation;
import com.mongodb.MongoClientException;
-import com.mongodb.MongoNamespace;
-import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Collation;
import com.mongodb.connection.ConnectionDescription;
@@ -30,18 +28,14 @@
import com.mongodb.internal.bulk.DeleteRequest;
import com.mongodb.internal.bulk.UpdateRequest;
import com.mongodb.internal.bulk.WriteRequest;
-import com.mongodb.internal.connection.QueryResult;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.NonNull;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
-import org.bson.BsonInt64;
-import org.bson.codecs.Decoder;
import org.bson.conversions.Bson;
-import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -200,26 +194,6 @@ static boolean canRetryRead(final ServerDescription serverDescription, final Ses
return true;
}
- static QueryBatchCursor createEmptyBatchCursor(final MongoNamespace namespace, final Decoder decoder,
- final ServerAddress serverAddress, final int batchSize) {
- return new QueryBatchCursor<>(new QueryResult<>(namespace, Collections.emptyList(), 0L,
- serverAddress),
- 0, batchSize, decoder);
- }
-
- static QueryResult cursorDocumentToQueryResult(final BsonDocument cursorDocument, final ServerAddress serverAddress) {
- return cursorDocumentToQueryResult(cursorDocument, serverAddress, "firstBatch");
- }
-
- static QueryResult cursorDocumentToQueryResult(final BsonDocument cursorDocument, final ServerAddress serverAddress,
- final String fieldNameContainingBatch) {
- long cursorId = ((BsonInt64) cursorDocument.get("id")).getValue();
- MongoNamespace queryResultNamespace = new MongoNamespace(cursorDocument.getString("ns").getValue());
- return new QueryResult<>(queryResultNamespace, BsonDocumentWrapperHelper.toList(cursorDocument, fieldNameContainingBatch),
- cursorId, serverAddress);
- }
-
-
/**
* This internal exception is used to
*
diff --git a/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java
deleted file mode 100644
index 587237fcaf8..00000000000
--- a/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/*
- * Copyright 2008-present MongoDB, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.mongodb.internal.operation;
-
-import com.mongodb.MongoCommandException;
-import com.mongodb.MongoException;
-import com.mongodb.MongoNamespace;
-import com.mongodb.MongoSocketException;
-import com.mongodb.ReadPreference;
-import com.mongodb.ServerAddress;
-import com.mongodb.ServerCursor;
-import com.mongodb.annotations.ThreadSafe;
-import com.mongodb.connection.ConnectionDescription;
-import com.mongodb.connection.ServerType;
-import com.mongodb.internal.binding.ConnectionSource;
-import com.mongodb.internal.connection.Connection;
-import com.mongodb.internal.connection.QueryResult;
-import com.mongodb.internal.diagnostics.logging.Logger;
-import com.mongodb.internal.diagnostics.logging.Loggers;
-import com.mongodb.internal.validator.NoOpFieldNameValidator;
-import com.mongodb.lang.Nullable;
-import org.bson.BsonArray;
-import org.bson.BsonDocument;
-import org.bson.BsonInt32;
-import org.bson.BsonInt64;
-import org.bson.BsonString;
-import org.bson.BsonTimestamp;
-import org.bson.BsonValue;
-import org.bson.FieldNameValidator;
-import org.bson.codecs.BsonDocumentCodec;
-import org.bson.codecs.Decoder;
-
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.StampedLock;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-import static com.mongodb.assertions.Assertions.assertNotNull;
-import static com.mongodb.assertions.Assertions.assertNull;
-import static com.mongodb.assertions.Assertions.assertTrue;
-import static com.mongodb.assertions.Assertions.fail;
-import static com.mongodb.assertions.Assertions.isTrueArgument;
-import static com.mongodb.assertions.Assertions.notNull;
-import static com.mongodb.internal.Locks.withLock;
-import static com.mongodb.internal.operation.CursorHelper.getNumberToReturn;
-import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
-import static com.mongodb.internal.operation.SyncOperationHelper.getMoreCursorDocumentToQueryResult;
-import static com.mongodb.internal.operation.QueryHelper.translateCommandException;
-import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionFourDotFour;
-import static java.lang.String.format;
-import static java.util.Collections.singletonList;
-
-class QueryBatchCursor implements AggregateResponseBatchCursor {
- private static final Logger LOGGER = Loggers.getLogger("operation");
- private static final FieldNameValidator NO_OP_FIELD_NAME_VALIDATOR = new NoOpFieldNameValidator();
- private static final String CURSOR = "cursor";
- private static final String POST_BATCH_RESUME_TOKEN = "postBatchResumeToken";
- private static final String OPERATION_TIME = "operationTime";
- private static final String MESSAGE_IF_CLOSED_AS_CURSOR = "Cursor has been closed";
- private static final String MESSAGE_IF_CLOSED_AS_ITERATOR = "Iterator has been closed";
-
- private final MongoNamespace namespace;
- private final ServerAddress serverAddress;
- private final int limit;
- private final Decoder decoder;
- private final long maxTimeMS;
- private int batchSize;
- private final BsonValue comment;
- private List nextBatch;
- private int count;
- private BsonDocument postBatchResumeToken;
- private BsonTimestamp operationTime;
- private final boolean firstBatchEmpty;
- private int maxWireVersion = 0;
- private final ResourceManager resourceManager;
-
- QueryBatchCursor(final QueryResult firstQueryResult, final int limit, final int batchSize, final Decoder decoder) {
- this(firstQueryResult, limit, batchSize, decoder, null, null);
- }
-
- QueryBatchCursor(final QueryResult firstQueryResult, final int limit, final int batchSize, final Decoder decoder,
- @Nullable final BsonValue comment, @Nullable final ConnectionSource connectionSource) {
- this(firstQueryResult, limit, batchSize, 0, decoder, comment, connectionSource, null, null);
- }
-
- QueryBatchCursor(final QueryResult firstQueryResult, final int limit, final int batchSize, final long maxTimeMS,
- final Decoder decoder, @Nullable final BsonValue comment, @Nullable final ConnectionSource connectionSource,
- @Nullable final Connection connection) {
- this(firstQueryResult, limit, batchSize, maxTimeMS, decoder, comment, connectionSource, connection, null);
- }
-
- QueryBatchCursor(final QueryResult firstQueryResult, final int limit, final int batchSize, final long maxTimeMS,
- final Decoder decoder, @Nullable final BsonValue comment, @Nullable final ConnectionSource connectionSource,
- @Nullable final Connection connection, @Nullable final BsonDocument result) {
- isTrueArgument("maxTimeMS >= 0", maxTimeMS >= 0);
- this.maxTimeMS = maxTimeMS;
- this.namespace = firstQueryResult.getNamespace();
- this.serverAddress = firstQueryResult.getAddress();
- this.limit = limit;
- this.comment = comment;
- this.batchSize = batchSize;
- this.decoder = notNull("decoder", decoder);
- if (result != null) {
- this.operationTime = result.getTimestamp(OPERATION_TIME, null);
- this.postBatchResumeToken = getPostBatchResumeTokenFromResponse(result);
- }
- ServerCursor serverCursor = initFromQueryResult(firstQueryResult);
- if (serverCursor != null) {
- notNull("connectionSource", connectionSource);
- }
- firstBatchEmpty = firstQueryResult.getResults().isEmpty();
- Connection connectionToPin = null;
- boolean releaseServerAndResources = false;
- if (connection != null) {
- this.maxWireVersion = connection.getDescription().getMaxWireVersion();
- if (limitReached()) {
- releaseServerAndResources = true;
- } else {
- assertNotNull(connectionSource);
- if (connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER) {
- connectionToPin = connection;
- }
- }
- }
- resourceManager = new ResourceManager(connectionSource, connectionToPin, serverCursor);
- if (releaseServerAndResources) {
- resourceManager.releaseServerAndClientResources(assertNotNull(connection));
- }
- }
-
- @Override
- public boolean hasNext() {
- return assertNotNull(resourceManager.execute(MESSAGE_IF_CLOSED_AS_CURSOR, this::doHasNext));
- }
-
- private boolean doHasNext() {
- if (nextBatch != null) {
- return true;
- }
-
- if (limitReached()) {
- return false;
- }
-
- while (resourceManager.serverCursor() != null) {
- getMore();
- if (!resourceManager.operable()) {
- throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_CURSOR);
- }
- if (nextBatch != null) {
- return true;
- }
- }
-
- return false;
- }
-
- @Override
- public List next() {
- return assertNotNull(resourceManager.execute(MESSAGE_IF_CLOSED_AS_ITERATOR, this::doNext));
- }
-
- @Override
- public int available() {
- return !resourceManager.operable() || nextBatch == null ? 0 : nextBatch.size();
- }
-
- private List doNext() {
- if (!doHasNext()) {
- throw new NoSuchElementException();
- }
-
- List retVal = nextBatch;
- nextBatch = null;
- return retVal;
- }
-
- @Override
- public void setBatchSize(final int batchSize) {
- this.batchSize = batchSize;
- }
-
- @Override
- public int getBatchSize() {
- return batchSize;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Not implemented yet!");
- }
-
- @Override
- public void close() {
- resourceManager.close();
- }
-
- @Nullable
- @Override
- public List tryNext() {
- return resourceManager.execute(MESSAGE_IF_CLOSED_AS_CURSOR, () -> {
- if (!tryHasNext()) {
- return null;
- }
- return doNext();
- });
- }
-
- private boolean tryHasNext() {
- if (nextBatch != null) {
- return true;
- }
-
- if (limitReached()) {
- return false;
- }
-
- if (resourceManager.serverCursor() != null) {
- getMore();
- }
-
- return nextBatch != null;
- }
-
- @Override
- @Nullable
- public ServerCursor getServerCursor() {
- if (!resourceManager.operable()) {
- throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_ITERATOR);
- }
-
- return resourceManager.serverCursor();
- }
-
- @Override
- public ServerAddress getServerAddress() {
- if (!resourceManager.operable()) {
- throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_ITERATOR);
- }
-
- return serverAddress;
- }
-
- @Override
- public BsonDocument getPostBatchResumeToken() {
- return postBatchResumeToken;
- }
-
- @Override
- public BsonTimestamp getOperationTime() {
- return operationTime;
- }
-
- @Override
- public boolean isFirstBatchEmpty() {
- return firstBatchEmpty;
- }
-
- @Override
- public int getMaxWireVersion() {
- return maxWireVersion;
- }
-
- private void getMore() {
- ServerCursor serverCursor = assertNotNull(resourceManager.serverCursor());
- resourceManager.executeWithConnection(connection -> {
- ServerCursor nextServerCursor;
- try {
- nextServerCursor = initFromCommandResult(connection.command(namespace.getDatabaseName(),
- asGetMoreCommandDocument(serverCursor.getId(), connection.getDescription()),
- NO_OP_FIELD_NAME_VALIDATOR,
- ReadPreference.primary(),
- CommandResultDocumentCodec.create(decoder, "nextBatch"),
- assertNotNull(resourceManager.connectionSource)));
- } catch (MongoCommandException e) {
- throw translateCommandException(e, serverCursor);
- }
- resourceManager.setServerCursor(nextServerCursor);
- if (limitReached()) {
- resourceManager.releaseServerAndClientResources(connection);
- }
- });
- }
-
- private BsonDocument asGetMoreCommandDocument(final long cursorId, final ConnectionDescription connectionDescription) {
- BsonDocument document = new BsonDocument("getMore", new BsonInt64(cursorId))
- .append("collection", new BsonString(namespace.getCollectionName()));
-
- int batchSizeForGetMoreCommand = Math.abs(getNumberToReturn(limit, this.batchSize, count));
- if (batchSizeForGetMoreCommand != 0) {
- document.append("batchSize", new BsonInt32(batchSizeForGetMoreCommand));
- }
- if (maxTimeMS != 0) {
- document.append("maxTimeMS", new BsonInt64(maxTimeMS));
- }
- if (serverIsAtLeastVersionFourDotFour(connectionDescription)) {
- putIfNotNull(document, "comment", comment);
- }
- return document;
- }
-
- @Nullable
- private ServerCursor initFromQueryResult(final QueryResult queryResult) {
- nextBatch = queryResult.getResults().isEmpty() ? null : queryResult.getResults();
- count += queryResult.getResults().size();
- LOGGER.debug(format("Received batch of %d documents with cursorId %d from server %s", queryResult.getResults().size(),
- queryResult.getCursorId(), queryResult.getAddress()));
- return queryResult.getCursor();
- }
-
- @Nullable
- private ServerCursor initFromCommandResult(final BsonDocument getMoreCommandResultDocument) {
- QueryResult queryResult = getMoreCursorDocumentToQueryResult(getMoreCommandResultDocument.getDocument(CURSOR), serverAddress);
- postBatchResumeToken = getPostBatchResumeTokenFromResponse(getMoreCommandResultDocument);
- operationTime = getMoreCommandResultDocument.getTimestamp(OPERATION_TIME, null);
- return initFromQueryResult(queryResult);
- }
-
- private boolean limitReached() {
- return Math.abs(limit) != 0 && count >= Math.abs(limit);
- }
-
- @Nullable
- private BsonDocument getPostBatchResumeTokenFromResponse(final BsonDocument result) {
- BsonDocument cursor = result.getDocument(CURSOR, null);
- if (cursor != null) {
- return cursor.getDocument(POST_BATCH_RESUME_TOKEN, null);
- }
- return null;
- }
-
- /**
- * This class maintains all resources that must be released in {@link QueryBatchCursor#close()}.
- * It also implements a {@linkplain #doClose() deferred close action} such that it is totally ordered with other operations of
- * {@link QueryBatchCursor} (methods {@link #tryStartOperation()}/{@link #endOperation()} must be used properly to enforce the order)
- * despite the method {@link QueryBatchCursor#close()} being called concurrently with those operations.
- * This total order induces the happens-before order.
- *
- * The deferred close action does not violate externally observable idempotence of {@link QueryBatchCursor#close()},
- * because {@link QueryBatchCursor#close()} is allowed to release resources "eventually".
- *
- * Only methods explicitly documented as thread-safe are thread-safe,
- * others are not and rely on the total order mentioned above.
- */
- @ThreadSafe
- private final class ResourceManager {
- private final Lock lock;
- private volatile State state;
- @Nullable
- private volatile ConnectionSource connectionSource;
- @Nullable
- private volatile Connection pinnedConnection;
- @Nullable
- private volatile ServerCursor serverCursor;
- private volatile boolean skipReleasingServerResourcesOnClose;
-
- ResourceManager(@Nullable final ConnectionSource connectionSource,
- @Nullable final Connection connectionToPin, @Nullable final ServerCursor serverCursor) {
- lock = new StampedLock().asWriteLock();
- state = State.IDLE;
- if (serverCursor != null) {
- this.connectionSource = (assertNotNull(connectionSource)).retain();
- if (connectionToPin != null) {
- this.pinnedConnection = connectionToPin.retain();
- connectionToPin.markAsPinned(Connection.PinningMode.CURSOR);
- }
- }
- skipReleasingServerResourcesOnClose = false;
- this.serverCursor = serverCursor;
- }
-
- /**
- * Thread-safe.
- */
- boolean operable() {
- return state.operable();
- }
-
- /**
- * Thread-safe.
- * Executes {@code operation} within the {@link #tryStartOperation()}/{@link #endOperation()} bounds.
- *
- * @throws IllegalStateException If {@linkplain QueryBatchCursor#close() closed}.
- */
- @Nullable
- R execute(final String exceptionMessageIfClosed, final Supplier