Skip to content

Commit 600f2c6

Browse files
authored
Update cursors to refresh timeoutMS on close without affecting the timeout of the operation (#1527)
JAVA-5615
1 parent 90976e4 commit 600f2c6

File tree

12 files changed

+399
-125
lines changed

12 files changed

+399
-125
lines changed

driver-core/src/main/com/mongodb/internal/TimeoutContext.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,25 @@
1717

1818
import com.mongodb.MongoClientException;
1919
import com.mongodb.MongoOperationTimeoutException;
20+
import com.mongodb.internal.async.AsyncRunnable;
21+
import com.mongodb.internal.async.SingleResultCallback;
2022
import com.mongodb.internal.connection.CommandMessage;
2123
import com.mongodb.internal.time.StartTime;
2224
import com.mongodb.internal.time.Timeout;
2325
import com.mongodb.lang.Nullable;
2426
import com.mongodb.session.ClientSession;
2527

2628
import java.util.Objects;
29+
import java.util.Optional;
2730
import java.util.function.LongConsumer;
2831

2932
import static com.mongodb.assertions.Assertions.assertNull;
3033
import static com.mongodb.assertions.Assertions.isTrue;
3134
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
35+
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
3236
import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_INFINITE;
37+
import static java.util.Optional.empty;
38+
import static java.util.Optional.ofNullable;
3339
import static java.util.concurrent.TimeUnit.MILLISECONDS;
3440
import static java.util.concurrent.TimeUnit.NANOSECONDS;
3541

@@ -262,10 +268,53 @@ public int getConnectTimeoutMs() {
262268
() -> throwMongoTimeoutException("The operation exceeded the timeout limit.")));
263269
}
264270

271+
/**
272+
* @see #hasTimeoutMS()
273+
* @see #doWithResetTimeout(Runnable)
274+
* @see #doWithResetTimeout(AsyncRunnable, SingleResultCallback)
275+
*/
265276
public void resetTimeoutIfPresent() {
277+
getAndResetTimeoutIfPresent();
278+
}
279+
280+
/**
281+
* @see #hasTimeoutMS()
282+
* @return A {@linkplain Optional#isPresent() non-empty} previous {@linkplain Timeout} iff {@link #hasTimeoutMS()},
283+
* i.e., iff it was reset.
284+
*/
285+
private Optional<Timeout> getAndResetTimeoutIfPresent() {
286+
Timeout result = timeout;
266287
if (hasTimeoutMS()) {
267288
timeout = startTimeout(timeoutSettings.getTimeoutMS());
289+
return ofNullable(result);
268290
}
291+
return empty();
292+
}
293+
294+
/**
295+
* @see #resetTimeoutIfPresent()
296+
*/
297+
public void doWithResetTimeout(final Runnable action) {
298+
Optional<Timeout> originalTimeout = getAndResetTimeoutIfPresent();
299+
try {
300+
action.run();
301+
} finally {
302+
originalTimeout.ifPresent(original -> timeout = original);
303+
}
304+
}
305+
306+
/**
307+
* @see #resetTimeoutIfPresent()
308+
*/
309+
public void doWithResetTimeout(final AsyncRunnable action, final SingleResultCallback<Void> callback) {
310+
beginAsync().thenRun(c -> {
311+
Optional<Timeout> originalTimeout = getAndResetTimeoutIfPresent();
312+
beginAsync().thenRun(c2 -> {
313+
action.finish(c2);
314+
}).thenAlwaysRunAndFinish(() -> {
315+
originalTimeout.ifPresent(original -> timeout = original);
316+
}, c);
317+
}).finish(callback);
269318
}
270319

