Skip to content

Commit e19f946

Browse files
committed
Shared a CursorResourceManager between async and sync code
JAVA-5159
1 parent aed0c33 commit e19f946

File tree

5 files changed

+378
-508
lines changed

5 files changed

+378
-508
lines changed

driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java

Lines changed: 51 additions & 241 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.mongodb.MongoCommandException;
2020
import com.mongodb.MongoException;
2121
import com.mongodb.MongoNamespace;
22+
import com.mongodb.MongoSocketException;
2223
import com.mongodb.ReadPreference;
2324
import com.mongodb.ServerAddress;
2425
import com.mongodb.ServerCursor;
@@ -32,10 +33,7 @@
3233
import com.mongodb.internal.connection.AsyncConnection;
3334
import com.mongodb.internal.connection.Connection;
3435
import com.mongodb.lang.Nullable;
35-
import org.bson.BsonArray;
3636
import org.bson.BsonDocument;
37-
import org.bson.BsonInt64;
38-
import org.bson.BsonString;
3937
import org.bson.BsonTimestamp;
4038
import org.bson.BsonValue;
4139
import org.bson.codecs.BsonDocumentCodec;
@@ -44,14 +42,10 @@
4442
import java.util.List;
4543
import java.util.concurrent.atomic.AtomicBoolean;
4644
import java.util.concurrent.atomic.AtomicInteger;
47-
import java.util.concurrent.locks.Lock;
48-
import java.util.concurrent.locks.ReentrantLock;
4945
import java.util.function.Consumer;
5046

5147
import static com.mongodb.assertions.Assertions.assertNotNull;
52-
import static com.mongodb.assertions.Assertions.assertNull;
5348
import static com.mongodb.assertions.Assertions.assertTrue;
54-
import static com.mongodb.assertions.Assertions.fail;
5549
import static com.mongodb.assertions.Assertions.isTrueArgument;
5650
import static com.mongodb.assertions.Assertions.notNull;
5751
import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH;
@@ -64,7 +58,6 @@
6458
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
6559
import static java.lang.String.format;
6660
import static java.util.Collections.emptyList;
67-
import static java.util.Collections.singletonList;
6861

6962
class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
7063

@@ -110,9 +103,9 @@ class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T>
110103
connectionToPin = connection;
111104
}
112105

113-
resourceManager = new ResourceManager(connectionSource, connectionToPin, commandCursor.getServerCursor());
106+
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursor.getServerCursor());
114107
if (releaseServerAndResources) {
115-
resourceManager.releaseServerAndClientResources(connection.retain());
108+
resourceManager.releaseServerAndClientResources(connection);
116109
}
117110
}
118111

@@ -191,7 +184,7 @@ ServerCursor getServerCursor() {
191184
}
192185

193186
private void getMore(final ServerCursor cursor, final SingleResultCallback<List<T>> callback) {
194-
resourceManager.executeWithConnection(callback, connection -> getMore(connection, cursor, callback));
187+
resourceManager.executeWithConnection(connection -> getMore(connection, cursor, callback), callback);
195188
}
196189

197190
private void getMore(final AsyncConnection connection, final ServerCursor cursor, final SingleResultCallback<List<T>> callback) {
@@ -219,6 +212,7 @@ private void getMore(final AsyncConnection connection, final ServerCursor cursor
219212

220213
if (!resourceManager.operable()) {
221214
resourceManager.releaseServerAndClientResources(connection);
215+
connection.release();
222216
callback.onResult(emptyList(), null);
223217
return;
224218
}
@@ -230,9 +224,8 @@ private void getMore(final AsyncConnection connection, final ServerCursor cursor
230224
resourceManager.endOperation();
231225
if (limitReached()) {
232226
resourceManager.releaseServerAndClientResources(connection);
233-
} else {
234-
connection.release();
235227
}
228+
connection.release();
236229
callback.onResult(commandCursor.getResults(), null);
237230
}
238231
});
@@ -256,268 +249,85 @@ private boolean limitReached() {
256249
return Math.abs(limit) != 0 && count.get() >= Math.abs(limit);
257250
}
258251

