Skip to content

Commit 5ff0c8f

Browse files
committed
JAVAify the test
1 parent 66245ee commit 5ff0c8f

File tree

11 files changed

+1089
-1667
lines changed

11 files changed

+1089
-1667
lines changed

driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public void next(final SingleResultCallback<List<T>> callback) {
119119
return;
120120
}
121121

122-
ServerCursor localServerCursor = resourceManager.serverCursor();
122+
ServerCursor localServerCursor = resourceManager.getServerCursor();
123123
boolean cursorClosed = localServerCursor == null;
124124
List<T> batchResults = emptyList();
125125
if (!processedInitial.getAndSet(true) && !firstBatchEmpty) {
@@ -180,7 +180,7 @@ public int getMaxWireVersion() {
180180
@Nullable
181181
@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
182182
ServerCursor getServerCursor() {
183-
return resourceManager.serverCursor();
183+
return resourceManager.getServerCursor();
184184
}
185185

186186
private void getMore(final ServerCursor cursor, final SingleResultCallback<List<T>> callback) {
@@ -192,7 +192,7 @@ private void getMore(final AsyncConnection connection, final ServerCursor cursor
192192
getMoreCommandDocument(cursor.getId(), connection.getDescription(), namespace,
193193
limit, batchSize, count.get(), maxTimeMS, comment),
194194
NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(),
195-
CommandResultDocumentCodec.create(decoder, NEXT_BATCH), assertNotNull(resourceManager.connectionSource),
195+
CommandResultDocumentCodec.create(decoder, NEXT_BATCH), assertNotNull(resourceManager.getConnectionSource()),
196196
(commandResult, t) -> {
197197
if (t != null) {
198198
Throwable translatedException =
@@ -224,6 +224,7 @@ private void getMore(final AsyncConnection connection, final ServerCursor cursor
224224
resourceManager.endOperation();
225225
if (limitReached()) {
226226
resourceManager.releaseServerAndClientResources(connection);
227+
close();
227228
}
228229
connection.release();
229230
callback.onResult(commandCursor.getResults(), null);
@@ -272,11 +273,12 @@ void executeWithConnection(final Consumer<AsyncConnection> action) {
272273

273274
@Override
274275
<R> void executeWithConnection(final Consumer<AsyncConnection> action, final SingleResultCallback<R> callback) {
275-
assertTrue(state != State.IDLE);
276+
assertTrue(getState() != State.IDLE);
277+
AsyncConnection pinnedConnection = getPinnedConnection();
276278
if (pinnedConnection != null) {
277279
executeWithConnection(assertNotNull(pinnedConnection).retain(), null, action, callback);
278280
} else {
279-
assertNotNull(connectionSource).getConnection((conn, t) -> executeWithConnection(conn, t, action, callback));
281+
assertNotNull(getConnectionSource()).getConnection((conn, t) -> executeWithConnection(conn, t, action, callback));
280282
}
281283
}
282284

@@ -290,10 +292,7 @@ <R> void executeWithConnection(
290292
callback.onResult(null, t);
291293
} else {
292294
AsyncCallbackSupplier<R> curriedFunction = c -> function.accept(connection);
293-
curriedFunction.whenComplete(() -> {
294-
System.out.println("COMPLETED!!!!");
295-
connection.release();
296-
}).get((result, error) -> {
295+
curriedFunction.whenComplete(connection::release).get((result, error) -> {
297296
if (error instanceof MongoSocketException) {
298297
onCorruptedConnection(connection);
299298
}
@@ -304,17 +303,17 @@ <R> void executeWithConnection(
304303

305304
@Override
306305
void doClose() {
307-
if (skipReleasingServerResourcesOnClose) {
308-
serverCursor = null;
306+
if (isSkipReleasingServerResourcesOnClose()) {
307+
unsetServerCursor();
309308
}
310309

311-
if (serverCursor != null) {
310+
if (getServerCursor() != null) {
312311
executeWithConnection(conn -> {
313312
releaseServerResources(conn);
314313
conn.release();
315314
}, (r, t) -> {
316315
// guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
317-
serverCursor = null;
316+
unsetServerCursor();
318317
releaseClientResources();
319318
});
320319
} else {
@@ -325,9 +324,13 @@ void doClose() {
325324
@Override
326325
void killServerCursor(final MongoNamespace namespace, final ServerCursor serverCursor, final AsyncConnection connection) {
327326
connection
327+
.retain()
328328
.commandAsync(namespace.getDatabaseName(), getKillCursorsCommand(namespace, serverCursor),
329-
NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(), assertNotNull(connectionSource),
330-
(r, t) -> releaseClientResources());
329+
NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(),
330+
assertNotNull(getConnectionSource()), (r, t) -> {
331+
connection.release();
332+
releaseClientResources();
333+
});
331334
}
332335
}
333336
}

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ private boolean doHasNext() {
120120
return false;
121121
}
122122

123-
while (resourceManager.serverCursor() != null) {
123+
while (resourceManager.getServerCursor() != null) {
124124
getMore();
125125
if (!resourceManager.operable()) {
126126
throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_CURSOR);
@@ -200,7 +200,7 @@ private boolean tryHasNext() {
200200
return false;
201201
}
202202

203-
if (resourceManager.serverCursor() != null) {
203+
if (resourceManager.getServerCursor() != null) {
204204
getMore();
205205
}
206206

@@ -213,7 +213,7 @@ public ServerCursor getServerCursor() {
213213
if (!resourceManager.operable()) {
214214
throw new IllegalStateException(MESSAGE_IF_CLOSED_AS_ITERATOR);
215215
}
216-
return resourceManager.serverCursor();
216+
return resourceManager.getServerCursor();
217217
}
218218

219219
@Override
@@ -246,7 +246,7 @@ public int getMaxWireVersion() {
246246
}
247247

248248
private void getMore() {
249-
ServerCursor serverCursor = assertNotNull(resourceManager.serverCursor());
249+
ServerCursor serverCursor = assertNotNull(resourceManager.getServerCursor());
250250
resourceManager.executeWithConnection(connection -> {
251251
ServerCursor nextServerCursor;
252252
try {
@@ -258,7 +258,7 @@ private void getMore() {
258258
NO_OP_FIELD_NAME_VALIDATOR,
259259
ReadPreference.primary(),
260260
CommandResultDocumentCodec.create(decoder, NEXT_BATCH),
261-
assertNotNull(resourceManager.connectionSource))));
261+
assertNotNull(resourceManager.getConnectionSource()))));
262262
nextServerCursor = commandCursor.getServerCursor();
263263
} catch (MongoCommandException e) {
264264
throw translateCommandException(e, serverCursor);
@@ -291,7 +291,7 @@ private boolean limitReached() {
291291
@ThreadSafe
292292
private static final class ResourceManager extends CursorResourceManager<ConnectionSource, Connection> {
293293

294-
public ResourceManager(
294+
ResourceManager(
295295
final MongoNamespace namespace,
296296
final ConnectionSource connectionSource,
297297
@Nullable final Connection connectionToPin,
@@ -318,11 +318,12 @@ <R> R execute(final String exceptionMessageIfClosed, final Supplier<R> operation
318318
}
319319

320320
private Connection connection() {
321-
assertTrue(state != State.IDLE);
321+
assertTrue(getState() != State.IDLE);
322+
Connection pinnedConnection = getPinnedConnection();
322323
if (pinnedConnection != null) {
323324
return assertNotNull(pinnedConnection).retain();
324325
} else {
325-
return assertNotNull(connectionSource).getConnection();
326+
return assertNotNull(getConnectionSource()).getConnection();
326327
}
327328
}
328329

@@ -355,27 +356,27 @@ <R> void executeWithConnection(final Consumer<Connection> action, final SingleRe
355356

356357
@Override
357358
void doClose() {
358-
if (skipReleasingServerResourcesOnClose) {
359-
serverCursor = null;
359+
if (isSkipReleasingServerResourcesOnClose()) {
360+
unsetServerCursor();
360361
}
361362

362363
try {
363-
if (serverCursor != null) {
364+
if (getServerCursor() != null) {
364365
executeWithConnection(this::releaseServerResources);
365366
}
366367
} catch (MongoException e) {
367368
// ignore exceptions when releasing server resources
368369
} finally {
369370
// guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
370-
serverCursor = null;
371+
unsetServerCursor();
371372
releaseClientResources();
372373
}
373374
}
374375

375376
@Override
376377
void killServerCursor(final MongoNamespace namespace, final ServerCursor serverCursor, final Connection connection) {
377378
connection.command(namespace.getDatabaseName(), getKillCursorsCommand(namespace, serverCursor),
378-
NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(), assertNotNull(connectionSource));
379+
NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(), assertNotNull(getConnectionSource()));
379380
releaseClientResources();
380381
}
381382
}

driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,14 @@ abstract class CursorResourceManager<CS extends ReferenceCounted, C extends Refe
6262

6363
private final MongoNamespace namespace;
6464
private final Lock lock;
65-
volatile State state;
65+
private volatile State state;
6666
@Nullable
67-
volatile CS connectionSource;
67+
private volatile CS connectionSource;
6868
@Nullable
69-
volatile C pinnedConnection;
69+
private volatile C pinnedConnection;
7070
@Nullable
71-
volatile ServerCursor serverCursor;
72-
volatile boolean skipReleasingServerResourcesOnClose;
71+
private volatile ServerCursor serverCursor;
72+
private volatile boolean skipReleasingServerResourcesOnClose;
7373

7474
CursorResourceManager(
7575
final MongoNamespace namespace,
@@ -92,19 +92,37 @@ abstract class CursorResourceManager<CS extends ReferenceCounted, C extends Refe
9292
this.serverCursor = serverCursor;
9393
}
9494

95+
public State getState() {
96+
return state;
97+
}
98+
99+
@Nullable
100+
public CS getConnectionSource() {
101+
return connectionSource;
102+
}
103+
104+
@Nullable
105+
public C getPinnedConnection() {
106+
return pinnedConnection;
107+
}
108+
109+
public boolean isSkipReleasingServerResourcesOnClose() {
110+
return skipReleasingServerResourcesOnClose;
111+
}
112+
95113
abstract void markAsPinned(C connectionToPin, Connection.PinningMode pinningMode);
96114

97-
abstract void executeWithConnection(final Consumer<C> action);
115+
abstract void executeWithConnection(Consumer<C> action);
98116

99-
abstract <R> void executeWithConnection(final Consumer<C> action, final SingleResultCallback<R> callback);
117+
abstract <R> void executeWithConnection(Consumer<C> action, SingleResultCallback<R> callback);
100118

101119
/**
102120
* This method is never executed concurrently with either itself or other operations
103121
* demarcated by {@link #tryStartOperation()}/{@link #endOperation()}.
104122
*/
105123
abstract void doClose();
106124

107-
abstract void killServerCursor(final MongoNamespace namespace, final ServerCursor serverCursor, final C connection);
125+
abstract void killServerCursor(MongoNamespace namespace, ServerCursor serverCursor, C connection);
108126

109127
/**
110128
* Thread-safe.
@@ -198,10 +216,14 @@ void onCorruptedConnection(@Nullable final C corruptedConnection) {
198216
* Thread-safe.
199217
*/
200218
@Nullable
201-
ServerCursor serverCursor() {
219+
ServerCursor getServerCursor() {
202220
return serverCursor;
203221
}
204222

223+
void unsetServerCursor() {
224+
this.serverCursor = null;
225+
}
226+
205227
void setServerCursor(@Nullable final ServerCursor serverCursor) {
206228
assertTrue(state.inProgress());
207229
assertNotNull(this.serverCursor);

driver-core/src/test/functional/com/mongodb/client/model/OperationTest.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818

1919
import com.mongodb.ClusterFixture;
2020
import com.mongodb.MongoNamespace;
21+
import com.mongodb.async.FutureResultCallback;
2122
import com.mongodb.client.test.CollectionHelper;
2223
import com.mongodb.internal.connection.ServerHelper;
24+
import com.mongodb.internal.validator.NoOpFieldNameValidator;
2325
import com.mongodb.lang.Nullable;
2426
import org.bson.BsonArray;
2527
import org.bson.BsonDocument;
2628
import org.bson.BsonDouble;
2729
import org.bson.BsonValue;
2830
import org.bson.Document;
31+
import org.bson.FieldNameValidator;
2932
import org.bson.codecs.BsonDocumentCodec;
3033
import org.bson.codecs.DecoderContext;
3134
import org.bson.codecs.DocumentCodec;
@@ -39,8 +42,11 @@
3942
import java.util.Collections;
4043
import java.util.List;
4144
import java.util.Map;
45+
import java.util.concurrent.TimeUnit;
46+
import java.util.function.Consumer;
4247
import java.util.stream.Collectors;
4348

49+
import static com.mongodb.ClusterFixture.TIMEOUT;
4450
import static com.mongodb.ClusterFixture.checkReferenceCountReachesTarget;
4551
import static com.mongodb.ClusterFixture.getAsyncBinding;
4652
import static com.mongodb.ClusterFixture.getBinding;
@@ -54,6 +60,7 @@
5460
public abstract class OperationTest {
5561

5662
protected static final DocumentCodec DOCUMENT_DECODER = new DocumentCodec();
63+
protected static final FieldNameValidator NO_OP_FIELD_NAME_VALIDATOR = new NoOpFieldNameValidator();
5764

5865
@BeforeEach
5966
public void beforeEach() {
@@ -77,15 +84,15 @@ private CollectionHelper<BsonDocument> getCollectionHelper(final MongoNamespace
7784
return new CollectionHelper<>(new BsonDocumentCodec(), namespace);
7885
}
7986

80-
private String getDatabaseName() {
87+
protected String getDatabaseName() {
8188
return ClusterFixture.getDefaultDatabaseName();
8289
}
8390

84-
private String getCollectionName() {
91+
protected String getCollectionName() {
8592
return "test";
8693
}
8794

88-
MongoNamespace getNamespace() {
95+
protected MongoNamespace getNamespace() {
8996
return new MongoNamespace(getDatabaseName(), getCollectionName());
9097
}
9198

@@ -97,7 +104,6 @@ public static BsonDocument toBsonDocument(final BsonDocument bsonDocument) {
97104
return getDefaultCodecRegistry().get(BsonDocument.class).decode(bsonDocument.asBsonReader(), DecoderContext.builder().build());
98105
}
99106

100-
101107
protected List<Bson> assertPipeline(final String stageAsString, final Bson stage) {
102108
List<Bson> pipeline = Collections.singletonList(stage);
103109
return assertPipeline(stageAsString, pipeline);
@@ -159,4 +165,25 @@ protected List<Object> aggregateWithWindowFields(@Nullable final Object partitio
159165
.map(doc -> doc.get("result"))
160166
.collect(toList());
161167
}
168+
169+
protected <T> void ifNotNull(@Nullable final T maybeNull, final Consumer<T> consumer) {
170+
if (maybeNull != null) {
171+
consumer.accept(maybeNull);
172+
}
173+
}
174+
175+
protected void sleep(final long ms) {
176+
try {
177+
Thread.sleep(ms);
178+
} catch (InterruptedException e) {
179+
Thread.currentThread().interrupt();
180+
throw new RuntimeException(e);
181+
}
182+
}
183+
184+
protected <T> T block(final Consumer<FutureResultCallback<T>> consumer) {
185+
FutureResultCallback<T> cb = new FutureResultCallback<>();
186+
consumer.accept(cb);
187+
return cb.get(TIMEOUT, TimeUnit.SECONDS);
188+
}
162189
}

0 commit comments

Comments
 (0)