Skip to content

Refactored Command Batch Cursors. #1198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .evergreen/run-load-balancer-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ echo $second
-Dorg.mongodb.test.uri=${SINGLE_MONGOS_LB_URI} \
-Dorg.mongodb.test.multi.mongos.uri=${MULTI_MONGOS_LB_URI} \
${GRADLE_EXTRA_VARS} --stacktrace --info --continue driver-core:test \
--tests QueryBatchCursorFunctionalSpecification
--tests CommandBatchCursorSpecification
third=$?
echo $third

Expand Down
6 changes: 0 additions & 6 deletions config/codenarc/codenarc.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@
<exclude name="ComparisonWithSelf"/>
</ruleset-ref>
<ruleset-ref path='rulesets/braces.xml'/>
<ruleset-ref path='rulesets/concurrency.xml'>
<rule-config name='BusyWait'>
<property name='doNotApplyToFileNames' value='AsyncQueryBatchCursorFunctionalSpecification.groovy'/>
</rule-config>

</ruleset-ref>
<ruleset-ref path='rulesets/convention.xml'>
<rule-config name='NoDef'>
<property name='doNotApplyToFilesMatching' value='.*Specification.groovy'/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.mongodb.internal.async;

import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;

Expand All @@ -25,8 +26,10 @@
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface AsyncAggregateResponseBatchCursor<T> extends AsyncBatchCursor<T> {
@Nullable
BsonDocument getPostBatchResumeToken();

@Nullable
BsonTimestamp getOperationTime();

boolean isFirstBatchEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.mongodb.internal.async;

import com.mongodb.internal.operation.BatchCursor;

import java.io.Closeable;
import java.util.List;

Expand All @@ -28,9 +30,9 @@
*/
public interface AsyncBatchCursor<T> extends Closeable {
/**
* Returns the next batch of results. A tailable cursor will block until another batch exists. After the last batch, the next call
* to this method will execute the callback with a null result to indicate that there are no more batches available and the cursor
* has been closed.
* Returns the next batch of results. A tailable cursor will block until another batch exists.
* Unlike the {@link BatchCursor} this method will automatically mark the cursor as closed when there are no more expected results.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cursor no longer calls callback.onResult(null, null). As this class is internal and consumed internally there is no public API breakage or behavour change.

* Care should be taken to check {@link #isClosed()} between calls.
*
* @param callback callback to receive the next batch of results
* @throws java.util.NoSuchElementException if no next batch exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@

import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Collation;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.internal.client.model.AggregationLevel;
import com.mongodb.internal.connection.QueryResult;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
Expand All @@ -40,15 +38,13 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult;
import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand;
import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead;
Expand Down Expand Up @@ -239,25 +235,16 @@ BsonDocument getCommand(final SessionContext sessionContext, final int maxWireVe
return commandDocument;
}

private QueryResult<T> createQueryResult(final BsonDocument result, final ConnectionDescription description) {
assertNotNull(result);
return cursorDocumentToQueryResult(result.getDocument(CURSOR), description.getServerAddress());
}

private CommandReadTransformer<BsonDocument, QueryBatchCursor<T>> transformer() {
return (result, source, connection) -> {
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
return new QueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder, comment,
source, connection, result);
};
private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
return (result, source, connection) ->
new CommandBatchCursor<>(connection.getDescription().getServerAddress(), result, 0, batchSize != null ? batchSize : 0,
maxAwaitTimeMS, decoder, comment, source, connection);
}

private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {
return (result, source, connection) -> {
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
return new AsyncQueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
comment, source, connection, result);
};
return (result, source, connection) ->
new AsyncCommandBatchCursor<>(connection.getDescription().getServerAddress(), result, 0, batchSize != null ? batchSize : 0,
maxAwaitTimeMS, decoder, comment, source, connection);
}

interface AggregateTarget {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.internal.operation;

import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;

Expand All @@ -27,8 +28,10 @@
*/
@NotThreadSafe
public interface AggregateResponseBatchCursor<T> extends BatchCursor<T> {
@Nullable
BsonDocument getPostBatchResumeToken();

@Nullable
BsonTimestamp getOperationTime();

boolean isFirstBatchEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
final int maxWireVersion) {
this.changeStreamOperation = changeStreamOperation;
this.wrapped = new AtomicReference<>(assertNotNull(wrapped));
this.binding = binding;
binding.retain();
this.binding = binding.retain();
this.resumeToken = resumeToken;
this.maxWireVersion = maxWireVersion;
isClosed = new AtomicBoolean();
Expand Down Expand Up @@ -164,8 +163,8 @@ public int getMaxWireVersion() {
return maxWireVersion;
}

private void cachePostBatchResumeToken(final AsyncAggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
BsonDocument resumeToken = queryBatchCursor.getPostBatchResumeToken();
private void cachePostBatchResumeToken(final AsyncAggregateResponseBatchCursor<RawBsonDocument> commandBatchCursor) {
BsonDocument resumeToken = commandBatchCursor.getPostBatchResumeToken();
if (resumeToken != null) {
this.resumeToken = resumeToken;
}
Expand Down
Loading