From 5dbb90634d407809e5439cca7a51e5f39710c22f Mon Sep 17 00:00:00 2001 From: Andrew Weaver Date: Mon, 5 Jun 2023 14:54:04 -0500 Subject: [PATCH 01/12] Add ExecutionInfo to RequestTracker methods --- .../api/core/tracker/RequestTracker.java | 81 ++++++++++++++++--- .../internal/core/cql/CqlRequestHandler.java | 38 +++++---- .../DefaultLoadBalancingPolicy.java | 4 +- .../tracker/MultiplexingRequestTracker.java | 22 +++-- .../core/tracker/NoopRequestTracker.java | 10 ++- .../internal/core/tracker/RequestLogger.java | 10 ++- .../cql/CqlRequestHandlerTrackerTest.java | 8 +- ...LoadBalancingPolicyRequestTrackerTest.java | 10 ++- .../MultiplexingRequestTrackerTest.java | 26 +++--- 9 files changed, 148 insertions(+), 61 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java index c88a61b037d..e904f3f0755 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java @@ -16,6 +16,7 @@ package com.datastax.oss.driver.api.core.tracker; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; @@ -35,7 +36,7 @@ public interface RequestTracker extends AutoCloseable { /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onSuccess(Request, long, DriverExecutionProfile, Node, String)} instead. + * #onSuccess(Request, long, DriverExecutionProfile, Node, String, ExecutionInfo)} instead. */ @Deprecated default void onSuccess( @@ -44,6 +45,21 @@ default void onSuccess( @NonNull DriverExecutionProfile executionProfile, @NonNull Node node) {} + /** + * @deprecated This method only exists for backward compatibility. Override {@link + * #onSuccess(Request, long, DriverExecutionProfile, Node, String, ExecutionInfo)} instead. + */ + @Deprecated + default void onSuccess( + @NonNull Request request, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node, + @NonNull String requestLogPrefix) { + // If client doesn't override onSuccess with requestLogPrefix delegate call to the old method + onSuccess(request, latencyNanos, executionProfile, node); + } + /** * Invoked each time a request succeeds. * @@ -52,20 +68,23 @@ default void onSuccess( * @param executionProfile the execution profile of this request. * @param node the node that returned the successful response. * @param requestLogPrefix the dedicated log prefix for this request + * @param executionInfo the execution info containing the results of this request */ default void onSuccess( @NonNull Request request, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestLogPrefix) { + @NonNull String requestLogPrefix, + @NonNull ExecutionInfo executionInfo) { // If client doesn't override onSuccess with requestLogPrefix delegate call to the old method - onSuccess(request, latencyNanos, executionProfile, node); + onSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix); } /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onError(Request, Throwable, long, DriverExecutionProfile, Node, String)} instead. + * #onError(Request, Throwable, long, DriverExecutionProfile, Node, String, ExecutionInfo)} + * instead. */ @Deprecated default void onError( @@ -75,6 +94,23 @@ default void onError( @NonNull DriverExecutionProfile executionProfile, @Nullable Node node) {} + /** + * @deprecated This method only exists for backward compatibility. Override {@link + * #onError(Request, Throwable, long, DriverExecutionProfile, Node, String, ExecutionInfo)} + * instead. + */ + @Deprecated + default void onError( + @NonNull Request request, + @NonNull Throwable error, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @Nullable Node node, + @NonNull String requestLogPrefix) { + // If client doesn't override onError with requestLogPrefix delegate call to the old method + onError(request, error, latencyNanos, executionProfile, node); + } + /** * Invoked each time a request fails. * @@ -83,6 +119,8 @@ default void onError( * @param executionProfile the execution profile of this request. * @param node the node that returned the error response, or {@code null} if the error occurred * @param requestLogPrefix the dedicated log prefix for this request + * @param executionInfo the execution info being returned to the client for this request if + * available */ default void onError( @NonNull Request request, @@ -90,9 +128,10 @@ default void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @Nullable Node node, - @NonNull String requestLogPrefix) { - // If client doesn't override onError with requestLogPrefix delegate call to the old method - onError(request, error, latencyNanos, executionProfile, node); + @NonNull String requestLogPrefix, + ExecutionInfo executionInfo) { + // delegate call to the old method + onError(request, error, latencyNanos, executionProfile, node, requestLogPrefix); } /** @@ -130,7 +169,8 @@ default void onNodeError( /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, String)} instead. + * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, String, ExecutionInfo)} + * instead. */ @Deprecated default void onNodeSuccess( @@ -139,6 +179,22 @@ default void onNodeSuccess( @NonNull DriverExecutionProfile executionProfile, @NonNull Node node) {} + /** + * @deprecated This method only exists for backward compatibility. Override {@link + * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, String, ExecutionInfo)} + * instead. + */ + default void onNodeSuccess( + @NonNull Request request, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node, + @NonNull String requestLogPrefix) { + // If client doesn't override onNodeSuccess with requestLogPrefix delegate call to the old + // method + onNodeSuccess(request, latencyNanos, executionProfile, node); + } + /** * Invoked each time a request succeeds at the node level. Similar to {@link #onSuccess(Request, * long, DriverExecutionProfile, Node, String)} but at per node level. @@ -148,16 +204,17 @@ default void onNodeSuccess( * @param executionProfile the execution profile of this request. * @param node the node that returned the successful response. * @param requestLogPrefix the dedicated log prefix for this request + * @param executionInfo the execution info containing the results of this request */ default void onNodeSuccess( @NonNull Request request, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestLogPrefix) { - // If client doesn't override onNodeSuccess with requestLogPrefix delegate call to the old - // method - onNodeSuccess(request, latencyNanos, executionProfile, node); + @NonNull String requestLogPrefix, + @NonNull ExecutionInfo executionInfo) { + // delegate call to the old method + onNodeSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix); } /** diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index dba2dc38460..ca9a5ae2d20 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -338,13 +338,15 @@ private void setFinalResult( nodeLatencyNanos, callback.executionProfile, callback.node, - logPrefix); + logPrefix, + executionInfo); requestTracker.onSuccess( callback.statement, totalLatencyNanos, callback.executionProfile, callback.node, - logPrefix); + logPrefix, + executionInfo); } if (sessionMetricUpdater.isEnabled( DefaultSessionMetric.CQL_REQUESTS, callback.executionProfile.getName())) { @@ -431,27 +433,29 @@ public void onThrottleFailure(@NonNull RequestThrottlingException error) { private void setFinalError(Statement statement, Throwable error, Node node, int execution) { DriverExecutionProfile executionProfile = Conversions.resolveExecutionProfile(statement, context); + ExecutionInfo executionInfo = null; if (error instanceof DriverException) { - ((DriverException) error) - .setExecutionInfo( - new DefaultExecutionInfo( - statement, - node, - startedSpeculativeExecutionsCount.get(), - execution, - errors, - null, - null, - true, - session, - context, - executionProfile)); + executionInfo = + new DefaultExecutionInfo( + statement, + node, + startedSpeculativeExecutionsCount.get(), + execution, + errors, + null, + null, + true, + session, + context, + executionProfile); + ((DriverException) error).setExecutionInfo(executionInfo); } if (result.completeExceptionally(error)) { cancelScheduledTasks(); if (!(requestTracker instanceof NoopRequestTracker)) { long latencyNanos = System.nanoTime() - startTimeNanos; - requestTracker.onError(statement, error, latencyNanos, executionProfile, node, logPrefix); + requestTracker.onError( + statement, error, latencyNanos, executionProfile, node, logPrefix, executionInfo); } if (error instanceof DriverTimeoutException) { throttler.signalTimeout(this); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java index f79fa55b520..9725757add2 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java @@ -21,6 +21,7 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; @@ -238,7 +239,8 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String logPrefix, + @NonNull ExecutionInfo executionInfo) { updateResponseTimes(node); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java index 8bce9840e2f..6997a00d801 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java @@ -16,6 +16,7 @@ package com.datastax.oss.driver.internal.core.tracker; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; @@ -80,9 +81,12 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String logPrefix, + @NonNull ExecutionInfo executionInfo) { invokeTrackers( - tracker -> tracker.onSuccess(request, latencyNanos, executionProfile, node, logPrefix), + tracker -> + tracker.onSuccess( + request, latencyNanos, executionProfile, node, logPrefix, executionInfo), logPrefix, "onSuccess"); } @@ -94,9 +98,12 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @Nullable Node node, - @NonNull String logPrefix) { + @NonNull String logPrefix, + @NonNull ExecutionInfo executionInfo) { invokeTrackers( - tracker -> tracker.onError(request, error, latencyNanos, executionProfile, node, logPrefix), + tracker -> + tracker.onError( + request, error, latencyNanos, executionProfile, node, logPrefix, executionInfo), logPrefix, "onError"); } @@ -107,9 +114,12 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String logPrefix, + @NonNull ExecutionInfo executionInfo) { invokeTrackers( - tracker -> tracker.onNodeSuccess(request, latencyNanos, executionProfile, node, logPrefix), + tracker -> + tracker.onNodeSuccess( + request, latencyNanos, executionProfile, node, logPrefix, executionInfo), logPrefix, "onNodeSuccess"); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java index 6297f51e789..792c6389180 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java @@ -17,6 +17,7 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.tracker.RequestTracker; @@ -40,7 +41,8 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestPrefix) { + @NonNull String requestPrefix, + @NonNull ExecutionInfo executionInfo) { // nothing to do } @@ -51,7 +53,8 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, Node node, - @NonNull String requestPrefix) { + @NonNull String requestPrefix, + @NonNull ExecutionInfo executionInfo) { // nothing to do } @@ -72,7 +75,8 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestPrefix) { + @NonNull String requestPrefix, + @NonNull ExecutionInfo executionInfo) { // nothing to do } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java index 489c4e61cbe..9fccfc827c8 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java @@ -18,6 +18,7 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.SessionBuilder; @@ -84,7 +85,8 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String logPrefix, + @NonNull ExecutionInfo executionInfo) { boolean successEnabled = executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_SUCCESS_ENABLED, false); @@ -137,7 +139,8 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, Node node, - @NonNull String logPrefix) { + @NonNull String logPrefix, + @NonNull ExecutionInfo executionInfo) { if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_ERROR_ENABLED, false)) { return; @@ -191,7 +194,8 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String logPrefix, + @NonNull ExecutionInfo executionInfo) { // Nothing to do } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java index a6b4ebca2a2..28dea2d4979 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java @@ -27,6 +27,7 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.servererrors.BootstrappingException; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker; @@ -36,7 +37,6 @@ import org.junit.Test; public class CqlRequestHandlerTrackerTest extends CqlRequestHandlerTestBase { - @Test public void should_invoke_request_tracker() { try (RequestHandlerTestHarness harness = @@ -77,14 +77,16 @@ public void should_invoke_request_tracker() { anyLong(), any(DriverExecutionProfile.class), eq(node2), - any(String.class)); + any(String.class), + any(ExecutionInfo.class)); verify(requestTracker) .onSuccess( eq(UNDEFINED_IDEMPOTENCE_STATEMENT), anyLong(), any(DriverExecutionProfile.class), eq(node2), - any(String.class)); + any(String.class), + any(ExecutionInfo.class)); verifyNoMoreInteractions(requestTracker); }); } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java index 6dfad480708..e3cadf79e48 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java @@ -20,6 +20,7 @@ import static org.mockito.BDDMockito.given; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; @@ -32,6 +33,7 @@ public class DefaultLoadBalancingPolicyRequestTrackerTest extends LoadBalancingP @Mock Request request; @Mock DriverExecutionProfile profile; + @Mock ExecutionInfo executionInfo; final String logPrefix = "lbp-test-log-prefix"; private DefaultLoadBalancingPolicy policy; @@ -63,7 +65,7 @@ public void should_record_first_response_time_on_node_success() { nextNanoTime = 123; // When - policy.onNodeSuccess(request, 0, profile, node1, logPrefix); + policy.onNodeSuccess(request, 0, profile, node1, logPrefix, executionInfo); // Then assertThat(policy.responseTimes) @@ -81,7 +83,7 @@ public void should_record_second_response_time_on_node_success() { nextNanoTime = 456; // When - policy.onNodeSuccess(request, 0, profile, node1, logPrefix); + policy.onNodeSuccess(request, 0, profile, node1, logPrefix, executionInfo); // Then assertThat(policy.responseTimes) @@ -105,8 +107,8 @@ public void should_record_further_response_times_on_node_success() { nextNanoTime = 789; // When - policy.onNodeSuccess(request, 0, profile, node1, logPrefix); - policy.onNodeSuccess(request, 0, profile, node2, logPrefix); + policy.onNodeSuccess(request, 0, profile, node1, logPrefix, executionInfo); + policy.onNodeSuccess(request, 0, profile, node2, logPrefix, executionInfo); // Then assertThat(policy.responseTimes) diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java index 6e65aeafb5f..a4ce02bf37f 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java @@ -26,6 +26,7 @@ import ch.qos.logback.core.Appender; import com.datastax.oss.driver.api.core.DriverExecutionException; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; @@ -49,6 +50,7 @@ public class MultiplexingRequestTrackerTest { @Mock private DriverExecutionProfile profile; @Mock private Node node; @Mock private Session session; + @Mock private ExecutionInfo executionInfo; @Mock private Appender appender; @Captor private ArgumentCaptor loggingEventCaptor; @@ -109,12 +111,12 @@ public void should_notify_onSuccess() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onSuccess(request, 123456L, profile, node, "test"); + .onSuccess(request, 123456L, profile, node, "test", executionInfo); // when - tracker.onSuccess(request, 123456L, profile, node, "test"); + tracker.onSuccess(request, 123456L, profile, node, "test", executionInfo); // then - verify(child1).onSuccess(request, 123456L, profile, node, "test"); - verify(child2).onSuccess(request, 123456L, profile, node, "test"); + verify(child1).onSuccess(request, 123456L, profile, node, "test", executionInfo); + verify(child2).onSuccess(request, 123456L, profile, node, "test", executionInfo); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( @@ -127,12 +129,12 @@ public void should_notify_onError() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onError(request, error, 123456L, profile, node, "test"); + .onError(request, error, 123456L, profile, node, "test", executionInfo); // when - tracker.onError(request, error, 123456L, profile, node, "test"); + tracker.onError(request, error, 123456L, profile, node, "test", executionInfo); // then - verify(child1).onError(request, error, 123456L, profile, node, "test"); - verify(child2).onError(request, error, 123456L, profile, node, "test"); + verify(child1).onError(request, error, 123456L, profile, node, "test", executionInfo); + verify(child2).onError(request, error, 123456L, profile, node, "test", executionInfo); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( @@ -145,12 +147,12 @@ public void should_notify_onNodeSuccess() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onNodeSuccess(request, 123456L, profile, node, "test"); + .onNodeSuccess(request, 123456L, profile, node, "test", executionInfo); // when - tracker.onNodeSuccess(request, 123456L, profile, node, "test"); + tracker.onNodeSuccess(request, 123456L, profile, node, "test", executionInfo); // then - verify(child1).onNodeSuccess(request, 123456L, profile, node, "test"); - verify(child2).onNodeSuccess(request, 123456L, profile, node, "test"); + verify(child1).onNodeSuccess(request, 123456L, profile, node, "test", executionInfo); + verify(child2).onNodeSuccess(request, 123456L, profile, node, "test", executionInfo); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( From 61f67a208c62e9021287f2e8687636d39bfac886 Mon Sep 17 00:00:00 2001 From: Andrew Weaver Date: Thu, 29 Jun 2023 17:01:06 -0500 Subject: [PATCH 02/12] Address feedback (to squash with "Add ExecutionInfo to RequestTracker methods") --- .../ContinuousRequestHandlerBase.java | 41 +++++++++------- .../core/graph/GraphRequestHandler.java | 48 ++++++++++++------- .../api/core/tracker/RequestTracker.java | 31 ++++++++++-- .../internal/core/cql/CqlRequestHandler.java | 30 ++++++------ .../DefaultLoadBalancingPolicy.java | 3 +- .../tracker/MultiplexingRequestTracker.java | 8 ++-- .../core/tracker/NoopRequestTracker.java | 6 ++- .../internal/core/tracker/RequestLogger.java | 4 +- .../ContinuousCqlRequestHandlerTest.java | 17 +++---- .../core/graph/GraphRequestHandlerTest.java | 7 ++- .../cql/CqlRequestHandlerTrackerTest.java | 7 ++- ...LoadBalancingPolicyRequestTrackerTest.java | 8 ++-- .../MultiplexingRequestTrackerTest.java | 8 ++-- .../tracker/RequestNodeLoggerExample.java | 18 ++++--- 14 files changed, 144 insertions(+), 92 deletions(-) diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java index 91fc1ef2f3a..c2f1f9bfa18 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java @@ -489,6 +489,9 @@ private class NodeResponseCallback // Coordinates concurrent accesses between the client and I/O threads private final ReentrantLock lock = new ReentrantLock(); + // The execution info for passing to the request tracker. + private ExecutionInfo executionInfo; + // The page queue, storing responses that we have received and have not been consumed by the // client yet. We instantiate it lazily to avoid unnecessary allocation; this is also used to // check if the callback ever tried to enqueue something. @@ -1445,7 +1448,8 @@ private void trackNodeError(@NonNull Node node, @NonNull Throwable error) { long latencyNanos = System.nanoTime() - this.messageStartTimeNanos; context .getRequestTracker() - .onNodeError(this.statement, error, latencyNanos, executionProfile, node, logPrefix); + .onNodeError( + this.statement, error, latencyNanos, executionProfile, node, logPrefix, null); } } @@ -1562,18 +1566,21 @@ private void completeResultSetFuture( if (nodeSuccessReported.compareAndSet(false, true)) { context .getRequestTracker() - .onNodeSuccess(statement, nodeLatencyNanos, executionProfile, node, logPrefix); + .onNodeSuccess( + statement, nodeLatencyNanos, executionProfile, node, logPrefix, executionInfo); } context .getRequestTracker() - .onSuccess(statement, totalLatencyNanos, executionProfile, node, logPrefix); + .onSuccess( + statement, totalLatencyNanos, executionProfile, node, logPrefix, executionInfo); } } else { Throwable error = (Throwable) pageOrError; if (future.completeExceptionally(error)) { context .getRequestTracker() - .onError(statement, error, totalLatencyNanos, executionProfile, node, logPrefix); + .onError( + statement, error, totalLatencyNanos, executionProfile, node, logPrefix, null); if (error instanceof DriverTimeoutException) { throttler.signalTimeout(ContinuousRequestHandlerBase.this); session @@ -1590,18 +1597,20 @@ private void completeResultSetFuture( private ExecutionInfo createExecutionInfo(@NonNull Result result, @Nullable Frame response) { ByteBuffer pagingState = result instanceof Rows ? ((Rows) result).getMetadata().pagingState : null; - return new DefaultExecutionInfo( - statement, - node, - startedSpeculativeExecutionsCount.get(), - executionIndex, - errors, - pagingState, - response, - true, - session, - context, - executionProfile); + this.executionInfo = + new DefaultExecutionInfo( + statement, + node, + startedSpeculativeExecutionsCount.get(), + executionIndex, + errors, + pagingState, + response, + true, + session, + context, + executionProfile); + return executionInfo; } private void logTimeoutSchedulingError(IllegalStateException timeoutError) { diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java index a710f447512..dc335f12c86 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java @@ -358,9 +358,19 @@ private void setFinalResult( totalLatencyNanos = completionTimeNanos - startTimeNanos; long nodeLatencyNanos = completionTimeNanos - callback.nodeStartTimeNanos; requestTracker.onNodeSuccess( - callback.statement, nodeLatencyNanos, executionProfile, callback.node, logPrefix); + callback.statement, + nodeLatencyNanos, + executionProfile, + callback.node, + logPrefix, + executionInfo); requestTracker.onSuccess( - callback.statement, totalLatencyNanos, executionProfile, callback.node, logPrefix); + callback.statement, + totalLatencyNanos, + executionProfile, + callback.node, + logPrefix, + executionInfo); } if (sessionMetricUpdater.isEnabled( DseSessionMetric.GRAPH_REQUESTS, executionProfile.getName())) { @@ -444,27 +454,28 @@ private void setFinalError( GraphStatement statement, Throwable error, Node node, int execution) { DriverExecutionProfile executionProfile = Conversions.resolveExecutionProfile(statement, context); + ExecutionInfo executionInfo = + new DefaultExecutionInfo( + statement, + node, + startedSpeculativeExecutionsCount.get(), + execution, + errors, + null, + null, + true, + session, + context, + executionProfile); if (error instanceof DriverException) { - ((DriverException) error) - .setExecutionInfo( - new DefaultExecutionInfo( - statement, - node, - startedSpeculativeExecutionsCount.get(), - execution, - errors, - null, - null, - true, - session, - context, - executionProfile)); + ((DriverException) error).setExecutionInfo(executionInfo); } if (result.completeExceptionally(error)) { cancelScheduledTasks(); if (!(requestTracker instanceof NoopRequestTracker)) { long latencyNanos = System.nanoTime() - startTimeNanos; - requestTracker.onError(statement, error, latencyNanos, executionProfile, node, logPrefix); + requestTracker.onError( + statement, error, latencyNanos, executionProfile, node, logPrefix, executionInfo); } if (error instanceof DriverTimeoutException) { throttler.signalTimeout(this); @@ -856,7 +867,8 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan nodeResponseTimeNanos = System.nanoTime(); } long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos; - requestTracker.onNodeError(statement, error, latencyNanos, executionProfile, node, logPrefix); + requestTracker.onNodeError( + statement, error, latencyNanos, executionProfile, node, logPrefix, null); } @Override diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java index e904f3f0755..169b6028613 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java @@ -77,7 +77,7 @@ default void onSuccess( @NonNull Node node, @NonNull String requestLogPrefix, @NonNull ExecutionInfo executionInfo) { - // If client doesn't override onSuccess with requestLogPrefix delegate call to the old method + // If client doesn't override onSuccess with executionInfo delegate call to the old method onSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix); } @@ -129,14 +129,15 @@ default void onError( @NonNull DriverExecutionProfile executionProfile, @Nullable Node node, @NonNull String requestLogPrefix, - ExecutionInfo executionInfo) { + @Nullable ExecutionInfo executionInfo) { // delegate call to the old method onError(request, error, latencyNanos, executionProfile, node, requestLogPrefix); } /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, String)} instead. + * #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, String, + * ExecutionInfo)} instead. */ @Deprecated default void onNodeError( @@ -146,6 +147,23 @@ default void onNodeError( @NonNull DriverExecutionProfile executionProfile, @NonNull Node node) {} + /** + * @deprecated This method only exists for backward compatibility. Override {@link + * #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, String, + * ExecutionInfo)} instead. + */ + @Deprecated + default void onNodeError( + @NonNull Request request, + @NonNull Throwable error, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node, + @NonNull String requestLogPrefix) { + // If client doesn't override onNodeError with requestLogPrefix delegate call to the old method + onNodeError(request, error, latencyNanos, executionProfile, node); + } + /** * Invoked each time a request fails at the node level. Similar to {@link #onError(Request, * Throwable, long, DriverExecutionProfile, Node, String)} but at a per node level. @@ -155,6 +173,7 @@ default void onNodeError( * @param executionProfile the execution profile of this request. * @param node the node that returned the error response. * @param requestLogPrefix the dedicated log prefix for this request + * @param executionInfo the execution info containing the results of this request if available */ default void onNodeError( @NonNull Request request, @@ -162,9 +181,10 @@ default void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestLogPrefix) { + @NonNull String requestLogPrefix, + @Nullable ExecutionInfo executionInfo) { // If client doesn't override onNodeError with requestLogPrefix delegate call to the old method - onNodeError(request, error, latencyNanos, executionProfile, node); + onNodeError(request, error, latencyNanos, executionProfile, node, requestLogPrefix); } /** @@ -184,6 +204,7 @@ default void onNodeSuccess( * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, String, ExecutionInfo)} * instead. */ + @Deprecated default void onNodeSuccess( @NonNull Request request, long latencyNanos, diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index ca9a5ae2d20..0db4f174daf 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -433,21 +433,20 @@ public void onThrottleFailure(@NonNull RequestThrottlingException error) { private void setFinalError(Statement statement, Throwable error, Node node, int execution) { DriverExecutionProfile executionProfile = Conversions.resolveExecutionProfile(statement, context); - ExecutionInfo executionInfo = null; + ExecutionInfo executionInfo = + new DefaultExecutionInfo( + statement, + node, + startedSpeculativeExecutionsCount.get(), + execution, + errors, + null, + null, + true, + session, + context, + executionProfile); if (error instanceof DriverException) { - executionInfo = - new DefaultExecutionInfo( - statement, - node, - startedSpeculativeExecutionsCount.get(), - execution, - errors, - null, - null, - true, - session, - context, - executionProfile); ((DriverException) error).setExecutionInfo(executionInfo); } if (result.completeExceptionally(error)) { @@ -949,7 +948,8 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan nodeResponseTimeNanos = System.nanoTime(); } long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos; - requestTracker.onNodeError(statement, error, latencyNanos, executionProfile, node, logPrefix); + requestTracker.onNodeError( + statement, error, latencyNanos, executionProfile, node, logPrefix, null); } @Override diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java index 9725757add2..4b3edc1b83f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java @@ -251,7 +251,8 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String logPrefix, + @Nullable ExecutionInfo executionInfo) { updateResponseTimes(node); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java index 6997a00d801..de8894dfb19 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java @@ -99,7 +99,7 @@ public void onError( @NonNull DriverExecutionProfile executionProfile, @Nullable Node node, @NonNull String logPrefix, - @NonNull ExecutionInfo executionInfo) { + @Nullable ExecutionInfo executionInfo) { invokeTrackers( tracker -> tracker.onError( @@ -131,10 +131,12 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String logPrefix, + ExecutionInfo executionInfo) { invokeTrackers( tracker -> - tracker.onNodeError(request, error, latencyNanos, executionProfile, node, logPrefix), + tracker.onNodeError( + request, error, latencyNanos, executionProfile, node, logPrefix, executionInfo), logPrefix, "onNodeError"); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java index 792c6389180..68cf7f90957 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java @@ -22,6 +22,7 @@ import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; import net.jcip.annotations.ThreadSafe; /** @@ -54,7 +55,7 @@ public void onError( @NonNull DriverExecutionProfile executionProfile, Node node, @NonNull String requestPrefix, - @NonNull ExecutionInfo executionInfo) { + @Nullable ExecutionInfo executionInfo) { // nothing to do } @@ -65,7 +66,8 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestPrefix) { + @NonNull String requestPrefix, + @Nullable ExecutionInfo executionInfo) { // nothing to do } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java index 9fccfc827c8..c3212d6ca9d 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java @@ -24,6 +24,7 @@ import com.datastax.oss.driver.api.core.session.SessionBuilder; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; import java.time.Duration; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; @@ -184,7 +185,8 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String logPrefix, + @Nullable ExecutionInfo executionInfo) { // Nothing to do } diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java index 6b1d8919630..cfcdabb716f 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java @@ -22,13 +22,7 @@ import static com.datastax.oss.driver.Assertions.assertThat; import static com.datastax.oss.driver.Assertions.assertThatStage; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.matches; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -496,21 +490,24 @@ public void should_invoke_request_tracker(DseProtocolVersion version) { anyLong(), any(DriverExecutionProfile.class), eq(node1), - matches(LOG_PREFIX_PER_REQUEST)); + matches(LOG_PREFIX_PER_REQUEST), + nullable(ExecutionInfo.class)); verify(requestTracker) .onNodeSuccess( eq(UNDEFINED_IDEMPOTENCE_STATEMENT), anyLong(), any(DriverExecutionProfile.class), eq(node2), - matches(LOG_PREFIX_PER_REQUEST)); + matches(LOG_PREFIX_PER_REQUEST), + any(ExecutionInfo.class)); verify(requestTracker) .onSuccess( eq(UNDEFINED_IDEMPOTENCE_STATEMENT), anyLong(), any(DriverExecutionProfile.class), eq(node2), - matches(LOG_PREFIX_PER_REQUEST)); + matches(LOG_PREFIX_PER_REQUEST), + any(ExecutionInfo.class)); verifyNoMoreInteractions(requestTracker); }); } diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java index 2e27dc79cd3..e6311659eda 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java @@ -54,6 +54,7 @@ import com.datastax.oss.driver.api.core.Version; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.datastax.oss.driver.internal.core.cql.Conversions; @@ -519,14 +520,16 @@ public void should_invoke_request_tracker_and_update_metrics( anyLong(), any(DriverExecutionProfile.class), eq(node), - matches(LOG_PREFIX_PER_REQUEST)); + matches(LOG_PREFIX_PER_REQUEST), + any(ExecutionInfo.class)); verify(requestTracker) .onNodeSuccess( eq(graphStatement), anyLong(), any(DriverExecutionProfile.class), eq(node), - matches(LOG_PREFIX_PER_REQUEST)); + matches(LOG_PREFIX_PER_REQUEST), + any(ExecutionInfo.class)); verifyNoMoreInteractions(requestTracker); verify(nodeMetricUpdater1) diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java index 28dea2d4979..bac89d6666e 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java @@ -16,9 +16,7 @@ package com.datastax.oss.driver.internal.core.cql; import static com.datastax.oss.driver.Assertions.assertThatStage; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -70,7 +68,8 @@ public void should_invoke_request_tracker() { anyLong(), any(DriverExecutionProfile.class), eq(node1), - any(String.class)); + any(String.class), + nullable(ExecutionInfo.class)); verify(requestTracker) .onNodeSuccess( eq(UNDEFINED_IDEMPOTENCE_STATEMENT), diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java index e3cadf79e48..00800f0920c 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java @@ -133,7 +133,7 @@ public void should_record_first_response_time_on_node_error() { Throwable iae = new IllegalArgumentException(); // When - policy.onNodeError(request, iae, 0, profile, node1, logPrefix); + policy.onNodeError(request, iae, 0, profile, node1, logPrefix, executionInfo); // Then assertThat(policy.responseTimes) @@ -152,7 +152,7 @@ public void should_record_second_response_time_on_node_error() { Throwable iae = new IllegalArgumentException(); // When - policy.onNodeError(request, iae, 0, profile, node1, logPrefix); + policy.onNodeError(request, iae, 0, profile, node1, logPrefix, executionInfo); // Then assertThat(policy.responseTimes) @@ -177,8 +177,8 @@ public void should_record_further_response_times_on_node_error() { Throwable iae = new IllegalArgumentException(); // When - policy.onNodeError(request, iae, 0, profile, node1, logPrefix); - policy.onNodeError(request, iae, 0, profile, node2, logPrefix); + policy.onNodeError(request, iae, 0, profile, node1, logPrefix, executionInfo); + policy.onNodeError(request, iae, 0, profile, node2, logPrefix, executionInfo); // Then assertThat(policy.responseTimes) diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java index a4ce02bf37f..eedfaff13ca 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java @@ -165,12 +165,12 @@ public void should_notify_onNodeError() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onNodeError(request, error, 123456L, profile, node, "test"); + .onNodeError(request, error, 123456L, profile, node, "test", executionInfo); // when - tracker.onNodeError(request, error, 123456L, profile, node, "test"); + tracker.onNodeError(request, error, 123456L, profile, node, "test", executionInfo); // then - verify(child1).onNodeError(request, error, 123456L, profile, node, "test"); - verify(child2).onNodeError(request, error, 123456L, profile, node, "test"); + verify(child1).onNodeError(request, error, 123456L, profile, node, "test", executionInfo); + verify(child2).onNodeError(request, error, 123456L, profile, node, "test", executionInfo); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java index cf9ba4abd40..9f5aaaf73ba 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java @@ -18,11 +18,13 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.internal.core.tracker.RequestLogFormatter; import com.datastax.oss.driver.internal.core.tracker.RequestLogger; import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; public class RequestNodeLoggerExample extends RequestLogger { @@ -32,12 +34,13 @@ public RequestNodeLoggerExample(DriverContext context) { @Override public void onNodeError( - @NonNull Request request, - @NonNull Throwable error, - long latencyNanos, - @NonNull DriverExecutionProfile executionProfile, - @NonNull Node node, - @NonNull String logPrefix) { + @NonNull Request request, + @NonNull Throwable error, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node, + @NonNull String logPrefix, + @Nullable ExecutionInfo executionInfo) { if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_ERROR_ENABLED)) { return; } @@ -73,7 +76,8 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String logPrefix, + @NonNull ExecutionInfo executionInfo) { boolean successEnabled = executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_SUCCESS_ENABLED); boolean slowEnabled = From 59ebd1ebbee8360ad6f2dacd0024cf197a95a13f Mon Sep 17 00:00:00 2001 From: Andrew Weaver Date: Wed, 16 Aug 2023 12:18:10 -0500 Subject: [PATCH 03/12] Correct formatting with coveo (to squash with "Add ExecutionInfo to RequestTracker methods") --- .../core/tracker/RequestNodeLoggerExample.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java index 9f5aaaf73ba..122cf1f9282 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java @@ -34,13 +34,13 @@ public RequestNodeLoggerExample(DriverContext context) { @Override public void onNodeError( - @NonNull Request request, - @NonNull Throwable error, - long latencyNanos, - @NonNull DriverExecutionProfile executionProfile, - @NonNull Node node, - @NonNull String logPrefix, - @Nullable ExecutionInfo executionInfo) { + @NonNull Request request, + @NonNull Throwable error, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node, + @NonNull String logPrefix, + @Nullable ExecutionInfo executionInfo) { if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_ERROR_ENABLED)) { return; } From 7b3db63062b80c734d55e8372a8d9a6451b48f25 Mon Sep 17 00:00:00 2001 From: Andrew Weaver Date: Tue, 22 Aug 2023 14:55:58 -0500 Subject: [PATCH 04/12] Correct imports in CqlRequestHandlerTrackerTest --- .../internal/core/cql/CqlRequestHandlerTrackerTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java index bac89d6666e..93e7e2b84a8 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java @@ -16,7 +16,10 @@ package com.datastax.oss.driver.internal.core.cql; import static com.datastax.oss.driver.Assertions.assertThatStage; -import static org.mockito.ArgumentMatchers.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; From 873768f0d1fa4913ed602e5708eb1caccbc29b54 Mon Sep 17 00:00:00 2001 From: Andrew Weaver Date: Mon, 18 Sep 2023 13:22:57 -0500 Subject: [PATCH 05/12] Check type for ExecutionInfo in ContinuousRequestHandlerBase. --- .../ContinuousRequestHandlerBase.java | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java index c2f1f9bfa18..6cfc3623b14 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java @@ -17,11 +17,13 @@ import com.datastax.dse.driver.api.core.DseProtocolVersion; import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet; +import com.datastax.dse.driver.api.core.graph.AsyncGraphResultSet; import com.datastax.dse.driver.internal.core.DseProtocolFeature; import com.datastax.dse.driver.internal.core.cql.DseConversions; import com.datastax.dse.protocol.internal.request.Revise; import com.datastax.dse.protocol.internal.response.result.DseRowsMetadata; import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.AsyncPagingIterable; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.DriverTimeoutException; import com.datastax.oss.driver.api.core.NodeUnavailableException; @@ -489,9 +491,6 @@ private class NodeResponseCallback // Coordinates concurrent accesses between the client and I/O threads private final ReentrantLock lock = new ReentrantLock(); - // The execution info for passing to the request tracker. - private ExecutionInfo executionInfo; - // The page queue, storing responses that we have received and have not been consumed by the // client yet. We instantiate it lazily to avoid unnecessary allocation; this is also used to // check if the callback ever tried to enqueue something. @@ -1563,6 +1562,14 @@ private void completeResultSetFuture( if (resultSetClass.isInstance(pageOrError)) { if (future.complete(resultSetClass.cast(pageOrError))) { throttler.signalSuccess(ContinuousRequestHandlerBase.this); + + ExecutionInfo executionInfo = null; + if (pageOrError instanceof AsyncPagingIterable) { + executionInfo = ((AsyncPagingIterable) pageOrError).getExecutionInfo(); + } else if (pageOrError instanceof AsyncGraphResultSet) { + executionInfo = ((AsyncGraphResultSet) pageOrError).getRequestExecutionInfo(); + } + if (nodeSuccessReported.compareAndSet(false, true)) { context .getRequestTracker() @@ -1597,20 +1604,18 @@ private void completeResultSetFuture( private ExecutionInfo createExecutionInfo(@NonNull Result result, @Nullable Frame response) { ByteBuffer pagingState = result instanceof Rows ? ((Rows) result).getMetadata().pagingState : null; - this.executionInfo = - new DefaultExecutionInfo( - statement, - node, - startedSpeculativeExecutionsCount.get(), - executionIndex, - errors, - pagingState, - response, - true, - session, - context, - executionProfile); - return executionInfo; + return new DefaultExecutionInfo( + statement, + node, + startedSpeculativeExecutionsCount.get(), + executionIndex, + errors, + pagingState, + response, + true, + session, + context, + executionProfile); } private void logTimeoutSchedulingError(IllegalStateException timeoutError) { From 026a6efdd39fcf142919dc58dc69d171b22ef725 Mon Sep 17 00:00:00 2001 From: Andrew Weaver Date: Mon, 18 Sep 2023 13:34:45 -0500 Subject: [PATCH 06/12] Correct formatting with coveo --- .../driver/internal/core/cql/CqlRequestHandlerTrackerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java index 93e7e2b84a8..2d3c5168155 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java @@ -17,8 +17,8 @@ import static com.datastax.oss.driver.Assertions.assertThatStage; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; From c54749c950b5aa22dab06975a1cc8ccf8e445790 Mon Sep 17 00:00:00 2001 From: Andrew Weaver Date: Mon, 18 Sep 2023 14:00:51 -0500 Subject: [PATCH 07/12] Correct imports in ContinuousCqlRequestHandlerTest --- .../cql/continuous/ContinuousCqlRequestHandlerTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java index cfcdabb716f..7ca43be5ccf 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java @@ -22,7 +22,14 @@ import static com.datastax.oss.driver.Assertions.assertThat; import static com.datastax.oss.driver.Assertions.assertThatStage; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.matches; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; From 721cd29554a4cf013733e364ff2e540aad133b31 Mon Sep 17 00:00:00 2001 From: janehe Date: Wed, 17 Apr 2024 14:59:59 -0700 Subject: [PATCH 08/12] jabba use 1.8 --- Jenkinsfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 8d2b74c5b08..7a93dbe3fea 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -478,7 +478,7 @@ pipeline { } stage('Build-Driver') { steps { - buildDriver('default') + buildDriver('1.8') } } stage('Execute-Tests') { @@ -593,7 +593,7 @@ pipeline { stage('Build-Driver') { steps { // Jabba default should be a JDK8 for now - buildDriver('default') + buildDriver('1.8') } } stage('Execute-Tests') { From acf4dcfaa95b0a9a52fde654605baa12f14671dd Mon Sep 17 00:00:00 2001 From: janehe Date: Thu, 18 Apr 2024 08:15:49 -0700 Subject: [PATCH 09/12] delete commit --- Jenkinsfile | 1 - 1 file changed, 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 7a93dbe3fea..5b6711f5b87 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -592,7 +592,6 @@ pipeline { } stage('Build-Driver') { steps { - // Jabba default should be a JDK8 for now buildDriver('1.8') } } From 9f5d2d2a923120295c07e0bcdce8466db5ff1f75 Mon Sep 17 00:00:00 2001 From: janehe Date: Wed, 1 May 2024 19:49:03 -0700 Subject: [PATCH 10/12] refactoring --- .../ContinuousRequestHandlerBase.java | 60 +++++++++++++------ .../internal/core/cql/CqlRequestHandler.java | 15 ++++- 2 files changed, 56 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java index 6cfc3623b14..422e0b4eac5 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java @@ -626,7 +626,7 @@ public void operationComplete(@NonNull Future future) { Throwable error = future.cause(); if (error instanceof EncoderException && error.getCause() instanceof FrameTooLongException) { - trackNodeError(node, error.getCause()); + trackNodeError(node, error.getCause(), null); lock.lock(); try { abort(error.getCause(), false); @@ -643,7 +643,7 @@ public void operationComplete(@NonNull Future future) { .getMetricUpdater() .incrementCounter(DefaultNodeMetric.UNSENT_REQUESTS, executionProfile.getName()); recordError(node, error); - trackNodeError(node, error.getCause()); + trackNodeError(node, error.getCause(), null); sendRequest(statement, null, executionIndex, retryCount, scheduleSpeculativeExecution); } } else { @@ -737,7 +737,8 @@ private void onPageTimeout(int expectedPage) { * Invoked when a continuous paging response is received, either a successful or failed one. * *

