Skip to content

Commit 540c612

Browse files
authored
BatchCursor refactorings (#1246)
- Added SingleBatchCursor - QueryResult and QueryBatchCursor renaming - Renamed and moved QueryResult to CommandCursorResult - Renamed QueryBatchCursor to CommandBatchCursor - Renamed AsyncQueryBatchCursor to AsyncCommandBatchCursor - Unified Async & Sync CommandBatchCursor testing - Added a CursorResourceManager for both Async & Sync JAVA-5159
1 parent 275dbc0 commit 540c612

File tree

59 files changed

+3502
-3151
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+3502
-3151
lines changed

.evergreen/run-load-balancer-tests.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ echo $second
7979
-Dorg.mongodb.test.uri=${SINGLE_MONGOS_LB_URI} \
8080
-Dorg.mongodb.test.multi.mongos.uri=${MULTI_MONGOS_LB_URI} \
8181
${GRADLE_EXTRA_VARS} --stacktrace --info --continue driver-core:test \
82-
--tests QueryBatchCursorFunctionalSpecification
82+
--tests CommandBatchCursorFunctionalTest \
83+
--tests AsyncCommandBatchCursorFunctionalTest
8384
third=$?
8485
echo $third
8586

config/codenarc/codenarc.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,6 @@
3434
<exclude name="ComparisonWithSelf"/>
3535
</ruleset-ref>
3636
<ruleset-ref path='rulesets/braces.xml'/>
37-
<ruleset-ref path='rulesets/concurrency.xml'>
38-
<rule-config name='BusyWait'>
39-
<property name='doNotApplyToFileNames' value='AsyncQueryBatchCursorFunctionalSpecification.groovy'/>
40-
</rule-config>
41-
42-
</ruleset-ref>
4337
<ruleset-ref path='rulesets/convention.xml'>
4438
<rule-config name='NoDef'>
4539
<property name='doNotApplyToFilesMatching' value='.*Specification.groovy'/>

driver-core/src/main/com/mongodb/assertions/Assertions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.mongodb.lang.Nullable;
2222

2323
import java.util.Collection;
24+
import java.util.function.Supplier;
2425

2526
/**
2627
* <p>Design by contract assertions.</p> <p>This class is not part of the public API and may be removed or changed at any time.</p>
@@ -226,6 +227,19 @@ public static AssertionError fail(final String msg) throws AssertionError {
226227
throw new AssertionError(assertNotNull(msg));
227228
}
228229

230+
/**
231+
* @param supplier the supplier to check
232+
* @return {@code supplier.get()}
233+
* @throws AssertionError If {@code supplier.get()} throws an exception
234+
*/
235+
public static <T> T doesNotThrow(final Supplier<T> supplier) throws AssertionError {
236+
try {
237+
return supplier.get();
238+
} catch (Exception e) {
239+
throw new AssertionError(e.getMessage(), e);
240+
}
241+
}
242+
229243
private Assertions() {
230244
}
231245
}

driver-core/src/main/com/mongodb/internal/async/AsyncAggregateResponseBatchCursor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.internal.async;
1818

19+
import com.mongodb.lang.Nullable;
1920
import org.bson.BsonDocument;
2021
import org.bson.BsonTimestamp;
2122

