Skip to content

Commit 898e885

Browse files
authored
Add maxTimeMS support to CRUD operations. (#1297)
JAVA-4057
1 parent 8671e84 commit 898e885

File tree

15 files changed

+260
-230
lines changed

15 files changed

+260
-230
lines changed

driver-core/src/main/com/mongodb/internal/TimeoutContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public static MongoOperationTimeoutException createMongoTimeoutException(final T
5454
if (cause instanceof MongoOperationTimeoutException) {
5555
return (MongoOperationTimeoutException) cause;
5656
}
57-
return new MongoOperationTimeoutException("Operation timed out: " + cause.getMessage());
57+
return new MongoOperationTimeoutException("Operation timed out: " + cause.getMessage(), cause);
5858
}
5959

6060
public static TimeoutContext createMaintenanceTimeoutContext(final TimeoutSettings timeoutSettings) {

driver-core/src/main/com/mongodb/internal/async/function/RetryState.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ public final class RetryState {
7272
*/
7373
public static RetryState withRetryableState(final int retries, final TimeoutContext timeoutContext) {
7474
assertTrue(retries > 0);
75-
return new RetryState(retries, timeoutContext);
75+
if (timeoutContext.hasTimeoutMS()){
76+
return new RetryState(INFINITE_ATTEMPTS, timeoutContext);
77+
}
78+
return new RetryState(retries, null);
7679
}
7780

7881
public static RetryState withNonRetryableState() {

driver-core/src/main/com/mongodb/internal/connection/ProtocolHelper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ public static MongoException createSpecialException(@Nullable final BsonDocument
246246
int errorCode = getErrorCode(response);
247247
String errorMessage = getErrorMessage(response, errorMessageFieldName);
248248
if (ErrorCategory.fromErrorCode(errorCode) == ErrorCategory.EXECUTION_TIMEOUT) {
249+
//TODO JAVA-5248 when timeoutMS is set, MongoOperationTimeoutException should be thrown.
249250
return new MongoExecutionTimeoutException(errorCode, errorMessage, response);
250251
} else if (isNodeIsRecoveringError(errorCode, errorMessage)) {
251252
return new MongoNodeIsRecoveringException(response, serverAddress);

driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.mongodb.internal.bulk.WriteRequestWithIndex;
3434
import com.mongodb.internal.connection.BulkWriteBatchCombiner;
3535
import com.mongodb.internal.connection.IndexMap;
36+
import com.mongodb.internal.connection.OperationContext;
3637
import com.mongodb.internal.connection.SplittablePayload;
3738
import com.mongodb.internal.session.SessionContext;
3839
import com.mongodb.internal.validator.MappedFieldNameValidator;
@@ -64,6 +65,7 @@
6465
import static com.mongodb.internal.bulk.WriteRequest.Type.INSERT;
6566
import static com.mongodb.internal.bulk.WriteRequest.Type.REPLACE;
6667
import static com.mongodb.internal.bulk.WriteRequest.Type.UPDATE;
68+
import static com.mongodb.internal.operation.CommandOperationHelper.appendMaxTimeMs;
6769
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
6870
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
6971
import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite;
@@ -90,7 +92,7 @@ public final class BulkWriteBatch {
9092
private final BsonDocument command;
9193
private final SplittablePayload payload;
9294
private final List<WriteRequestWithIndex> unprocessed;
93-
private final SessionContext sessionContext;
95+
private final OperationContext operationContext;
9496
private final BsonValue comment;
9597
private final BsonDocument variables;
9698

@@ -99,8 +101,9 @@ static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace,
99101
final boolean ordered, final WriteConcern writeConcern,
100102
final Boolean bypassDocumentValidation, final boolean retryWrites,
101103
final List<? extends WriteRequest> writeRequests,
102-
final SessionContext sessionContext,
104+
final OperationContext operationContext,
103105
@Nullable final BsonValue comment, @Nullable final BsonDocument variables) {
106+
SessionContext sessionContext = operationContext.getSessionContext();
104107
if (sessionContext.hasSession() && !sessionContext.isImplicitSession() && !sessionContext.hasActiveTransaction()
105108
&& !writeConcern.isAcknowledged()) {
106109
throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
@@ -119,13 +122,13 @@ static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace,
119122
}
120123
return new BulkWriteBatch(namespace, connectionDescription, ordered, writeConcern, bypassDocumentValidation,
121124
canRetryWrites, new BulkWriteBatchCombiner(connectionDescription.getServerAddress(), ordered, writeConcern),
122-
writeRequestsWithIndex, sessionContext, comment, variables);
125+
writeRequestsWithIndex, operationContext, comment, variables);
123126
}
124127

125128
private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescription connectionDescription,
126129
final boolean ordered, final WriteConcern writeConcern, @Nullable final Boolean bypassDocumentValidation,
127130
final boolean retryWrites, final BulkWriteBatchCombiner bulkWriteBatchCombiner,
128-
final List<WriteRequestWithIndex> writeRequestsWithIndices, final SessionContext sessionContext,
131+
final List<WriteRequestWithIndex> writeRequestsWithIndices, final OperationContext operationContext,
129132
@Nullable final BsonValue comment, @Nullable final BsonDocument variables) {
130133
this.namespace = namespace;
131134
this.connectionDescription = connectionDescription;
@@ -159,11 +162,12 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti
159162
this.indexMap = indexMap;
160163
this.unprocessed = unprocessedItems;
161164
this.payload = new SplittablePayload(getPayloadType(batchType), payloadItems);
162-
this.sessionContext = sessionContext;
165+
this.operationContext = operationContext;
163166
this.comment = comment;
164167
this.variables = variables;
165168
this.command = new BsonDocument();
166169

170+
SessionContext sessionContext = operationContext.getSessionContext();
167171
if (!payloadItems.isEmpty()) {
168172
command.put(getCommandName(batchType), new BsonString(namespace.getCollectionName()));
169173
command.put("ordered", new BsonBoolean(ordered));
@@ -185,7 +189,7 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti
185189
final boolean ordered, final WriteConcern writeConcern, final Boolean bypassDocumentValidation,
186190
final boolean retryWrites, final BulkWriteBatchCombiner bulkWriteBatchCombiner, final IndexMap indexMap,
187191
final WriteRequest.Type batchType, final BsonDocument command, final SplittablePayload payload,
188-
final List<WriteRequestWithIndex> unprocessed, final SessionContext sessionContext,
192+
final List<WriteRequestWithIndex> unprocessed, final OperationContext operationContext,
189193
@Nullable final BsonValue comment, @Nullable final BsonDocument variables) {
190194
this.namespace = namespace;
191195
this.connectionDescription = connectionDescription;
@@ -198,11 +202,11 @@ private BulkWriteBatch(final MongoNamespace namespace, final ConnectionDescripti
198202
this.payload = payload;
199203
this.unprocessed = unprocessed;
200204
this.retryWrites = retryWrites;
201-
this.sessionContext = sessionContext;
205+
this.operationContext = operationContext;
202206
this.comment = comment;
203207
this.variables = variables;
204208
if (retryWrites) {
205-
command.put("txnNumber", new BsonInt64(sessionContext.advanceTransactionNumber()));
209+
command.put("txnNumber", new BsonInt64(operationContext.getSessionContext().advanceTransactionNumber()));
206210
}
207211
this.command = command;
208212
}
@@ -223,7 +227,7 @@ boolean getRetryWrites() {
223227
}
224228

225229
BsonDocument getCommand() {
226-
return command;
230+
return appendMaxTimeMs(operationContext.getTimeoutContext(), command);
227231
}
228232

229233
SplittablePayload getPayload() {
@@ -266,11 +270,11 @@ BulkWriteBatch getNextBatch() {
266270

267271

268272
return new BulkWriteBatch(namespace, connectionDescription, ordered, writeConcern, bypassDocumentValidation, retryWrites,
269-
bulkWriteBatchCombiner, nextIndexMap, batchType, command, payload.getNextSplit(), unprocessed, sessionContext,
273+
bulkWriteBatchCombiner, nextIndexMap, batchType, command, payload.getNextSplit(), unprocessed, operationContext,
270274
comment, variables);
271275
} else {
272276
return new BulkWriteBatch(namespace, connectionDescription, ordered, writeConcern, bypassDocumentValidation, retryWrites,
273-
bulkWriteBatchCombiner, unprocessed, sessionContext, comment, variables);
277+
bulkWriteBatchCombiner, unprocessed, operationContext, comment, variables);
274278
}
275279
}
276280

driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.mongodb.internal.operation.retry.AttachmentKeys;
3636
import com.mongodb.lang.Nullable;
3737
import org.bson.BsonDocument;
38+
import org.bson.BsonInt64;
3839

3940
import java.util.List;
4041
import java.util.function.Supplier;
@@ -225,6 +226,16 @@ static MongoException transformWriteException(final MongoException exception) {
225226
return exception;
226227
}
227228

229+
static BsonDocument appendMaxTimeMs(final TimeoutContext timeoutContext, final BsonDocument command) {
230+
if (timeoutContext.hasTimeoutMS()) {
231+
long maxTimeMS = timeoutContext.getMaxTimeMS();
232+
if (maxTimeMS > 0) {
233+
command.append("maxTimeMS", new BsonInt64(maxTimeMS));
234+
}
235+
}
236+
return command;
237+
}
238+
228239
private CommandOperationHelper() {
229240
}
230241
}

driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818

1919
import com.mongodb.MongoCommandException;
2020
import com.mongodb.MongoNamespace;
21+
import com.mongodb.MongoOperationTimeoutException;
2122
import com.mongodb.WriteConcern;
23+
import com.mongodb.internal.TimeoutContext;
2224
import com.mongodb.internal.TimeoutSettings;
2325
import com.mongodb.internal.async.SingleResultCallback;
2426
import com.mongodb.internal.binding.AsyncReadWriteBinding;
2527
import com.mongodb.internal.binding.AsyncWriteBinding;
2628
import com.mongodb.internal.binding.ReadWriteBinding;
2729
import com.mongodb.internal.binding.WriteBinding;
2830
import com.mongodb.internal.connection.AsyncConnection;
31+
import com.mongodb.internal.connection.OperationContext;
2932
import com.mongodb.lang.Nullable;
3033
import org.bson.BsonDocument;
3134
import org.bson.BsonString;
@@ -43,6 +46,7 @@
4346
import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback;
4447
import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection;
4548
import static com.mongodb.internal.operation.AsyncOperationHelper.writeConcernErrorTransformerAsync;
49+
import static com.mongodb.internal.operation.CommandOperationHelper.appendMaxTimeMs;
4650
import static com.mongodb.internal.operation.CommandOperationHelper.isNamespaceError;
4751
import static com.mongodb.internal.operation.CommandOperationHelper.rethrowIfNotNamespaceError;
4852
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
@@ -98,7 +102,7 @@ public TimeoutSettings getTimeoutSettings() {
98102
public Void execute(final WriteBinding binding) {
99103
BsonDocument localEncryptedFields = getEncryptedFields((ReadWriteBinding) binding);
100104
return withConnection(binding, connection -> {
101-
getCommands(localEncryptedFields).forEach(command -> {
105+
getCommands(localEncryptedFields, binding.getOperationContext()).forEach(command -> {
102106
try {
103107
executeCommand(binding, namespace.getDatabaseName(), command.get(),
104108
connection, writeConcernErrorTransformer());
@@ -121,7 +125,7 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
121125
if (t1 != null) {
122126
errHandlingCallback.onResult(null, t1);
123127
} else {
124-
new ProcessCommandsCallback(binding, connection, getCommands(result), releasingCallback(errHandlingCallback,
128+
new ProcessCommandsCallback(binding, connection, getCommands(result, binding.getOperationContext()), releasingCallback(errHandlingCallback,
125129
connection))
126130
.onResult(null, null);
127131
}
@@ -157,25 +161,28 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
157161
*
158162
* @return the list of commands to run to create the collection
159163
*/
160-
private List<Supplier<BsonDocument>> getCommands(final BsonDocument encryptedFields) {
164+
private List<Supplier<BsonDocument>> getCommands(final BsonDocument encryptedFields, final OperationContext operationContext) {
165+
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
161166
if (encryptedFields == null) {
162-
return singletonList(this::dropCollectionCommand);
167+
return singletonList(() -> dropCollectionCommand(timeoutContext));
163168
} else {
164169
return asList(
165-
() -> getDropEncryptedFieldsCollectionCommand(encryptedFields, "esc"),
166-
() -> getDropEncryptedFieldsCollectionCommand(encryptedFields, "ecoc"),
167-
this::dropCollectionCommand
170+
() -> getDropEncryptedFieldsCollectionCommand(timeoutContext, encryptedFields, "esc"),
171+
() -> getDropEncryptedFieldsCollectionCommand(timeoutContext, encryptedFields, "ecoc"),
172+
() -> dropCollectionCommand(timeoutContext)
168173
);
169174
}
170175
}
171176

172-
private BsonDocument getDropEncryptedFieldsCollectionCommand(final BsonDocument encryptedFields, final String collectionSuffix) {
177+
private BsonDocument getDropEncryptedFieldsCollectionCommand(final TimeoutContext timeoutContext, final BsonDocument encryptedFields, final String collectionSuffix) {
173178
BsonString defaultCollectionName = new BsonString(ENCRYPT_PREFIX + namespace.getCollectionName() + "." + collectionSuffix);
174-
return new BsonDocument("drop", encryptedFields.getOrDefault(collectionSuffix + "Collection", defaultCollectionName));
179+
BsonDocument commandDocument = new BsonDocument("drop", encryptedFields.getOrDefault(collectionSuffix + "Collection", defaultCollectionName));
180+
return appendMaxTimeMs(timeoutContext, commandDocument);
175181
}
176182

177-
private BsonDocument dropCollectionCommand() {
183+
private BsonDocument dropCollectionCommand(final TimeoutContext timeoutContext) {
178184
BsonDocument commandDocument = new BsonDocument("drop", new BsonString(namespace.getCollectionName()));
185+
appendMaxTimeMs(timeoutContext, commandDocument);
179186
appendWriteConcernToCommand(writeConcern, commandDocument);
180187
return commandDocument;
181188
}
@@ -256,8 +263,12 @@ public void onResult(@Nullable final Void result, @Nullable final Throwable t) {
256263
if (nextCommandFunction == null) {
257264
finalCallback.onResult(null, null);
258265
} else {
259-
executeCommandAsync(binding, namespace.getDatabaseName(), nextCommandFunction.get(),
260-
connection, writeConcernErrorTransformerAsync(), this);
266+
try {
267+
executeCommandAsync(binding, namespace.getDatabaseName(), nextCommandFunction.get(),
268+
connection, writeConcernErrorTransformerAsync(), this);
269+
} catch (MongoOperationTimeoutException operationTimeoutException) {
270+
finalCallback.onResult(null, operationTimeoutException);
271+
}
261272
}
262273
}
263274
}

driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ public BulkWriteResult execute(final WriteBinding binding) {
209209
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) {
210210
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace,
211211
connectionDescription, ordered, writeConcern,
212-
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables), timeoutContext);
212+
bypassDocumentValidation, retryWrites, writeRequests, binding.getOperationContext(), comment, variables), timeoutContext);
213213
}
214214
return executeBulkWriteBatch(retryState, binding, connection);
215215
})
@@ -249,7 +249,7 @@ && handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallba
249249
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) {
250250
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace,
251251
connectionDescription, ordered, writeConcern,
252-
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables), timeoutContext);
252+
bypassDocumentValidation, retryWrites, writeRequests, binding.getOperationContext(), comment, variables), timeoutContext);
253253
}
254254
} catch (Throwable t) {
255255
releasingCallback.onResult(null, t);

0 commit comments

Comments
 (0)