Skip to content

Store computedServerSelectionTimeout so it can be reused. #1312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 44 additions & 9 deletions driver-core/src/main/com/mongodb/internal/TimeoutContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class TimeoutContext {

@Nullable
private Timeout timeout;
@Nullable
private Timeout computedServerSelectionTimeout;
private long minRoundTripTimeMS = 0;

public static MongoOperationTimeoutException createMongoTimeoutException() {
Expand Down Expand Up @@ -191,14 +193,17 @@ public long getWriteTimeoutMS() {
return timeoutOrAlternative(0);
}

public int getConnectTimeoutMs() {
return (int) calculateMin(getTimeoutSettings().getConnectTimeoutMS());
}

public void resetTimeout() {
assertNotNull(timeout);
timeout = calculateTimeout(timeoutSettings.getTimeoutMS());
}

/**
* Resest the timeout if this timeout context is being used by pool maintenance
* Resets the timeout if this timeout context is being used by pool maintenance
*/
public void resetMaintenanceTimeout() {
if (isMaintenanceContext && timeout != null && !timeout.isInfinite()) {
Expand Down Expand Up @@ -265,23 +270,53 @@ public static Timeout calculateTimeout(@Nullable final Long timeoutMS) {
return null;
}

public Timeout computedServerSelectionTimeout() {
long ms = getTimeoutSettings().getServerSelectionTimeoutMS();
Timeout serverSelectionTimeout = StartTime.now().timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS);
return serverSelectionTimeout.orEarlier(timeout);
/**
* Returns the computed server selection timeout
*
* <p>Caches the computed server selection timeout if:
* <ul>
* <li>not in a maintenance context</li>
* <li>there is a timeoutMS, so to keep the same legacy behavior.</li>
* <li>the server selection timeout is less than the remaining overall timeout.</li>
* </ul>
*
* @return the timeout context
*/
public Timeout computeServerSelectionTimeout() {
Timeout serverSelectionTimeout = StartTime.now()
.timeoutAfterOrInfiniteIfNegative(getTimeoutSettings().getServerSelectionTimeoutMS(), MILLISECONDS);


if (isMaintenanceContext || !hasTimeoutMS()) {
return serverSelectionTimeout;
}

if (serverSelectionTimeout.orEarlier(timeout) == timeout) {
return timeout;
}

computedServerSelectionTimeout = serverSelectionTimeout;
return computedServerSelectionTimeout;
}

/**
* Returns the timeout context to use for the handshake process
*
* @return a new timeout context with the cached computed server selection timeout if available or this
*/
public TimeoutContext withComputedServerSelectionTimeoutContext() {
return computedServerSelectionTimeout == null
? this : new TimeoutContext(false, timeoutSettings, computedServerSelectionTimeout);
}

public Timeout startWaitQueueTimeout(final StartTime checkoutStart) {
final long ms = getTimeoutSettings().getMaxWaitTimeMS();
return checkoutStart.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS);
}

public int getConnectTimeoutMs() {
return (int) getTimeoutSettings().getConnectTimeoutMS();
}

@Nullable
public Timeout getTimeout() {
return timeout;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera

ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector);
boolean selectionWaitingLogged = false;
Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computedServerSelectionTimeout();
Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computeServerSelectionTimeout();
logServerSelectionStarted(clusterId, operationContext.getId(), serverSelector, description);

while (true) {
Expand Down Expand Up @@ -156,7 +156,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
public void selectServerAsync(final ServerSelector serverSelector, final OperationContext operationContext,
final SingleResultCallback<ServerTuple> callback) {
isTrue("open", !isClosed());
Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computedServerSelectionTimeout();
Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computeServerSelectionTimeout();
ServerSelectionRequest request = new ServerSelectionRequest(
serverSelector, getCompositeServerSelector(serverSelector), operationContext.getId(), computedServerSelectionTimeout,
callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,13 @@ public int getGeneration() {
}

@Override
public void open(final OperationContext operationContext) {
public void open(final OperationContext originalOperationContext) {
isTrue("Open already called", stream == null);
stream = streamFactory.create(serverId.getAddress());
try {
OperationContext operationContext = originalOperationContext
.withTimeoutContext(originalOperationContext.getTimeoutContext().withComputedServerSelectionTimeoutContext());

stream.open(operationContext);

InternalConnectionInitializationDescription initializationDescription = connectionInitializer.startHandshake(this, operationContext);
Expand All @@ -218,9 +221,13 @@ public void open(final OperationContext operationContext) {
}

@Override
public void openAsync(final OperationContext operationContext, final SingleResultCallback<Void> callback) {
public void openAsync(final OperationContext originalOperationContext, final SingleResultCallback<Void> callback) {
isTrue("Open already called", stream == null, callback);
try {

OperationContext operationContext = originalOperationContext
.withTimeoutContext(originalOperationContext.getTimeoutContext().withComputedServerSelectionTimeoutContext());

stream = streamFactory.create(serverId.getAddress());
stream.openAsync(operationContext, new AsyncCompletionHandler<Void>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public ClusterClock getClock() {
@Override
public ServerTuple selectServer(final ServerSelector serverSelector, final OperationContext operationContext) {
isTrue("open", !isClosed());
Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computedServerSelectionTimeout();
Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computeServerSelectionTimeout();
waitForSrv(computedServerSelectionTimeout);
if (srvRecordResolvedToMultipleHosts) {
throw createResolvedToMultipleHostsException();
Expand Down Expand Up @@ -238,7 +238,7 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati
callback.onResult(null, createShutdownException());
return;
}
Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computedServerSelectionTimeout();
Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computeServerSelectionTimeout();
ServerSelectionRequest serverSelectionRequest = new ServerSelectionRequest(operationContext.getId(), serverSelector,
computedServerSelectionTimeout, callback);
if (initializationCompleted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void applyResponse(final BsonArray response) {
}

protected void applyApplicationError(final BsonDocument applicationError) {
Timeout serverSelectionTimeout = OPERATION_CONTEXT.getTimeoutContext().computedServerSelectionTimeout();
Timeout serverSelectionTimeout = OPERATION_CONTEXT.getTimeoutContext().computeServerSelectionTimeout();
ServerAddress serverAddress = new ServerAddress(applicationError.getString("address").getValue());
int errorGeneration = applicationError.getNumber("generation",
new BsonInt32(((DefaultServer) getCluster().getServer(serverAddress, serverSelectionTimeout)).getConnectionPool().getGeneration())).intValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class MultiServerClusterSpecification extends Specification {
cluster.close()

when:
cluster.getServer(firstServer, OPERATION_CONTEXT.getTimeoutContext().computedServerSelectionTimeout())
cluster.getServer(firstServer, OPERATION_CONTEXT.getTimeoutContext().computeServerSelectionTimeout())

then:
thrown(IllegalStateException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private void assertServer(final String serverName, final BsonDocument expectedSe

if (expectedServerDescriptionDocument.isDocument("pool")) {
int expectedGeneration = expectedServerDescriptionDocument.getDocument("pool").getNumber("generation").intValue();
Timeout serverSelectionTimeout = OPERATION_CONTEXT.getTimeoutContext().computedServerSelectionTimeout();
Timeout serverSelectionTimeout = OPERATION_CONTEXT.getTimeoutContext().computeServerSelectionTimeout();
DefaultServer server = (DefaultServer) getCluster().getServer(new ServerAddress(serverName), serverSelectionTimeout);
assertEquals(expectedGeneration, server.getConnectionPool().getGeneration());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class SingleServerClusterSpecification extends Specification {

then:
cluster.getServer(firstServer,
OPERATION_CONTEXT.getTimeoutContext().computedServerSelectionTimeout()) == factory.getServer(firstServer)
OPERATION_CONTEXT.getTimeoutContext().computeServerSelectionTimeout()) == factory.getServer(firstServer)

cleanup:
cluster?.close()
Expand All @@ -92,7 +92,7 @@ class SingleServerClusterSpecification extends Specification {
cluster.close()

when:
cluster.getServer(firstServer, OPERATION_CONTEXT.getTimeoutContext().computedServerSelectionTimeout())
cluster.getServer(firstServer, OPERATION_CONTEXT.getTimeoutContext().computeServerSelectionTimeout())

then:
thrown(IllegalStateException)
Expand Down