Delegates further processing to appropriate methods: {@link #processResultResponse(Result, - * Frame)} if the response was successful, or {@link #processErrorResponse(Error)} if it wasn't. + * Frame)} if the response was successful, or {@link #processErrorResponse(Error, Frame)} if it + * wasn't. * * @param response the received {@link Frame}. */ @@ -758,15 +759,15 @@ public void onResponse(@NonNull Frame response) { processResultResponse((Result) responseMessage, response); } else if (responseMessage instanceof Error) { LOG.trace("[{}] Got error response", logPrefix); - processErrorResponse((Error) responseMessage); + processErrorResponse((Error) responseMessage, response); } else { IllegalStateException error = new IllegalStateException("Unexpected response " + responseMessage); - trackNodeError(node, error); + trackNodeError(node, error, response); abort(error, false); } } catch (Throwable t) { - trackNodeError(node, t); + trackNodeError(node, t, response); abort(t, false); } } finally { @@ -900,7 +901,7 @@ private void processResultResponse(@NonNull Result result, @Nullable Frame frame * @param errorMessage the error message received. */ @SuppressWarnings("GuardedBy") // this method is only called with the lock held - private void processErrorResponse(@NonNull Error errorMessage) { + private void processErrorResponse(@NonNull Error errorMessage, @NonNull Frame frame) { assert lock.isHeldByCurrentThread(); if (errorMessage instanceof Unprepared) { processUnprepared((Unprepared) errorMessage); @@ -909,7 +910,7 @@ private void processErrorResponse(@NonNull Error errorMessage) { if (error instanceof BootstrappingException) { LOG.trace("[{}] {} is bootstrapping, trying next node", logPrefix, node); recordError(node, error); - trackNodeError(node, error); + trackNodeError(node, error, frame); sendRequest(statement, null, executionIndex, retryCount, false); } else if (error instanceof QueryValidationException || error instanceof FunctionFailureException @@ -921,7 +922,7 @@ private void processErrorResponse(@NonNull Error errorMessage) { NodeMetricUpdater metricUpdater = ((DefaultNode) node).getMetricUpdater(); metricUpdater.incrementCounter( DefaultNodeMetric.OTHER_ERRORS, executionProfile.getName()); - trackNodeError(node, error); + trackNodeError(node, error, frame); abort(error, true); } else { try { @@ -1060,7 +1061,7 @@ private void processUnprepared(@NonNull Unprepared errorMessage) { + "This usually happens when you run a 'USE...' query after " + "the statement was prepared.", Bytes.toHexString(idToReprepare), Bytes.toHexString(repreparedId))); - trackNodeError(node, illegalStateException); + trackNodeError(node, illegalStateException, null); fatalError = illegalStateException; } else { LOG.trace( @@ -1079,18 +1080,18 @@ private void processUnprepared(@NonNull Unprepared errorMessage) { || prepareError instanceof FunctionFailureException || prepareError instanceof ProtocolError) { LOG.trace("[{}] Unrecoverable error on re-prepare, rethrowing", logPrefix); - trackNodeError(node, prepareError); + trackNodeError(node, prepareError, null); fatalError = prepareError; } } } else if (exception instanceof RequestThrottlingException) { - trackNodeError(node, exception); + trackNodeError(node, exception, null); fatalError = exception; } if (fatalError == null) { LOG.trace("[{}] Re-prepare failed, trying next node", logPrefix); recordError(node, exception); - trackNodeError(node, exception); + trackNodeError(node, exception, null); sendRequest(statement, null, executionIndex, retryCount, false); } } @@ -1118,18 +1119,18 @@ private void processRetryVerdict(@NonNull RetryVerdict verdict, @NonNull Throwab switch (verdict.getRetryDecision()) { case RETRY_SAME: recordError(node, error); - trackNodeError(node, error); + trackNodeError(node, error, null); sendRequest( verdict.getRetryRequest(statement), node, executionIndex, retryCount + 1, false); break; case RETRY_NEXT: recordError(node, error); - trackNodeError(node, error); + trackNodeError(node, error, null); sendRequest( verdict.getRetryRequest(statement), null, executionIndex, retryCount + 1, false); break; case RETHROW: - trackNodeError(node, error); + trackNodeError(node, error, null); abort(error, true); break; case IGNORE: @@ -1442,13 +1443,20 @@ private void reenableAutoReadIfNeeded() { // ERROR HANDLING - private void trackNodeError(@NonNull Node node, @NonNull Throwable error) { + private void trackNodeError( + @NonNull Node node, @NonNull Throwable error, @Nullable Frame frame) { if (nodeErrorReported.compareAndSet(false, true)) { long latencyNanos = System.nanoTime() - this.messageStartTimeNanos; context .getRequestTracker() .onNodeError( - this.statement, error, latencyNanos, executionProfile, node, logPrefix, null); + this.statement, + error, + latencyNanos, + executionProfile, + node, + logPrefix, + createExecutionInfo(frame)); } } @@ -1618,6 +1626,22 @@ private ExecutionInfo createExecutionInfo(@NonNull Result result, @Nullable Fram executionProfile); } + @NonNull + private ExecutionInfo createExecutionInfo(@Nullable Frame response) { + return new DefaultExecutionInfo( + statement, + node, + startedSpeculativeExecutionsCount.get(), + executionIndex, + errors, + null, + response, + true, + session, + context, + executionProfile); + } + private void logTimeoutSchedulingError(IllegalStateException timeoutError) { // If we're racing with session shutdown, the timer might be stopped already. We don't want // to schedule more executions anyway, so swallow the error. diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index 0db4f174daf..eb52f67fc60 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -948,8 +948,21 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan nodeResponseTimeNanos = System.nanoTime(); } long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos; + ExecutionInfo executionInfo = + new DefaultExecutionInfo( + statement, + node, + startedSpeculativeExecutionsCount.get(), + execution, + errors, + null, + null, + true, + session, + context, + executionProfile); requestTracker.onNodeError( - statement, error, latencyNanos, executionProfile, node, logPrefix, null); + statement, error, latencyNanos, executionProfile, node, logPrefix, executionInfo); } @Override From 208d8ed993561574fae19fb7864edecba46e82f0 Mon Sep 17 00:00:00 2001 From: Andrew Weaver Date: Thu, 9 May 2024 11:45:59 -0500 Subject: [PATCH 11/12] Create ExecutionInfo in CqlRequestHandler.NodeResponseCallback.trackNodeError --- .../internal/core/cql/CqlRequestHandler.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index 0db4f174daf..eb52f67fc60 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -948,8 +948,21 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan nodeResponseTimeNanos = System.nanoTime(); } long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos; + ExecutionInfo executionInfo = + new DefaultExecutionInfo( + statement, + node, + startedSpeculativeExecutionsCount.get(), + execution, + errors, + null, + null, + true, + session, + context, + executionProfile); requestTracker.onNodeError( - statement, error, latencyNanos, executionProfile, node, logPrefix, null); + statement, error, latencyNanos, executionProfile, node, logPrefix, executionInfo); } @Override From 78b7e497de8d320e38855314c2359cd2a896aa10 Mon Sep 17 00:00:00 2001 From: Andrew Weaver Date: Thu, 9 May 2024 16:52:47 -0500 Subject: [PATCH 12/12] Refactor order of RequestTracker args so requestLogPrefix is last --- .../ContinuousRequestHandlerBase.java | 10 ++--- .../core/graph/GraphRequestHandler.java | 12 ++--- .../api/core/tracker/RequestTracker.java | 44 +++++++++---------- .../internal/core/cql/CqlRequestHandler.java | 12 ++--- .../DefaultLoadBalancingPolicy.java | 8 ++-- .../tracker/MultiplexingRequestTracker.java | 24 +++++----- .../core/tracker/NoopRequestTracker.java | 16 +++---- .../internal/core/tracker/RequestLogger.java | 16 +++---- .../ContinuousCqlRequestHandlerTest.java | 12 ++--- .../core/graph/GraphRequestHandlerTest.java | 8 ++-- .../cql/CqlRequestHandlerTrackerTest.java | 12 ++--- ...LoadBalancingPolicyRequestTrackerTest.java | 16 +++---- .../MultiplexingRequestTrackerTest.java | 32 +++++++------- .../tracker/RequestNodeLoggerExample.java | 8 ++-- 14 files changed, 115 insertions(+), 115 deletions(-) diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java index 3e9677dc649..1706a5b295f 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java @@ -1457,8 +1457,8 @@ private void trackNodeError( latencyNanos, executionProfile, node, - logPrefix, - createExecutionInfo(frame)); + createExecutionInfo(frame), + logPrefix); } } @@ -1584,12 +1584,12 @@ private void completeResultSetFuture( context .getRequestTracker() .onNodeSuccess( - statement, nodeLatencyNanos, executionProfile, node, logPrefix, executionInfo); + statement, nodeLatencyNanos, executionProfile, node, executionInfo, logPrefix); } context .getRequestTracker() .onSuccess( - statement, totalLatencyNanos, executionProfile, node, logPrefix, executionInfo); + statement, totalLatencyNanos, executionProfile, node, executionInfo, logPrefix); } } else { Throwable error = (Throwable) pageOrError; @@ -1597,7 +1597,7 @@ private void completeResultSetFuture( context .getRequestTracker() .onError( - statement, error, totalLatencyNanos, executionProfile, node, logPrefix, null); + statement, error, totalLatencyNanos, executionProfile, node, null, logPrefix); if (error instanceof DriverTimeoutException) { throttler.signalTimeout(ContinuousRequestHandlerBase.this); session diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java index 7562606223a..dd5ef27a0b6 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java @@ -364,15 +364,15 @@ private void setFinalResult( nodeLatencyNanos, executionProfile, callback.node, - logPrefix, - executionInfo); + executionInfo, + logPrefix); requestTracker.onSuccess( callback.statement, totalLatencyNanos, executionProfile, callback.node, - logPrefix, - executionInfo); + executionInfo, + logPrefix); } if (sessionMetricUpdater.isEnabled( DseSessionMetric.GRAPH_REQUESTS, executionProfile.getName())) { @@ -477,7 +477,7 @@ private void setFinalError( if (!(requestTracker instanceof NoopRequestTracker)) { long latencyNanos = System.nanoTime() - startTimeNanos; requestTracker.onError( - statement, error, latencyNanos, executionProfile, node, logPrefix, executionInfo); + statement, error, latencyNanos, executionProfile, node, executionInfo, logPrefix); } if (error instanceof DriverTimeoutException) { throttler.signalTimeout(this); @@ -870,7 +870,7 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan } long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos; requestTracker.onNodeError( - statement, error, latencyNanos, executionProfile, node, logPrefix, null); + statement, error, latencyNanos, executionProfile, node, null, logPrefix); } @Override diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java index 68ada71a459..a3988f360f4 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java @@ -38,7 +38,7 @@ public interface RequestTracker extends AutoCloseable { /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onSuccess(Request, long, DriverExecutionProfile, Node, String, ExecutionInfo)} instead. + * #onSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} instead. */ @Deprecated default void onSuccess( @@ -49,7 +49,7 @@ default void onSuccess( /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onSuccess(Request, long, DriverExecutionProfile, Node, String, ExecutionInfo)} instead. + * #onSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} instead. */ @Deprecated default void onSuccess( @@ -69,23 +69,23 @@ default void onSuccess( * GenericType) session.execute} call until the result is made available to the client). * @param executionProfile the execution profile of this request. * @param node the node that returned the successful response. - * @param requestLogPrefix the dedicated log prefix for this request * @param executionInfo the execution info containing the results of this request + * @param requestLogPrefix the dedicated log prefix for this request */ default void onSuccess( @NonNull Request request, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestLogPrefix, - @NonNull ExecutionInfo executionInfo) { + @NonNull ExecutionInfo executionInfo, + @NonNull String requestLogPrefix) { // If client doesn't override onSuccess with executionInfo delegate call to the old method onSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix); } /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onError(Request, Throwable, long, DriverExecutionProfile, Node, String, ExecutionInfo)} + * #onError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, String)} * instead. */ @Deprecated @@ -98,7 +98,7 @@ default void onError( /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onError(Request, Throwable, long, DriverExecutionProfile, Node, String, ExecutionInfo)} + * #onError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, String)} * instead. */ @Deprecated @@ -120,9 +120,9 @@ default void onError( * GenericType) session.execute} call until the error is propagated to the client). * @param executionProfile the execution profile of this request. * @param node the node that returned the error response, or {@code null} if the error occurred - * @param requestLogPrefix the dedicated log prefix for this request * @param executionInfo the execution info being returned to the client for this request if * available + * @param requestLogPrefix the dedicated log prefix for this request */ default void onError( @NonNull Request request, @@ -130,16 +130,16 @@ default void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @Nullable Node node, - @NonNull String requestLogPrefix, - @Nullable ExecutionInfo executionInfo) { + @Nullable ExecutionInfo executionInfo, + @NonNull String requestLogPrefix) { // delegate call to the old method onError(request, error, latencyNanos, executionProfile, node, requestLogPrefix); } /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, String, - * ExecutionInfo)} instead. + * #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, + * String)} instead. */ @Deprecated default void onNodeError( @@ -151,8 +151,8 @@ default void onNodeError( /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, String, - * ExecutionInfo)} instead. + * #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, + * String)} instead. */ @Deprecated default void onNodeError( @@ -174,8 +174,8 @@ default void onNodeError( * GenericType) session.execute} call until the error is propagated to the client). * @param executionProfile the execution profile of this request. * @param node the node that returned the error response. - * @param requestLogPrefix the dedicated log prefix for this request * @param executionInfo the execution info containing the results of this request if available + * @param requestLogPrefix the dedicated log prefix for this request */ default void onNodeError( @NonNull Request request, @@ -183,15 +183,15 @@ default void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestLogPrefix, - @Nullable ExecutionInfo executionInfo) { + @Nullable ExecutionInfo executionInfo, + @NonNull String requestLogPrefix) { // If client doesn't override onNodeError with requestLogPrefix delegate call to the old method onNodeError(request, error, latencyNanos, executionProfile, node, requestLogPrefix); } /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, String, ExecutionInfo)} + * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} * instead. */ @Deprecated @@ -203,7 +203,7 @@ default void onNodeSuccess( /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, String, ExecutionInfo)} + * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} * instead. */ @Deprecated @@ -226,16 +226,16 @@ default void onNodeSuccess( * GenericType) session.execute} call until the result is made available to the client). * @param executionProfile the execution profile of this request. * @param node the node that returned the successful response. - * @param requestLogPrefix the dedicated log prefix for this request * @param executionInfo the execution info containing the results of this request + * @param requestLogPrefix the dedicated log prefix for this request */ default void onNodeSuccess( @NonNull Request request, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestLogPrefix, - @NonNull ExecutionInfo executionInfo) { + @NonNull ExecutionInfo executionInfo, + @NonNull String requestLogPrefix) { // delegate call to the old method onNodeSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index c8b805baa29..e1e749d33a0 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -340,15 +340,15 @@ private void setFinalResult( nodeLatencyNanos, callback.executionProfile, callback.node, - logPrefix, - executionInfo); + executionInfo, + logPrefix); requestTracker.onSuccess( callback.statement, totalLatencyNanos, callback.executionProfile, callback.node, - logPrefix, - executionInfo); + executionInfo, + logPrefix); } if (sessionMetricUpdater.isEnabled( DefaultSessionMetric.CQL_REQUESTS, callback.executionProfile.getName())) { @@ -456,7 +456,7 @@ private void setFinalError(Statement statement, Throwable error, Node node, i if (!(requestTracker instanceof NoopRequestTracker)) { long latencyNanos = System.nanoTime() - startTimeNanos; requestTracker.onError( - statement, error, latencyNanos, executionProfile, node, logPrefix, executionInfo); + statement, error, latencyNanos, executionProfile, node, executionInfo, logPrefix); } if (error instanceof DriverTimeoutException) { throttler.signalTimeout(this); @@ -964,7 +964,7 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan context, executionProfile); requestTracker.onNodeError( - statement, error, latencyNanos, executionProfile, node, logPrefix, executionInfo); + statement, error, latencyNanos, executionProfile, node, executionInfo, logPrefix); } @Override diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java index 177e97daacb..42b3358dfa6 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java @@ -241,8 +241,8 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix, - @NonNull ExecutionInfo executionInfo) { + @NonNull ExecutionInfo executionInfo, + @NonNull String logPrefix) { updateResponseTimes(node); } @@ -253,8 +253,8 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix, - @Nullable ExecutionInfo executionInfo) { + @Nullable ExecutionInfo executionInfo, + @NonNull String logPrefix) { updateResponseTimes(node); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java index 2fbaa859206..3173a56b1df 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java @@ -83,12 +83,12 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix, - @NonNull ExecutionInfo executionInfo) { + @NonNull ExecutionInfo executionInfo, + @NonNull String logPrefix) { invokeTrackers( tracker -> tracker.onSuccess( - request, latencyNanos, executionProfile, node, logPrefix, executionInfo), + request, latencyNanos, executionProfile, node, executionInfo, logPrefix), logPrefix, "onSuccess"); } @@ -100,12 +100,12 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @Nullable Node node, - @NonNull String logPrefix, - @Nullable ExecutionInfo executionInfo) { + @Nullable ExecutionInfo executionInfo, + @NonNull String logPrefix) { invokeTrackers( tracker -> tracker.onError( - request, error, latencyNanos, executionProfile, node, logPrefix, executionInfo), + request, error, latencyNanos, executionProfile, node, executionInfo, logPrefix), logPrefix, "onError"); } @@ -116,12 +116,12 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix, - @NonNull ExecutionInfo executionInfo) { + @NonNull ExecutionInfo executionInfo, + @NonNull String logPrefix) { invokeTrackers( tracker -> tracker.onNodeSuccess( - request, latencyNanos, executionProfile, node, logPrefix, executionInfo), + request, latencyNanos, executionProfile, node, executionInfo, logPrefix), logPrefix, "onNodeSuccess"); } @@ -133,12 +133,12 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix, - ExecutionInfo executionInfo) { + ExecutionInfo executionInfo, + @NonNull String logPrefix) { invokeTrackers( tracker -> tracker.onNodeError( - request, error, latencyNanos, executionProfile, node, logPrefix, executionInfo), + request, error, latencyNanos, executionProfile, node, executionInfo, logPrefix), logPrefix, "onNodeError"); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java index 3bb5f2e1bca..921a1135c03 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java @@ -44,8 +44,8 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestPrefix, - @NonNull ExecutionInfo executionInfo) { + @NonNull ExecutionInfo executionInfo, + @NonNull String requestPrefix) { // nothing to do } @@ -56,8 +56,8 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, Node node, - @NonNull String requestPrefix, - @Nullable ExecutionInfo executionInfo) { + @Nullable ExecutionInfo executionInfo, + @NonNull String requestPrefix) { // nothing to do } @@ -68,8 +68,8 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestPrefix, - @Nullable ExecutionInfo executionInfo) { + @Nullable ExecutionInfo executionInfo, + @NonNull String requestPrefix) { // nothing to do } @@ -79,8 +79,8 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestPrefix, - @NonNull ExecutionInfo executionInfo) { + @NonNull ExecutionInfo executionInfo, + @NonNull String requestPrefix) { // nothing to do } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java index 1a5832d69bd..fa51e281071 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java @@ -88,8 +88,8 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix, - @NonNull ExecutionInfo executionInfo) { + @NonNull ExecutionInfo executionInfo, + @NonNull String logPrefix) { boolean successEnabled = executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_SUCCESS_ENABLED, false); @@ -142,8 +142,8 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, Node node, - @NonNull String logPrefix, - @NonNull ExecutionInfo executionInfo) { + @NonNull ExecutionInfo executionInfo, + @NonNull String logPrefix) { if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_ERROR_ENABLED, false)) { return; @@ -187,8 +187,8 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix, - @Nullable ExecutionInfo executionInfo) { + @Nullable ExecutionInfo executionInfo, + @NonNull String logPrefix) { // Nothing to do } @@ -198,8 +198,8 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix, - @NonNull ExecutionInfo executionInfo) { + @NonNull ExecutionInfo executionInfo, + @NonNull String logPrefix) { // Nothing to do } diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java index e24e146fd5f..29709e0b11d 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java @@ -499,24 +499,24 @@ public void should_invoke_request_tracker(DseProtocolVersion version) { anyLong(), any(DriverExecutionProfile.class), eq(node1), - matches(LOG_PREFIX_PER_REQUEST), - nullable(ExecutionInfo.class)); + nullable(ExecutionInfo.class), + matches(LOG_PREFIX_PER_REQUEST)); verify(requestTracker) .onNodeSuccess( eq(UNDEFINED_IDEMPOTENCE_STATEMENT), anyLong(), any(DriverExecutionProfile.class), eq(node2), - matches(LOG_PREFIX_PER_REQUEST), - any(ExecutionInfo.class)); + any(ExecutionInfo.class), + matches(LOG_PREFIX_PER_REQUEST)); verify(requestTracker) .onSuccess( eq(UNDEFINED_IDEMPOTENCE_STATEMENT), anyLong(), any(DriverExecutionProfile.class), eq(node2), - matches(LOG_PREFIX_PER_REQUEST), - any(ExecutionInfo.class)); + any(ExecutionInfo.class), + matches(LOG_PREFIX_PER_REQUEST)); verifyNoMoreInteractions(requestTracker); }); } diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java index 272df50bf65..09713b4ac6e 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java @@ -522,16 +522,16 @@ public void should_invoke_request_tracker_and_update_metrics( anyLong(), any(DriverExecutionProfile.class), eq(node), - matches(LOG_PREFIX_PER_REQUEST), - any(ExecutionInfo.class)); + any(ExecutionInfo.class), + matches(LOG_PREFIX_PER_REQUEST)); verify(requestTracker) .onNodeSuccess( eq(graphStatement), anyLong(), any(DriverExecutionProfile.class), eq(node), - matches(LOG_PREFIX_PER_REQUEST), - any(ExecutionInfo.class)); + any(ExecutionInfo.class), + matches(LOG_PREFIX_PER_REQUEST)); verifyNoMoreInteractions(requestTracker); verify(nodeMetricUpdater1) diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java index c91846ac61b..c5a9d6d4cb2 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java @@ -73,24 +73,24 @@ public void should_invoke_request_tracker() { anyLong(), any(DriverExecutionProfile.class), eq(node1), - any(String.class), - nullable(ExecutionInfo.class)); + nullable(ExecutionInfo.class), + any(String.class)); verify(requestTracker) .onNodeSuccess( eq(UNDEFINED_IDEMPOTENCE_STATEMENT), anyLong(), any(DriverExecutionProfile.class), eq(node2), - any(String.class), - any(ExecutionInfo.class)); + any(ExecutionInfo.class), + any(String.class)); verify(requestTracker) .onSuccess( eq(UNDEFINED_IDEMPOTENCE_STATEMENT), anyLong(), any(DriverExecutionProfile.class), eq(node2), - any(String.class), - any(ExecutionInfo.class)); + any(ExecutionInfo.class), + any(String.class)); verifyNoMoreInteractions(requestTracker); }); } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java index a278988e89b..f2009b4fb9b 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java @@ -67,7 +67,7 @@ public void should_record_first_response_time_on_node_success() { nextNanoTime = 123; // When - policy.onNodeSuccess(request, 0, profile, node1, logPrefix, executionInfo); + policy.onNodeSuccess(request, 0, profile, node1, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -85,7 +85,7 @@ public void should_record_second_response_time_on_node_success() { nextNanoTime = 456; // When - policy.onNodeSuccess(request, 0, profile, node1, logPrefix, executionInfo); + policy.onNodeSuccess(request, 0, profile, node1, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -109,8 +109,8 @@ public void should_record_further_response_times_on_node_success() { nextNanoTime = 789; // When - policy.onNodeSuccess(request, 0, profile, node1, logPrefix, executionInfo); - policy.onNodeSuccess(request, 0, profile, node2, logPrefix, executionInfo); + policy.onNodeSuccess(request, 0, profile, node1, executionInfo, logPrefix); + policy.onNodeSuccess(request, 0, profile, node2, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -135,7 +135,7 @@ public void should_record_first_response_time_on_node_error() { Throwable iae = new IllegalArgumentException(); // When - policy.onNodeError(request, iae, 0, profile, node1, logPrefix, executionInfo); + policy.onNodeError(request, iae, 0, profile, node1, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -154,7 +154,7 @@ public void should_record_second_response_time_on_node_error() { Throwable iae = new IllegalArgumentException(); // When - policy.onNodeError(request, iae, 0, profile, node1, logPrefix, executionInfo); + policy.onNodeError(request, iae, 0, profile, node1, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -179,8 +179,8 @@ public void should_record_further_response_times_on_node_error() { Throwable iae = new IllegalArgumentException(); // When - policy.onNodeError(request, iae, 0, profile, node1, logPrefix, executionInfo); - policy.onNodeError(request, iae, 0, profile, node2, logPrefix, executionInfo); + policy.onNodeError(request, iae, 0, profile, node1, executionInfo, logPrefix); + policy.onNodeError(request, iae, 0, profile, node2, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java index 648b05ec07e..adbad606712 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java @@ -113,12 +113,12 @@ public void should_notify_onSuccess() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onSuccess(request, 123456L, profile, node, "test", executionInfo); + .onSuccess(request, 123456L, profile, node, executionInfo, "test"); // when - tracker.onSuccess(request, 123456L, profile, node, "test", executionInfo); + tracker.onSuccess(request, 123456L, profile, node, executionInfo, "test"); // then - verify(child1).onSuccess(request, 123456L, profile, node, "test", executionInfo); - verify(child2).onSuccess(request, 123456L, profile, node, "test", executionInfo); + verify(child1).onSuccess(request, 123456L, profile, node, executionInfo, "test"); + verify(child2).onSuccess(request, 123456L, profile, node, executionInfo, "test"); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( @@ -131,12 +131,12 @@ public void should_notify_onError() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onError(request, error, 123456L, profile, node, "test", executionInfo); + .onError(request, error, 123456L, profile, node, executionInfo, "test"); // when - tracker.onError(request, error, 123456L, profile, node, "test", executionInfo); + tracker.onError(request, error, 123456L, profile, node, executionInfo, "test"); // then - verify(child1).onError(request, error, 123456L, profile, node, "test", executionInfo); - verify(child2).onError(request, error, 123456L, profile, node, "test", executionInfo); + verify(child1).onError(request, error, 123456L, profile, node, executionInfo, "test"); + verify(child2).onError(request, error, 123456L, profile, node, executionInfo, "test"); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( @@ -149,12 +149,12 @@ public void should_notify_onNodeSuccess() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onNodeSuccess(request, 123456L, profile, node, "test", executionInfo); + .onNodeSuccess(request, 123456L, profile, node, executionInfo, "test"); // when - tracker.onNodeSuccess(request, 123456L, profile, node, "test", executionInfo); + tracker.onNodeSuccess(request, 123456L, profile, node, executionInfo, "test"); // then - verify(child1).onNodeSuccess(request, 123456L, profile, node, "test", executionInfo); - verify(child2).onNodeSuccess(request, 123456L, profile, node, "test", executionInfo); + verify(child1).onNodeSuccess(request, 123456L, profile, node, executionInfo, "test"); + verify(child2).onNodeSuccess(request, 123456L, profile, node, executionInfo, "test"); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( @@ -167,12 +167,12 @@ public void should_notify_onNodeError() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onNodeError(request, error, 123456L, profile, node, "test", executionInfo); + .onNodeError(request, error, 123456L, profile, node, executionInfo, "test"); // when - tracker.onNodeError(request, error, 123456L, profile, node, "test", executionInfo); + tracker.onNodeError(request, error, 123456L, profile, node, executionInfo, "test"); // then - verify(child1).onNodeError(request, error, 123456L, profile, node, "test", executionInfo); - verify(child2).onNodeError(request, error, 123456L, profile, node, "test", executionInfo); + verify(child1).onNodeError(request, error, 123456L, profile, node, executionInfo, "test"); + verify(child2).onNodeError(request, error, 123456L, profile, node, executionInfo, "test"); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java index 0a3407c8611..f22475b5aca 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java @@ -41,8 +41,8 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix, - @Nullable ExecutionInfo executionInfo) { + @Nullable ExecutionInfo executionInfo, + @NonNull String logPrefix) { if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_ERROR_ENABLED)) { return; } @@ -78,8 +78,8 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix, - @NonNull ExecutionInfo executionInfo) { + @NonNull ExecutionInfo executionInfo, + @NonNull String logPrefix) { boolean successEnabled = executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_SUCCESS_ENABLED); boolean slowEnabled =