Skip to content

Remove TimeoutSettings from Operations #1309

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.mongodb.Function;
import com.mongodb.WriteConcern;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;

Expand All @@ -32,8 +31,8 @@
public class AbortTransactionOperation extends TransactionOperation {
private BsonDocument recoveryToken;

public AbortTransactionOperation(final TimeoutSettings timeoutSettings, final WriteConcern writeConcern) {
super(timeoutSettings, writeConcern);
public AbortTransactionOperation(final WriteConcern writeConcern) {
super(writeConcern);
}

public AbortTransactionOperation recoveryToken(@Nullable final BsonDocument recoveryToken) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.mongodb.MongoCommandException;
import com.mongodb.MongoNamespace;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.binding.WriteBinding;
Expand All @@ -39,19 +38,12 @@
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
abstract class AbstractWriteSearchIndexOperation implements AsyncWriteOperation<Void>, WriteOperation<Void> {
private final TimeoutSettings timeoutSettings;
private final MongoNamespace namespace;

AbstractWriteSearchIndexOperation(final TimeoutSettings timeoutSettings, final MongoNamespace namespace) {
this.timeoutSettings = timeoutSettings;
AbstractWriteSearchIndexOperation(final MongoNamespace namespace) {
this.namespace = namespace;
}

@Override
public TimeoutSettings getTimeoutSettings() {
return timeoutSettings;
}

@Override
public Void execute(final WriteBinding binding) {
return withConnection(binding, connection -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.mongodb.MongoNamespace;
import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.model.Collation;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
Expand All @@ -44,14 +43,13 @@
public class AggregateOperation<T> implements AsyncExplainableReadOperation<AsyncBatchCursor<T>>, ExplainableReadOperation<BatchCursor<T>> {
private final AggregateOperationImpl<T> wrapped;

public AggregateOperation(final TimeoutSettings timeoutSettings, final MongoNamespace namespace,
final List<BsonDocument> pipeline, final Decoder<T> decoder) {
this(timeoutSettings, namespace, pipeline, decoder, AggregationLevel.COLLECTION);
public AggregateOperation(final MongoNamespace namespace, final List<BsonDocument> pipeline, final Decoder<T> decoder) {
this(namespace, pipeline, decoder, AggregationLevel.COLLECTION);
}

public AggregateOperation(final TimeoutSettings timeoutSettings, final MongoNamespace namespace,
final List<BsonDocument> pipeline, final Decoder<T> decoder, final AggregationLevel aggregationLevel) {
this.wrapped = new AggregateOperationImpl<>(timeoutSettings, namespace, pipeline, decoder, aggregationLevel);
public AggregateOperation(final MongoNamespace namespace, final List<BsonDocument> pipeline, final Decoder<T> decoder,
final AggregationLevel aggregationLevel) {
this.wrapped = new AggregateOperationImpl<>(namespace, pipeline, decoder, aggregationLevel);
}

public List<BsonDocument> getPipeline() {
Expand Down Expand Up @@ -131,11 +129,6 @@ public AggregateOperation<T> hint(@Nullable final BsonValue hint) {
return this;
}

@Override
public TimeoutSettings getTimeoutSettings() {
return wrapped.getTimeoutSettings();
}

public AggregateOperation<T> timeoutMode(@Nullable final TimeoutMode timeoutMode) {
wrapped.timeoutMode(timeoutMode);
return this;
Expand All @@ -161,7 +154,7 @@ public <R> AsyncReadOperation<R> asAsyncExplainableOperation(@Nullable final Exp
}

<R> CommandReadOperation<R> createExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder<R> resultDecoder) {
return new CommandReadOperation<>(wrapped.getTimeoutSettings(), getNamespace().getDatabaseName(),
return new CommandReadOperation<>(getNamespace().getDatabaseName(),
(operationContext, serverDescription, connectionDescription) ->
asExplainCommand(wrapped.getCommand(operationContext, MIN_WIRE_VERSION), verbosity), resultDecoder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.mongodb.MongoNamespace;
import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.model.Collation;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
Expand Down Expand Up @@ -55,7 +54,6 @@ class AggregateOperationImpl<T> implements AsyncReadOperation<AsyncBatchCursor<T
private static final String CURSOR = "cursor";
private static final String FIRST_BATCH = "firstBatch";
private static final List<String> FIELD_NAMES_WITH_RESULT = Arrays.asList(RESULT, FIRST_BATCH);
private final TimeoutSettings timeoutSettings;
private final MongoNamespace namespace;
private final List<BsonDocument> pipeline;
private final Decoder<T> decoder;
Expand All @@ -71,18 +69,17 @@ class AggregateOperationImpl<T> implements AsyncReadOperation<AsyncBatchCursor<T
private BsonDocument variables;
private TimeoutMode timeoutMode;

AggregateOperationImpl(final TimeoutSettings timeoutSettings, final MongoNamespace namespace,
AggregateOperationImpl(final MongoNamespace namespace,
final List<BsonDocument> pipeline, final Decoder<T> decoder, final AggregationLevel aggregationLevel) {
this(timeoutSettings, namespace, pipeline, decoder,
this(namespace, pipeline, decoder,
defaultAggregateTarget(notNull("aggregationLevel", aggregationLevel),
notNull("namespace", namespace).getCollectionName()),
defaultPipelineCreator(pipeline));
}

AggregateOperationImpl(final TimeoutSettings timeoutSettings, final MongoNamespace namespace,
AggregateOperationImpl(final MongoNamespace namespace,
final List<BsonDocument> pipeline, final Decoder<T> decoder, final AggregateTarget aggregateTarget,
final PipelineCreator pipelineCreator) {
this.timeoutSettings = timeoutSettings;
this.namespace = notNull("namespace", namespace);
this.pipeline = notNull("pipeline", pipeline);
this.decoder = notNull("decoder", decoder);
Expand Down Expand Up @@ -158,13 +155,7 @@ BsonValue getHint() {
return hint;
}

@Override
public TimeoutSettings getTimeoutSettings() {
return timeoutSettings;
}

public AggregateOperationImpl<T> timeoutMode(@Nullable final TimeoutMode timeoutMode) {
isTrueArgument("timeoutMode requires timeoutMS.", timeoutMode == null || timeoutSettings.getTimeoutMS() != null);
if (timeoutMode != null) {
this.timeoutMode = timeoutMode;
}
Expand Down Expand Up @@ -229,14 +220,14 @@ BsonDocument getCommand(final OperationContext operationContext, final int maxWi

private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
return (result, source, connection) ->
new CommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0, getMaxTimeForCursor(), decoder,
comment, source, connection);
new CommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0,
getMaxTimeForCursor(source.getOperationContext()), decoder, comment, source, connection);
}

private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {
return (result, source, connection) ->
new AsyncCommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0, getMaxTimeForCursor(), decoder,
comment, source, connection);
new AsyncCommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0,
getMaxTimeForCursor(source.getOperationContext()), decoder, comment, source, connection);
}

private TimeoutMode getTimeoutMode() {
Expand All @@ -247,8 +238,8 @@ private TimeoutMode getTimeoutMode() {
return localTimeoutMode;
}

private long getMaxTimeForCursor() {
return timeoutSettings.getMaxAwaitTimeMS();
private long getMaxTimeForCursor(final OperationContext operationContext) {
return operationContext.getTimeoutContext().getMaxAwaitTimeMS();
}

interface AggregateTarget {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.mongodb.WriteConcern;
import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.model.Collation;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
Expand Down Expand Up @@ -57,7 +56,6 @@
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public class AggregateToCollectionOperation implements AsyncReadOperation<Void>, ReadOperation<Void> {
private final TimeoutSettings timeoutSettings;
private final MongoNamespace namespace;
private final List<BsonDocument> pipeline;
private final WriteConcern writeConcern;
Expand All @@ -71,15 +69,13 @@ public class AggregateToCollectionOperation implements AsyncReadOperation<Void>,
private BsonValue hint;
private BsonDocument variables;

public AggregateToCollectionOperation(final TimeoutSettings timeoutSettings, final MongoNamespace namespace,
final List<BsonDocument> pipeline, final ReadConcern readConcern, final WriteConcern writeConcern) {
this(timeoutSettings, namespace, pipeline, readConcern, writeConcern, AggregationLevel.COLLECTION);
public AggregateToCollectionOperation(final MongoNamespace namespace, final List<BsonDocument> pipeline, final ReadConcern readConcern,
final WriteConcern writeConcern) {
this(namespace, pipeline, readConcern, writeConcern, AggregationLevel.COLLECTION);
}

public AggregateToCollectionOperation(final TimeoutSettings timeoutSettings, final MongoNamespace namespace,
final List<BsonDocument> pipeline, @Nullable final ReadConcern readConcern, @Nullable final WriteConcern writeConcern,
final AggregationLevel aggregationLevel) {
this.timeoutSettings = timeoutSettings;
public AggregateToCollectionOperation(final MongoNamespace namespace, final List<BsonDocument> pipeline,
@Nullable final ReadConcern readConcern, @Nullable final WriteConcern writeConcern, final AggregationLevel aggregationLevel) {
this.namespace = notNull("namespace", namespace);
this.pipeline = notNull("pipeline", pipeline);
this.writeConcern = writeConcern;
Expand Down Expand Up @@ -156,11 +152,6 @@ public AggregateToCollectionOperation timeoutMode(@Nullable final TimeoutMode ti
return this;
}

@Override
public TimeoutSettings getTimeoutSettings() {
return timeoutSettings;
}

@Override
public Void execute(final ReadBinding binding) {
return executeRetryableRead(binding,
Expand Down
Loading