@@ -25,8 +26,10 @@
2526
* <p>This class is not part of the public API and may be removed or changed at any time</p>
2627
*/
2728
public interface AsyncAggregateResponseBatchCursor<T> extends AsyncBatchCursor<T> {
29+
@Nullable
2830
BsonDocument getPostBatchResumeToken();
2931

32+
@Nullable
3033
BsonTimestamp getOperationTime();
3134

3235
boolean isFirstBatchEmpty();

driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.mongodb.internal.async;
1818

19+
import com.mongodb.internal.operation.BatchCursor;
20+
1921
import java.io.Closeable;
2022
import java.util.List;
2123

@@ -28,9 +30,9 @@
2830
*/
2931
public interface AsyncBatchCursor<T> extends Closeable {
3032
/**
31-
* Returns the next batch of results. A tailable cursor will block until another batch exists. After the last batch, the next call
32-
* to this method will execute the callback with a null result to indicate that there are no more batches available and the cursor
33-
* has been closed.
33+
* Returns the next batch of results. A tailable cursor will block until another batch exists.
34+
* Unlike the {@link BatchCursor} this method will automatically mark the cursor as closed when there are no more expected results.
35+
* Care should be taken to check {@link #isClosed()} between calls.
3436
*
3537
* @param callback callback to receive the next batch of results
3638
* @throws java.util.NoSuchElementException if no next batch exists

driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@
1818

1919
import com.mongodb.MongoNamespace;
2020
import com.mongodb.client.model.Collation;
21-
import com.mongodb.connection.ConnectionDescription;
2221
import com.mongodb.internal.async.AsyncBatchCursor;
2322
import com.mongodb.internal.async.SingleResultCallback;
2423
import com.mongodb.internal.binding.AsyncReadBinding;
2524
import com.mongodb.internal.binding.ReadBinding;
2625
import com.mongodb.internal.client.model.AggregationLevel;
27-
import com.mongodb.internal.connection.QueryResult;
2826
import com.mongodb.internal.session.SessionContext;
2927
import com.mongodb.lang.Nullable;
3028
import org.bson.BsonArray;
@@ -40,15 +38,13 @@
4038
import java.util.List;
4139
import java.util.concurrent.TimeUnit;
4240

43-
import static com.mongodb.assertions.Assertions.assertNotNull;
4441
import static com.mongodb.assertions.Assertions.isTrueArgument;
4542
import static com.mongodb.assertions.Assertions.notNull;
4643
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
4744
import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
4845
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
4946
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
5047
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
51-
import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult;
5248
import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand;
5349
import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
5450
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead;
@@ -239,25 +235,16 @@ BsonDocument getCommand(final SessionContext sessionContext, final int maxWireVe
239235
return commandDocument;
240236
}
241237

242-
private QueryResult<T> createQueryResult(final BsonDocument result, final ConnectionDescription description) {
243-
assertNotNull(result);
244-
return cursorDocumentToQueryResult(result.getDocument(CURSOR), description.getServerAddress());
245-
}
246-
247-
private CommandReadTransformer<BsonDocument, QueryBatchCursor<T>> transformer() {
248-
return (result, source, connection) -> {
249-
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
250-
return new QueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder, comment,
251-
source, connection, result);
252-
};
238+
private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
239+
return (result, source, connection) ->
240+
new CommandBatchCursor<>(result, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
241+
comment, source, connection);
253242
}
254243

255244
private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {
256-
return (result, source, connection) -> {
257-
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
258-
return new AsyncQueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
259-
comment, source, connection, result);
260-
};
245+
return (result, source, connection) ->
246+
new AsyncCommandBatchCursor<>(result, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
247+
comment, source, connection);
261248
}
262249

263250
interface AggregateTarget {

driver-core/src/main/com/mongodb/internal/operation/AggregateResponseBatchCursor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.internal.operation;
1818

1919
import com.mongodb.annotations.NotThreadSafe;
20+
import com.mongodb.lang.Nullable;
2021
import org.bson.BsonDocument;
2122
import org.bson.BsonTimestamp;
2223

@@ -27,8 +28,10 @@
2728
*/
2829
@NotThreadSafe
2930
public interface AggregateResponseBatchCursor<T> extends BatchCursor<T> {
31+
@Nullable
3032
BsonDocument getPostBatchResumeToken();
3133

34+
@Nullable
3235
BsonTimestamp getOperationTime();
3336

3437
boolean isFirstBatchEmpty();

driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.mongodb.MongoException;
2020
import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
21+
import com.mongodb.internal.async.AsyncBatchCursor;
2122
import com.mongodb.internal.async.SingleResultCallback;
2223
import com.mongodb.internal.binding.AsyncReadBinding;
2324
import com.mongodb.lang.NonNull;
@@ -50,11 +51,11 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
5051
* {@code wrapped} containing {@code null} and {@link #isClosed} being {@code false}.
5152
* This represents a situation in which the wrapped object was closed by {@code this} but {@code this} remained open.
5253
*/
53-
private final AtomicReference<AsyncAggregateResponseBatchCursor<RawBsonDocument>> wrapped;
54+
private final AtomicReference<AsyncCommandBatchCursor<RawBsonDocument>> wrapped;
5455
private final AtomicBoolean isClosed;
5556

5657
AsyncChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
57-
final AsyncAggregateResponseBatchCursor<RawBsonDocument> wrapped,
58+
final AsyncCommandBatchCursor<RawBsonDocument> wrapped,
5859
final AsyncReadBinding binding,
5960
@Nullable final BsonDocument resumeToken,
6061
final int maxWireVersion) {
@@ -68,13 +69,13 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
6869
}
6970

7071
@NonNull
71-
AsyncAggregateResponseBatchCursor<RawBsonDocument> getWrapped() {
72+
AsyncCommandBatchCursor<RawBsonDocument> getWrapped() {
7273
return assertNotNull(wrapped.get());
7374
}
7475

7576
@Override
7677
public void next(final SingleResultCallback<List<T>> callback) {
77-
resumeableOperation((cursor, callback1) -> cursor.next(callback1), callback, false);
78+
resumeableOperation(AsyncBatchCursor::next, callback, false);
7879
}
7980

8081
@Override
@@ -129,15 +130,15 @@ private void nullifyAndCloseWrapped() {
129130

130131
/**
131132
* This method guarantees that the {@code newValue} argument is closed even if
132-
* {@link #setWrappedOrCloseIt(AsyncAggregateResponseBatchCursor)} is called concurrently with or after (in the happens-before order)
133+
* {@code setWrappedOrCloseIt(AsyncCommandBatchCursor)} is called concurrently with or after (in the happens-before order)
133134
* the method {@link #close()}.
134135
*/
135-
private void setWrappedOrCloseIt(final AsyncAggregateResponseBatchCursor<RawBsonDocument> newValue) {
136+
private void setWrappedOrCloseIt(final AsyncCommandBatchCursor<RawBsonDocument> newValue) {
136137
if (isClosed()) {
137-
assertNull(this.wrapped.get());
138+
assertNull(wrapped.get());
138139
newValue.close();
139140
} else {
140-
assertNull(this.wrapped.getAndSet(newValue));
141+
assertNull(wrapped.getAndSet(newValue));
141142
if (isClosed()) {
142143
nullifyAndCloseWrapped();
143144
}
@@ -164,8 +165,8 @@ public int getMaxWireVersion() {
164165
return maxWireVersion;
165166
}
166167

167-
private void cachePostBatchResumeToken(final AsyncAggregateResponseBatchCursor<RawBsonDocument> queryBatchCursor) {
168-
BsonDocument resumeToken = queryBatchCursor.getPostBatchResumeToken();
168+
private void cachePostBatchResumeToken(final AsyncCommandBatchCursor<RawBsonDocument> cursor) {
169+
BsonDocument resumeToken = cursor.getPostBatchResumeToken();
169170
if (resumeToken != null) {
170171
this.resumeToken = resumeToken;
171172
}
@@ -182,13 +183,13 @@ private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResult
182183
tryNext ? "tryNext()" : "next()")));
183184
return;
184185
}
185-
AsyncAggregateResponseBatchCursor<RawBsonDocument> wrappedCursor = getWrapped();
186+
AsyncCommandBatchCursor<RawBsonDocument> wrappedCursor = getWrapped();
186187
asyncBlock.apply(wrappedCursor, (result, t) -> {
187188
if (t == null) {
188189
try {
189190
List<T> convertedResults;
190191
try {
191-
convertedResults = convertAndProduceLastId(result, changeStreamOperation.getDecoder(),
192+
convertedResults = convertAndProduceLastId(assertNotNull(result), changeStreamOperation.getDecoder(),
192193
lastId -> resumeToken = lastId);
193194
} finally {
194195
cachePostBatchResumeToken(wrappedCursor);
@@ -215,14 +216,15 @@ private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallb
215216
if (t != null) {
216217
callback.onResult(null, t);
217218
} else {
218-
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion());
219+
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken,
220+
assertNotNull(source).getServerDescription().getMaxWireVersion());
219221
source.release();
220222
changeStreamOperation.executeAsync(binding, (result, t1) -> {
221223
if (t1 != null) {
222224
callback.onResult(null, t1);
223225
} else {
224226
try {
225-
setWrappedOrCloseIt(((AsyncChangeStreamBatchCursor<T>) result).getWrapped());
227+
setWrappedOrCloseIt(assertNotNull((AsyncChangeStreamBatchCursor<T>) result).getWrapped());
226228
} finally {
227229
try {
228230
binding.release(); // release the new change stream batch cursor's reference to the binding

0 commit comments

Comments
 (0)