271320
/**

driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java

Lines changed: 69 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.internal.operation;
1818

1919
import com.mongodb.MongoCommandException;
20+
import com.mongodb.MongoException;
2021
import com.mongodb.MongoNamespace;
2122
import com.mongodb.MongoOperationTimeoutException;
2223
import com.mongodb.MongoSocketException;
@@ -51,6 +52,7 @@
5152
import static com.mongodb.assertions.Assertions.assertNotNull;
5253
import static com.mongodb.assertions.Assertions.assertTrue;
5354
import static com.mongodb.assertions.Assertions.doesNotThrow;
55+
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
5456
import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH;
5557
import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR;
5658
import static com.mongodb.internal.operation.CommandBatchCursorHelper.NEXT_BATCH;
@@ -63,16 +65,18 @@
6365
class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
6466

6567
private final MongoNamespace namespace;
66-
private final long maxTimeMS;
6768
private final Decoder<T> decoder;
6869
@Nullable
6970
private final BsonValue comment;
7071
private final int maxWireVersion;
7172
private final boolean firstBatchEmpty;
7273
private final ResourceManager resourceManager;
74+
private final OperationContext operationContext;
75+
private final TimeoutMode timeoutMode;
7376
private final AtomicBoolean processedInitial = new AtomicBoolean();
7477
private int batchSize;
7578
private volatile CommandCursorResult<T> commandCursorResult;
79+
private boolean resetTimeoutWhenClosing;
7680

7781
AsyncCommandBatchCursor(
7882
final TimeoutMode timeoutMode,
@@ -86,24 +90,25 @@ class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T>
8690
this.commandCursorResult = toCommandCursorResult(connectionDescription.getServerAddress(), FIRST_BATCH, commandCursorDocument);
8791
this.namespace = commandCursorResult.getNamespace();
8892
this.batchSize = batchSize;
89-
this.maxTimeMS = maxTimeMS;
9093
this.decoder = decoder;
9194
this.comment = comment;
9295
this.maxWireVersion = connectionDescription.getMaxWireVersion();
9396
this.firstBatchEmpty = commandCursorResult.getResults().isEmpty();
97+
operationContext = connectionSource.getOperationContext();
98+
this.timeoutMode = timeoutMode;
9499

95-
connectionSource.getOperationContext().getTimeoutContext().setMaxTimeOverride(maxTimeMS);
100+
operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS);
96101

97102
AsyncConnection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER
98103
? connection : null;
99-
resourceManager = new ResourceManager(timeoutMode, namespace, connectionSource, connectionToPin,
100-
commandCursorResult.getServerCursor());
104+
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
105+
resetTimeoutWhenClosing = true;
101106
}
102107

103108
@Override
104109
public void next(final SingleResultCallback<List<T>> callback) {
105110
resourceManager.execute(funcCallback -> {
106-
resourceManager.checkTimeoutModeAndResetTimeoutContextIfIteration();
111+
checkTimeoutModeAndResetTimeoutContextIfIteration();
107112
ServerCursor localServerCursor = resourceManager.getServerCursor();
108113
boolean serverCursorIsNull = localServerCursor == null;
109114
List<T> batchResults = emptyList();
@@ -168,6 +173,12 @@ public int getMaxWireVersion() {
168173
return maxWireVersion;
169174
}
170175

176+
void checkTimeoutModeAndResetTimeoutContextIfIteration() {
177+
if (timeoutMode == TimeoutMode.ITERATION) {
178+
operationContext.getTimeoutContext().resetTimeoutIfPresent();
179+
}
180+
}
181+
171182
private void getMore(final ServerCursor cursor, final SingleResultCallback<List<T>> callback) {
172183
resourceManager.executeWithConnection((connection, wrappedCallback) ->
173184
getMoreLoop(assertNotNull(connection), cursor, wrappedCallback), callback);
@@ -216,21 +227,24 @@ private CommandCursorResult<T> toCommandCursorResult(final ServerAddress serverA
216227
return commandCursorResult;
217228
}
218229

219-
void setCloseWithoutTimeoutReset(final boolean closeWithoutTimeoutReset) {
220-
this.resourceManager.setCloseWithoutTimeoutReset(closeWithoutTimeoutReset);
230+
/**
231+
* Configures the cursor to {@link #close()}
232+
* without {@linkplain TimeoutContext#resetTimeoutIfPresent() resetting} its {@linkplain TimeoutContext#getTimeout() timeout}.
233+
* This is useful when managing the {@link #close()} behavior externally.
234+
*/
235+
AsyncCommandBatchCursor<T> disableTimeoutResetWhenClosing() {
236+
resetTimeoutWhenClosing = false;
237+
return this;
221238
}
222239

