Skip to content

Commit 831bef1

Browse files
committed
Refactored Command Batch Cursors.
Previously had a QueryResult / QueryBatchCursor abstraction. This abstraction is no longer required as only commands are used to create cursors. Two new classes have been added: 1. SingleBatchCursor Used when commands return a single list of results but not an actual cursor. 2. CommandBatchCursor Used for commands that return a cursor and contain all the logic to manage resources and issue get more calls. The construction and resource management has been simplified by reducing the number of constructors used when creating the cursor. This will simplify future refactorings. The asynchronous cursor abstractions have been refactored to more closely follow their synchronous counterparts. Reducing the cognative costs when working on both cursor types. JAVA-5159
1 parent 4909d09 commit 831bef1

File tree

41 files changed

+2538
-1528
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2538
-1528
lines changed

.evergreen/run-load-balancer-tests.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ echo $second
7979
-Dorg.mongodb.test.uri=${SINGLE_MONGOS_LB_URI} \
8080
-Dorg.mongodb.test.multi.mongos.uri=${MULTI_MONGOS_LB_URI} \
8181
${GRADLE_EXTRA_VARS} --stacktrace --info --continue driver-core:test \
82-
--tests QueryBatchCursorFunctionalSpecification
82+
--tests CommandBatchCursorSpecification
8383
third=$?
8484
echo $third
8585

config/codenarc/codenarc.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,6 @@
3434
<exclude name="ComparisonWithSelf"/>
3535
</ruleset-ref>
3636
<ruleset-ref path='rulesets/braces.xml'/>
37-
<ruleset-ref path='rulesets/concurrency.xml'>
38-
<rule-config name='BusyWait'>
39-
<property name='doNotApplyToFileNames' value='AsyncQueryBatchCursorFunctionalSpecification.groovy'/>
40-
</rule-config>
41-
42-
</ruleset-ref>
4337
<ruleset-ref path='rulesets/convention.xml'>
4438
<rule-config name='NoDef'>
4539
<property name='doNotApplyToFilesMatching' value='.*Specification.groovy'/>

driver-core/src/main/com/mongodb/internal/async/AsyncAggregateResponseBatchCursor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.internal.async;
1818

19+
import com.mongodb.lang.Nullable;
1920
import org.bson.BsonDocument;
2021
import org.bson.BsonTimestamp;
2122