259-
/**
260-
* This class maintains all resources that must be released in {@link AsyncCommandBatchCursor#close()}.
261-
* It also implements a {@linkplain #doClose() deferred close action} such that it is totally ordered with other operations of
262-
* {@link AsyncCommandBatchCursor} (methods {@link #tryStartOperation()}/{@link #endOperation()} must be used properly to enforce the order)
263-
* despite the method {@link AsyncCommandBatchCursor#close()} being called concurrently with those operations.
264-
* This total order induces the happens-before order.
265-
* <p>
266-
* The deferred close action does not violate externally observable idempotence of {@link AsyncCommandBatchCursor#close()},
267-
* because {@link AsyncCommandBatchCursor#close()} is allowed to release resources "eventually".
268-
* <p>
269-
* Only methods explicitly documented as thread-safe are thread-safe,
270-
* others are not and rely on the total order mentioned above.
271-
*/
272252
@ThreadSafe
273-
private final class ResourceManager {
274-
private final Lock lock;
275-
private volatile State state = State.IDLE;
276-
@Nullable
277-
private volatile AsyncConnectionSource connectionSource;
278-
@Nullable
279-
private volatile AsyncConnection pinnedConnection;
280-
@Nullable
281-
private volatile ServerCursor serverCursor;
282-
private volatile boolean skipReleasingServerResourcesOnClose;
283-
284-
ResourceManager(final AsyncConnectionSource connectionSource,
285-
@Nullable final AsyncConnection connectionToPin,
286-
@Nullable final ServerCursor serverCursor) {
287-
lock = new ReentrantLock();
288-
if (serverCursor != null) {
289-
this.connectionSource = notNull("connectionSource", connectionSource).retain();
290-
if (connectionToPin != null) {
291-
this.pinnedConnection = connectionToPin.retain();
292-
connectionToPin.markAsPinned(Connection.PinningMode.CURSOR);
293-
}
294-
}
295-
skipReleasingServerResourcesOnClose = false;
296-
this.serverCursor = serverCursor;
297-
}
298-
299-
/**
300-
* Thread-safe.
301-
*/
302-
boolean operable() {
303-
return state.operable();
304-
}
305-
306-
/**
307-
* Thread-safe.
308-
* Returns {@code true} iff started an operation.
309-
* If {@linkplain State#operable() closed}, then returns false, otherwise completes abruptly.
310-
*
311-
* @throws IllegalStateException Iff another operation is in progress.
312-
*/
313-
private boolean tryStartOperation() throws IllegalStateException {
314-
lock.lock();
315-
try {
316-
State localState = state;
317-
if (!localState.operable()) {
318-
return false;
319-
} else if (localState == State.IDLE) {
320-
state = State.OPERATION_IN_PROGRESS;
321-
return true;
322-
} else if (localState == State.OPERATION_IN_PROGRESS) {
323-
return false;
324-
} else {
325-
throw fail(state.toString());
326-
}
327-
} finally {
328-
lock.unlock();
329-
}
330-
}
331-
332-
/**
333-
* Thread-safe.
334-
*/
335-
private void endOperation() {
336-
boolean doClose = false;
337-
lock.lock();
338-
try {
339-
State localState = state;
340-
if (localState == State.OPERATION_IN_PROGRESS) {
341-
state = State.IDLE;
342-
} else if (localState == State.CLOSE_PENDING) {
343-
state = State.CLOSED;
344-
doClose = true;
345-
} else if (localState != State.CLOSED) {
346-
fail(localState.toString());
347-
}
348-
} finally {
349-
lock.unlock();
350-
}
351-
if (doClose) {
352-
doClose();
353-
}
354-
}
253+
private static final class ResourceManager extends CursorResourceManager<AsyncConnectionSource, AsyncConnection> {
355254

356-
/**
357-
* Thread-safe.
358-
*/
359-
void close() {
360-
boolean doClose = false;
361-
lock.lock();
362-
try {
363-
State localState = state;
364-
if (localState == State.OPERATION_IN_PROGRESS) {
365-
state = State.CLOSE_PENDING;
366-
} else if (localState != State.CLOSED) {
367-
state = State.CLOSED;
368-
doClose = true;
369-
}
370-
} finally {
371-
lock.unlock();
372-
}
373-
if (doClose) {
374-
doClose();
375-
}
255+
ResourceManager(
256+
final MongoNamespace namespace,
257+
final AsyncConnectionSource connectionSource,
258+
@Nullable final AsyncConnection connectionToPin,
259+
@Nullable final ServerCursor serverCursor) {
260+
super(namespace, connectionSource, connectionToPin, serverCursor);
376261
}
377262

378-
/**
379-
* This method is never executed concurrently with either itself or other operations
380-
* demarcated by {@link #tryStartOperation()}/{@link #endOperation()}.
381-
*/
382-
private void doClose() {
383-
if (skipReleasingServerResourcesOnClose) {
384-
serverCursor = null;
385-
releaseClientResources();
386-
} else if (serverCursor != null) {
387-
executeWithConnection((connection, t) -> {
388-
// guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
389-
serverCursor = null;
390-
releaseClientResources();
391-
}, resourceManager::releaseServerAndClientResources);
392-
}
263+
@Override
264+
void markAsPinned(final AsyncConnection connectionToPin, final Connection.PinningMode pinningMode) {
265+
connectionToPin.markAsPinned(pinningMode);
393266
}
394267

395-
void onCorruptedConnection(@Nullable final AsyncConnection corruptedConnection) {
396-
// if `pinnedConnection` is corrupted, then we cannot kill `serverCursor` via such a connection
397-
AsyncConnection localPinnedConnection = pinnedConnection;
398-
if (localPinnedConnection != null) {
399-
assertTrue(corruptedConnection == localPinnedConnection);
400-
skipReleasingServerResourcesOnClose = true;
401-
}
268+
@Override
269+
void executeWithConnection(final Consumer<AsyncConnection> action) {
270+
throw new UnsupportedOperationException();
402271
}
403272

404-
<R> void executeWithConnection(
405-
final SingleResultCallback<R> callback,
406-
final Consumer<AsyncConnection> function) {
273+
@Override
274+
<R> void executeWithConnection(final Consumer<AsyncConnection> action, final SingleResultCallback<R> callback) {
407275
assertTrue(state != State.IDLE);
408276
if (pinnedConnection != null) {
409-
executeWithConnection(assertNotNull(pinnedConnection).retain(), null, callback, function);
277+
executeWithConnection(assertNotNull(pinnedConnection).retain(), null, action, callback);
410278
} else {
411-
assertNotNull(connectionSource).getConnection((conn, t) -> executeWithConnection(conn, t, callback, function));
279+
assertNotNull(connectionSource).getConnection((conn, t) -> executeWithConnection(conn, t, action, callback));
412280
}
413281
}
414282

415283
<R> void executeWithConnection(
416284
@Nullable final AsyncConnection connection,
417285
@Nullable final Throwable t,
418-
final SingleResultCallback<R> callback,
419-
final Consumer<AsyncConnection> function) {
286+
final Consumer<AsyncConnection> function,
287+
final SingleResultCallback<R> callback) {
420288
assertTrue(connection != null || t != null);
421289
if (t != null) {
422-
try {
423-
onCorruptedConnection(connection);
424-
} catch (Exception suppressed) {
425-
t.addSuppressed(suppressed);
426-
}
427290
callback.onResult(null, t);
428291
} else {
429292
AsyncCallbackSupplier<R> curriedFunction = c -> function.accept(connection);
430-
curriedFunction.whenComplete(connection::release).get(callback);
293+
curriedFunction.whenComplete(() -> {
294+
System.out.println("COMPLETED!!!!");
295+
connection.release();
296+
}).get((result, error) -> {
297+
if (error instanceof MongoSocketException) {
298+
onCorruptedConnection(connection);
299+
}
300+
callback.onResult(result, error);
301+
});
431302
}
432303
}
433304

434-
/**
435-
* Thread-safe.
436-
*/
437-
@Nullable
438-
ServerCursor serverCursor() {
439-
return serverCursor;
440-
}
441-
442-
void setServerCursor(@Nullable final ServerCursor serverCursor) {
443-
assertTrue(state.inProgress());
444-
assertNotNull(connectionSource);
445-
this.serverCursor = serverCursor;
446-
if (serverCursor == null) {
447-
releaseClientResources();
305+
@Override
306+
void doClose() {
307+
if (skipReleasingServerResourcesOnClose) {
308+
serverCursor = null;
448309
}
449-
}
450310

451-
private void releaseServerAndClientResources(final AsyncConnection connection) {
452-
lock.lock();
453-
ServerCursor localServerCursor = serverCursor;
454-
serverCursor = null;
455-
lock.unlock();
456-
if (localServerCursor != null) {
457-
killServerCursor(namespace, localServerCursor, connection);
311+
if (serverCursor != null) {
312+
executeWithConnection(conn -> {
313+
releaseServerResources(conn);
314+
conn.release();
315+
}, (r, t) -> {
316+
// guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
317+
serverCursor = null;
318+
releaseClientResources();
319+
});
458320
} else {
459-
connection.release();
460321
releaseClientResources();
461322
}
462323
}
463324

464-
private void killServerCursor(final MongoNamespace namespace, final ServerCursor serverCursor, final AsyncConnection connection) {
325+
@Override
326+
void killServerCursor(final MongoNamespace namespace, final ServerCursor serverCursor, final AsyncConnection connection) {
465327
connection
466-
.commandAsync(namespace.getDatabaseName(), asKillCursorsCommandDocument(namespace, serverCursor),
328+
.commandAsync(namespace.getDatabaseName(), getKillCursorsCommand(namespace, serverCursor),
467329
NO_OP_FIELD_NAME_VALIDATOR, ReadPreference.primary(), new BsonDocumentCodec(), assertNotNull(connectionSource),
468-
(r, t) -> {
469-
connection.release();
470-
releaseClientResources();
471-
});
472-
}
473-
474-
private BsonDocument asKillCursorsCommandDocument(final MongoNamespace namespace, final ServerCursor serverCursor) {
475-
return new BsonDocument("killCursors", new BsonString(namespace.getCollectionName()))
476-
.append("cursors", new BsonArray(singletonList(new BsonInt64(serverCursor.getId()))));
477-
}
478-
479-
private void releaseClientResources() {
480-
assertNull(serverCursor);
481-
lock.lock();
482-
AsyncConnectionSource localConnectionSource = connectionSource;
483-
connectionSource = null;
484-
485-
AsyncConnection localPinnedConnection = pinnedConnection;
486-
pinnedConnection = null;
487-
lock.unlock();
488-
489-
if (localConnectionSource != null) {
490-
localConnectionSource.release();
491-
}
492-
if (localPinnedConnection != null) {
493-
localPinnedConnection.release();
494-
}
495-
}
496-
}
497-
498-
private enum State {
499-
IDLE(true, false),
500-
OPERATION_IN_PROGRESS(true, true),
501-
/**
502-
* Implies {@link #OPERATION_IN_PROGRESS}.
503-
*/
504-
CLOSE_PENDING(false, true),
505-
CLOSED(false, false);
506-
507-
private final boolean operable;
508-
private final boolean inProgress;
509-
510-
State(final boolean operable, final boolean inProgress) {
511-
this.operable = operable;
512-
this.inProgress = inProgress;
513-
}
514-
515-
boolean operable() {
516-
return operable;
517-
}
518-
519-
boolean inProgress() {
520-
return inProgress;
330+
(r, t) -> releaseClientResources());
521331
}
522332
}
523333
}

0 commit comments

Comments
 (0)