223240
@ThreadSafe
224-
private static final class ResourceManager extends CursorResourceManager<AsyncConnectionSource, AsyncConnection> {
225-
241+
private final class ResourceManager extends CursorResourceManager<AsyncConnectionSource, AsyncConnection> {
226242
ResourceManager(
227-
final TimeoutMode timeoutMode,
228243
final MongoNamespace namespace,
229244
final AsyncConnectionSource connectionSource,
230245
@Nullable final AsyncConnection connectionToPin,
231246
@Nullable final ServerCursor serverCursor) {
232-
super(connectionSource.getOperationContext().getTimeoutContext(), timeoutMode, namespace, connectionSource, connectionToPin,
233-
serverCursor);
247+
super(namespace, connectionSource, connectionToPin, serverCursor);
234248
}
235249

236250
/**
@@ -244,7 +258,7 @@ <R> void execute(final AsyncCallbackSupplier<R> operation, final SingleResultCal
244258
} else {
245259
operation.whenComplete(() -> {
246260
endOperation();
247-
if (getServerCursor() == null) {
261+
if (super.getServerCursor() == null) {
248262
// At this point all resources have been released,
249263
// but `isClose` may still be returning `false` if `close` have not been called.
250264
// Self-close to update the state managed by `ResourceManger`, and so that `isClosed` return `true`.
@@ -261,23 +275,41 @@ void markAsPinned(final AsyncConnection connectionToPin, final Connection.Pinnin
261275

262276
@Override
263277
void doClose() {
264-
if (isSkipReleasingServerResourcesOnClose()) {
265-
unsetServerCursor();
278+
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
279+
timeoutContext.resetToDefaultMaxTime();
280+
SingleResultCallback<Void> thenDoNothing = (r, t) -> {};
281+
if (resetTimeoutWhenClosing) {
282+
timeoutContext.doWithResetTimeout(this::releaseResourcesAsync, thenDoNothing);
283+
} else {
284+
releaseResourcesAsync(thenDoNothing);
266285
}
286+
}
267287

268-
resetTimeout();
269-
if (getServerCursor() != null) {
270-
getConnection((connection, t) -> {
271-
if (connection != null) {
272-
releaseServerAndClientResources(connection);
273-
} else {
274-
unsetServerCursor();
275-
releaseClientResources();
276-
}
277-
});
278-
} else {
288+
private void releaseResourcesAsync(final SingleResultCallback<Void> callback) {
289+
beginAsync().thenRunTryCatchAsyncBlocks(c -> {
290+
if (isSkipReleasingServerResourcesOnClose()) {
291+
unsetServerCursor();
292+
}
293+
if (super.getServerCursor() != null) {
294+
beginAsync().<AsyncConnection>thenSupply(c2 -> {
295+
getConnection(c2);
296+
}).thenConsume((connection, c3) -> {
297+
beginAsync().thenRun(c4 -> {
298+
releaseServerResourcesAsync(connection, c4);
299+
}).thenAlwaysRunAndFinish(() -> {
300+
connection.release();
301+
}, c3);
302+
}).finish(c);
303+
} else {
304+
c.complete(c);
305+
}
306+
}, MongoException.class, (e, c5) -> {
307+
c5.complete(c5); // ignore exceptions when releasing server resources
308+
}).thenAlwaysRunAndFinish(() -> {
309+
// guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
310+
unsetServerCursor();
279311
releaseClientResources();
280-
}
312+
}, callback);
281313
}
282314

283315
<R> void executeWithConnection(final AsyncCallableConnectionWithCallback<R> callable, final SingleResultCallback<R> callback) {
@@ -314,25 +346,21 @@ private void getConnection(final SingleResultCallback<AsyncConnection> callback)
314346
}
315347
}
316348

317-
private void releaseServerAndClientResources(final AsyncConnection connection) {
318-
AsyncCallbackSupplier<Void> callbackSupplier = funcCallback -> {
319-
ServerCursor localServerCursor = getServerCursor();
349+
private void releaseServerResourcesAsync(final AsyncConnection connection, final SingleResultCallback<Void> callback) {
350+
beginAsync().thenRun((c) -> {
351+
ServerCursor localServerCursor = super.getServerCursor();
320352
if (localServerCursor != null) {
321-
killServerCursor(getNamespace(), localServerCursor, connection, funcCallback);
353+
killServerCursorAsync(getNamespace(), localServerCursor, connection, callback);
354+
} else {
355+
c.complete(c);
322356
}
323-
};
324-
callbackSupplier.whenComplete(() -> {
357+
}).thenAlwaysRunAndFinish(() -> {
325358
unsetServerCursor();
326-
releaseClientResources();
327-
}).whenComplete(connection::release).get((r, t) -> { /* do nothing */ });
359+
}, callback);
328360
}
329361

330-
private void killServerCursor(final MongoNamespace namespace, final ServerCursor localServerCursor,
362+
private void killServerCursorAsync(final MongoNamespace namespace, final ServerCursor localServerCursor,
331363
final AsyncConnection localConnection, final SingleResultCallback<Void> callback) {
332-
OperationContext operationContext = assertNotNull(getConnectionSource()).getOperationContext();
333-
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
334-
timeoutContext.resetToDefaultMaxTime();
335-
336364
localConnection.commandAsync(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor),
337365
NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), new BsonDocumentCodec(),
338366
operationContext, (r, t) -> callback.onResult(null, null));

driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,8 @@ private AggregateOperationImpl<RawBsonDocument> getAggregateOperation(final Time
195195
@Override
196196
public BatchCursor<T> execute(final ReadBinding binding) {
197197
TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext();
198-
CommandBatchCursor<RawBsonDocument> cursor = (CommandBatchCursor<RawBsonDocument>) getAggregateOperation(timeoutContext).execute(binding);
199-
cursor.setCloseWithoutTimeoutReset(true);
198+
CommandBatchCursor<RawBsonDocument> cursor = ((CommandBatchCursor<RawBsonDocument>) getAggregateOperation(timeoutContext).execute(binding))
199+
.disableTimeoutResetWhenClosing();
200200

201201
return new ChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding,
202202
setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(),
@@ -210,8 +210,8 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
210210
if (t != null) {
211211
callback.onResult(null, t);
212212
} else {
213-
AsyncCommandBatchCursor<RawBsonDocument> cursor = (AsyncCommandBatchCursor<RawBsonDocument>) assertNotNull(result);
214-
cursor.setCloseWithoutTimeoutReset(true);
213+
AsyncCommandBatchCursor<RawBsonDocument> cursor = ((AsyncCommandBatchCursor<RawBsonDocument>) assertNotNull(result))
214+
.disableTimeoutResetWhenClosing();
215215

216216
callback.onResult(new AsyncChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding,
217217
setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(),

0 commit comments

Comments
 (0)