@@ -25,8 +26,10 @@
2526
* <p>This class is not part of the public API and may be removed or changed at any time</p>
2627
*/
2728
public interface AsyncAggregateResponseBatchCursor<T> extends AsyncBatchCursor<T> {
29+
@Nullable
2830
BsonDocument getPostBatchResumeToken();
2931

32+
@Nullable
3033
BsonTimestamp getOperationTime();
3134

3235
boolean isFirstBatchEmpty();

driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.mongodb.internal.async;
1818

19+
import com.mongodb.internal.operation.BatchCursor;
20+
1921
import java.io.Closeable;
2022
import java.util.List;
2123

@@ -28,9 +30,9 @@
2830
*/
2931
public interface AsyncBatchCursor<T> extends Closeable {
3032
/**
31-
* Returns the next batch of results. A tailable cursor will block until another batch exists. After the last batch, the next call
32-
* to this method will execute the callback with a null result to indicate that there are no more batches available and the cursor
33-
* has been closed.
33+
* Returns the next batch of results. A tailable cursor will block until another batch exists.
34+
* Unlike the {@link BatchCursor} this method will automatically mark the cursor as closed when there are no more expected results.
35+
* Care should be taken to check {@link #isClosed()} between calls.
3436
*
3537
* @param callback callback to receive the next batch of results
3638
* @throws java.util.NoSuchElementException if no next batch exists

driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@
1818

1919
import com.mongodb.MongoNamespace;
2020
import com.mongodb.client.model.Collation;
21-
import com.mongodb.connection.ConnectionDescription;
2221
import com.mongodb.internal.async.AsyncBatchCursor;
2322
import com.mongodb.internal.async.SingleResultCallback;
2423
import com.mongodb.internal.binding.AsyncReadBinding;
2524
import com.mongodb.internal.binding.ReadBinding;
2625
import com.mongodb.internal.client.model.AggregationLevel;
27-
import com.mongodb.internal.connection.QueryResult;
2826
import com.mongodb.internal.session.SessionContext;
2927
import com.mongodb.lang.Nullable;
3028
import org.bson.BsonArray;
@@ -40,15 +38,13 @@
4038
import java.util.List;
4139
import java.util.concurrent.TimeUnit;
4240

43-
import static com.mongodb.assertions.Assertions.assertNotNull;
4441
import static com.mongodb.assertions.Assertions.isTrueArgument;
4542
import static com.mongodb.assertions.Assertions.notNull;
4643
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
4744
import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
4845
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
4946
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
5047
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
51-
import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult;
5248
import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand;
5349
import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
5450
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead;
@@ -239,25 +235,16 @@ BsonDocument getCommand(final SessionContext sessionContext, final int maxWireVe
239235
return commandDocument;
240236
}
241237

242-
private QueryResult<T> createQueryResult(final BsonDocument result, final ConnectionDescription description) {
243-
assertNotNull(result);
244-
return cursorDocumentToQueryResult(result.getDocument(CURSOR), description.getServerAddress());
245-
}
246-
247-
private CommandReadTransformer<BsonDocument, QueryBatchCursor<T>> transformer() {
248-
return (result, source, connection) -> {
249-
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
250-
return new QueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder, comment,
251-
source, connection, result);
252-
};
238+
private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
239+
return (result, source, connection) ->
240+
new CommandBatchCursor<>(connection.getDescription().getServerAddress(), result, 0, batchSize != null ? batchSize : 0,
241+
maxAwaitTimeMS, decoder, comment, source, connection);
253242
}
254243

255244
private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {
256-
return (result, source, connection) -> {
257-
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
258-
return new AsyncQueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
259-
comment, source, connection, result);
260-
};
245+
return (result, source, connection) ->
246+
new AsyncCommandBatchCursor<>(connection.getDescription().getServerAddress(), result, 0, batchSize != null ? batchSize : 0,
247+
maxAwaitTimeMS, decoder, comment, source, connection);
261248
}
262249

263250
interface AggregateTarget {

driver-core/src/main/com/mongodb/internal/operation/AggregateResponseBatchCursor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.internal.operation;
1818

1919
import com.mongodb.annotations.NotThreadSafe;
20+
import com.mongodb.lang.Nullable;
2021
import org.bson.BsonDocument;
2122
import org.bson.BsonTimestamp;
2223

@@ -27,8 +28,10 @@
2728
*/
2829
@NotThreadSafe
2930
public interface AggregateResponseBatchCursor<T> extends BatchCursor<T> {
31+
@Nullable
3032
BsonDocument getPostBatchResumeToken();
3133

34+
@Nullable
3235
BsonTimestamp getOperationTime();
3336

3437
boolean isFirstBatchEmpty();

driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
6060
final int maxWireVersion) {
6161
this.changeStreamOperation = changeStreamOperation;
6262
this.wrapped = new AtomicReference<>(assertNotNull(wrapped));
63-
this.binding = binding;
64-
binding.retain();
63+
this.binding = binding.retain();
6564
this.resumeToken = resumeToken;
6665
this.maxWireVersion = maxWireVersion;
6766
isClosed = new AtomicBoolean();
@@ -164,8 +163,8 @@ public int getMaxWireVersion() {
164163
return maxWireVersion;
165164
}
166165

167-
private void cachePostBatchResumeToken(final AsyncAggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
168-
BsonDocument resumeToken = queryBatchCursor.getPostBatchResumeToken();
166+
private void cachePostBatchResumeToken(final AsyncAggregateResponseBatchCursor<RawBsonDocument> commandBatchCursor) {
167+
BsonDocument resumeToken = commandBatchCursor.getPostBatchResumeToken();
169168
if (resumeToken != null) {
170169
this.resumeToken = resumeToken;
171170
}

0 commit comments

Comments
 (0)