diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java index 44df3b3a03d..1706a5b295f 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java @@ -19,11 +19,13 @@ import com.datastax.dse.driver.api.core.DseProtocolVersion; import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet; +import com.datastax.dse.driver.api.core.graph.AsyncGraphResultSet; import com.datastax.dse.driver.internal.core.DseProtocolFeature; import com.datastax.dse.driver.internal.core.cql.DseConversions; import com.datastax.dse.protocol.internal.request.Revise; import com.datastax.dse.protocol.internal.response.result.DseRowsMetadata; import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.AsyncPagingIterable; import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.DriverTimeoutException; import com.datastax.oss.driver.api.core.NodeUnavailableException; @@ -626,7 +628,7 @@ public void operationComplete(@NonNull Future future) { Throwable error = future.cause(); if (error instanceof EncoderException && error.getCause() instanceof FrameTooLongException) { - trackNodeError(node, error.getCause()); + trackNodeError(node, error.getCause(), null); lock.lock(); try { abort(error.getCause(), false); @@ -643,7 +645,7 @@ public void operationComplete(@NonNull Future future) { .getMetricUpdater() .incrementCounter(DefaultNodeMetric.UNSENT_REQUESTS, executionProfile.getName()); recordError(node, error); - trackNodeError(node, error.getCause()); + trackNodeError(node, error.getCause(), null); sendRequest(statement, null, executionIndex, retryCount, scheduleSpeculativeExecution); } } else { @@ -737,7 +739,8 @@ private void onPageTimeout(int expectedPage) { * Invoked when a continuous paging response is received, either a successful or failed one. * *

Delegates further processing to appropriate methods: {@link #processResultResponse(Result, - * Frame)} if the response was successful, or {@link #processErrorResponse(Error)} if it wasn't. + * Frame)} if the response was successful, or {@link #processErrorResponse(Error, Frame)} if it + * wasn't. * * @param response the received {@link Frame}. */ @@ -758,15 +761,15 @@ public void onResponse(@NonNull Frame response) { processResultResponse((Result) responseMessage, response); } else if (responseMessage instanceof Error) { LOG.trace("[{}] Got error response", logPrefix); - processErrorResponse((Error) responseMessage); + processErrorResponse((Error) responseMessage, response); } else { IllegalStateException error = new IllegalStateException("Unexpected response " + responseMessage); - trackNodeError(node, error); + trackNodeError(node, error, response); abort(error, false); } } catch (Throwable t) { - trackNodeError(node, t); + trackNodeError(node, t, response); abort(t, false); } } finally { @@ -900,7 +903,7 @@ private void processResultResponse(@NonNull Result result, @Nullable Frame frame * @param errorMessage the error message received. */ @SuppressWarnings("GuardedBy") // this method is only called with the lock held - private void processErrorResponse(@NonNull Error errorMessage) { + private void processErrorResponse(@NonNull Error errorMessage, @NonNull Frame frame) { assert lock.isHeldByCurrentThread(); if (errorMessage instanceof Unprepared) { processUnprepared((Unprepared) errorMessage); @@ -909,7 +912,7 @@ private void processErrorResponse(@NonNull Error errorMessage) { if (error instanceof BootstrappingException) { LOG.trace("[{}] {} is bootstrapping, trying next node", logPrefix, node); recordError(node, error); - trackNodeError(node, error); + trackNodeError(node, error, frame); sendRequest(statement, null, executionIndex, retryCount, false); } else if (error instanceof QueryValidationException || error instanceof FunctionFailureException @@ -921,7 +924,7 @@ private void processErrorResponse(@NonNull Error errorMessage) { NodeMetricUpdater metricUpdater = ((DefaultNode) node).getMetricUpdater(); metricUpdater.incrementCounter( DefaultNodeMetric.OTHER_ERRORS, executionProfile.getName()); - trackNodeError(node, error); + trackNodeError(node, error, frame); abort(error, true); } else { try { @@ -1060,7 +1063,7 @@ private void processUnprepared(@NonNull Unprepared errorMessage) { + "This usually happens when you run a 'USE...' query after " + "the statement was prepared.", Bytes.toHexString(idToReprepare), Bytes.toHexString(repreparedId))); - trackNodeError(node, illegalStateException); + trackNodeError(node, illegalStateException, null); fatalError = illegalStateException; } else { LOG.trace( @@ -1079,18 +1082,18 @@ private void processUnprepared(@NonNull Unprepared errorMessage) { || prepareError instanceof FunctionFailureException || prepareError instanceof ProtocolError) { LOG.trace("[{}] Unrecoverable error on re-prepare, rethrowing", logPrefix); - trackNodeError(node, prepareError); + trackNodeError(node, prepareError, null); fatalError = prepareError; } } } else if (exception instanceof RequestThrottlingException) { - trackNodeError(node, exception); + trackNodeError(node, exception, null); fatalError = exception; } if (fatalError == null) { LOG.trace("[{}] Re-prepare failed, trying next node", logPrefix); recordError(node, exception); - trackNodeError(node, exception); + trackNodeError(node, exception, null); sendRequest(statement, null, executionIndex, retryCount, false); } } @@ -1118,18 +1121,18 @@ private void processRetryVerdict(@NonNull RetryVerdict verdict, @NonNull Throwab switch (verdict.getRetryDecision()) { case RETRY_SAME: recordError(node, error); - trackNodeError(node, error); + trackNodeError(node, error, null); sendRequest( verdict.getRetryRequest(statement), node, executionIndex, retryCount + 1, false); break; case RETRY_NEXT: recordError(node, error); - trackNodeError(node, error); + trackNodeError(node, error, null); sendRequest( verdict.getRetryRequest(statement), null, executionIndex, retryCount + 1, false); break; case RETHROW: - trackNodeError(node, error); + trackNodeError(node, error, null); abort(error, true); break; case IGNORE: @@ -1442,12 +1445,20 @@ private void reenableAutoReadIfNeeded() { // ERROR HANDLING - private void trackNodeError(@NonNull Node node, @NonNull Throwable error) { + private void trackNodeError( + @NonNull Node node, @NonNull Throwable error, @Nullable Frame frame) { if (nodeErrorReported.compareAndSet(false, true)) { long latencyNanos = System.nanoTime() - this.messageStartTimeNanos; context .getRequestTracker() - .onNodeError(this.statement, error, latencyNanos, executionProfile, node, logPrefix); + .onNodeError( + this.statement, + error, + latencyNanos, + executionProfile, + node, + createExecutionInfo(frame), + logPrefix); } } @@ -1561,21 +1572,32 @@ private void completeResultSetFuture( if (resultSetClass.isInstance(pageOrError)) { if (future.complete(resultSetClass.cast(pageOrError))) { throttler.signalSuccess(ContinuousRequestHandlerBase.this); + + ExecutionInfo executionInfo = null; + if (pageOrError instanceof AsyncPagingIterable) { + executionInfo = ((AsyncPagingIterable) pageOrError).getExecutionInfo(); + } else if (pageOrError instanceof AsyncGraphResultSet) { + executionInfo = ((AsyncGraphResultSet) pageOrError).getRequestExecutionInfo(); + } + if (nodeSuccessReported.compareAndSet(false, true)) { context .getRequestTracker() - .onNodeSuccess(statement, nodeLatencyNanos, executionProfile, node, logPrefix); + .onNodeSuccess( + statement, nodeLatencyNanos, executionProfile, node, executionInfo, logPrefix); } context .getRequestTracker() - .onSuccess(statement, totalLatencyNanos, executionProfile, node, logPrefix); + .onSuccess( + statement, totalLatencyNanos, executionProfile, node, executionInfo, logPrefix); } } else { Throwable error = (Throwable) pageOrError; if (future.completeExceptionally(error)) { context .getRequestTracker() - .onError(statement, error, totalLatencyNanos, executionProfile, node, logPrefix); + .onError( + statement, error, totalLatencyNanos, executionProfile, node, null, logPrefix); if (error instanceof DriverTimeoutException) { throttler.signalTimeout(ContinuousRequestHandlerBase.this); session @@ -1606,6 +1628,22 @@ private ExecutionInfo createExecutionInfo(@NonNull Result result, @Nullable Fram executionProfile); } + @NonNull + private ExecutionInfo createExecutionInfo(@Nullable Frame response) { + return new DefaultExecutionInfo( + statement, + node, + startedSpeculativeExecutionsCount.get(), + executionIndex, + errors, + null, + response, + true, + session, + context, + executionProfile); + } + private void logTimeoutSchedulingError(IllegalStateException timeoutError) { // If we're racing with session shutdown, the timer might be stopped already. We don't want // to schedule more executions anyway, so swallow the error. diff --git a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java index c2298458805..dd5ef27a0b6 100644 --- a/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java +++ b/core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java @@ -360,9 +360,19 @@ private void setFinalResult( totalLatencyNanos = completionTimeNanos - startTimeNanos; long nodeLatencyNanos = completionTimeNanos - callback.nodeStartTimeNanos; requestTracker.onNodeSuccess( - callback.statement, nodeLatencyNanos, executionProfile, callback.node, logPrefix); + callback.statement, + nodeLatencyNanos, + executionProfile, + callback.node, + executionInfo, + logPrefix); requestTracker.onSuccess( - callback.statement, totalLatencyNanos, executionProfile, callback.node, logPrefix); + callback.statement, + totalLatencyNanos, + executionProfile, + callback.node, + executionInfo, + logPrefix); } if (sessionMetricUpdater.isEnabled( DseSessionMetric.GRAPH_REQUESTS, executionProfile.getName())) { @@ -446,27 +456,28 @@ private void setFinalError( GraphStatement statement, Throwable error, Node node, int execution) { DriverExecutionProfile executionProfile = Conversions.resolveExecutionProfile(statement, context); + ExecutionInfo executionInfo = + new DefaultExecutionInfo( + statement, + node, + startedSpeculativeExecutionsCount.get(), + execution, + errors, + null, + null, + true, + session, + context, + executionProfile); if (error instanceof DriverException) { - ((DriverException) error) - .setExecutionInfo( - new DefaultExecutionInfo( - statement, - node, - startedSpeculativeExecutionsCount.get(), - execution, - errors, - null, - null, - true, - session, - context, - executionProfile)); + ((DriverException) error).setExecutionInfo(executionInfo); } if (result.completeExceptionally(error)) { cancelScheduledTasks(); if (!(requestTracker instanceof NoopRequestTracker)) { long latencyNanos = System.nanoTime() - startTimeNanos; - requestTracker.onError(statement, error, latencyNanos, executionProfile, node, logPrefix); + requestTracker.onError( + statement, error, latencyNanos, executionProfile, node, executionInfo, logPrefix); } if (error instanceof DriverTimeoutException) { throttler.signalTimeout(this); @@ -858,7 +869,8 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan nodeResponseTimeNanos = System.nanoTime(); } long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos; - requestTracker.onNodeError(statement, error, latencyNanos, executionProfile, node, logPrefix); + requestTracker.onNodeError( + statement, error, latencyNanos, executionProfile, node, null, logPrefix); } @Override diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java index d29ee48d352..a3988f360f4 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java @@ -18,6 +18,7 @@ package com.datastax.oss.driver.api.core.tracker; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; @@ -37,7 +38,7 @@ public interface RequestTracker extends AutoCloseable { /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onSuccess(Request, long, DriverExecutionProfile, Node, String)} instead. + * #onSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} instead. */ @Deprecated default void onSuccess( @@ -46,6 +47,21 @@ default void onSuccess( @NonNull DriverExecutionProfile executionProfile, @NonNull Node node) {} + /** + * @deprecated This method only exists for backward compatibility. Override {@link + * #onSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} instead. + */ + @Deprecated + default void onSuccess( + @NonNull Request request, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node, + @NonNull String requestLogPrefix) { + // If client doesn't override onSuccess with requestLogPrefix delegate call to the old method + onSuccess(request, latencyNanos, executionProfile, node); + } + /** * Invoked each time a request succeeds. * @@ -53,6 +69,7 @@ default void onSuccess( * GenericType) session.execute} call until the result is made available to the client). * @param executionProfile the execution profile of this request. * @param node the node that returned the successful response. + * @param executionInfo the execution info containing the results of this request * @param requestLogPrefix the dedicated log prefix for this request */ default void onSuccess( @@ -60,14 +77,16 @@ default void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String requestLogPrefix) { - // If client doesn't override onSuccess with requestLogPrefix delegate call to the old method - onSuccess(request, latencyNanos, executionProfile, node); + // If client doesn't override onSuccess with executionInfo delegate call to the old method + onSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix); } /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onError(Request, Throwable, long, DriverExecutionProfile, Node, String)} instead. + * #onError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, String)} + * instead. */ @Deprecated default void onError( @@ -77,6 +96,23 @@ default void onError( @NonNull DriverExecutionProfile executionProfile, @Nullable Node node) {} + /** + * @deprecated This method only exists for backward compatibility. Override {@link + * #onError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, String)} + * instead. + */ + @Deprecated + default void onError( + @NonNull Request request, + @NonNull Throwable error, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @Nullable Node node, + @NonNull String requestLogPrefix) { + // If client doesn't override onError with requestLogPrefix delegate call to the old method + onError(request, error, latencyNanos, executionProfile, node); + } + /** * Invoked each time a request fails. * @@ -84,6 +120,8 @@ default void onError( * GenericType) session.execute} call until the error is propagated to the client). * @param executionProfile the execution profile of this request. * @param node the node that returned the error response, or {@code null} if the error occurred + * @param executionInfo the execution info being returned to the client for this request if + * available * @param requestLogPrefix the dedicated log prefix for this request */ default void onError( @@ -92,14 +130,16 @@ default void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @Nullable Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String requestLogPrefix) { - // If client doesn't override onError with requestLogPrefix delegate call to the old method - onError(request, error, latencyNanos, executionProfile, node); + // delegate call to the old method + onError(request, error, latencyNanos, executionProfile, node, requestLogPrefix); } /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, String)} instead. + * #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, + * String)} instead. */ @Deprecated default void onNodeError( @@ -109,6 +149,23 @@ default void onNodeError( @NonNull DriverExecutionProfile executionProfile, @NonNull Node node) {} + /** + * @deprecated This method only exists for backward compatibility. Override {@link + * #onNodeError(Request, Throwable, long, DriverExecutionProfile, Node, ExecutionInfo, + * String)} instead. + */ + @Deprecated + default void onNodeError( + @NonNull Request request, + @NonNull Throwable error, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node, + @NonNull String requestLogPrefix) { + // If client doesn't override onNodeError with requestLogPrefix delegate call to the old method + onNodeError(request, error, latencyNanos, executionProfile, node); + } + /** * Invoked each time a request fails at the node level. Similar to {@link #onError(Request, * Throwable, long, DriverExecutionProfile, Node, String)} but at a per node level. @@ -117,6 +174,7 @@ default void onNodeError( * GenericType) session.execute} call until the error is propagated to the client). * @param executionProfile the execution profile of this request. * @param node the node that returned the error response. + * @param executionInfo the execution info containing the results of this request if available * @param requestLogPrefix the dedicated log prefix for this request */ default void onNodeError( @@ -125,14 +183,16 @@ default void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String requestLogPrefix) { // If client doesn't override onNodeError with requestLogPrefix delegate call to the old method - onNodeError(request, error, latencyNanos, executionProfile, node); + onNodeError(request, error, latencyNanos, executionProfile, node, requestLogPrefix); } /** * @deprecated This method only exists for backward compatibility. Override {@link - * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, String)} instead. + * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} + * instead. */ @Deprecated default void onNodeSuccess( @@ -141,6 +201,23 @@ default void onNodeSuccess( @NonNull DriverExecutionProfile executionProfile, @NonNull Node node) {} + /** + * @deprecated This method only exists for backward compatibility. Override {@link + * #onNodeSuccess(Request, long, DriverExecutionProfile, Node, ExecutionInfo, String)} + * instead. + */ + @Deprecated + default void onNodeSuccess( + @NonNull Request request, + long latencyNanos, + @NonNull DriverExecutionProfile executionProfile, + @NonNull Node node, + @NonNull String requestLogPrefix) { + // If client doesn't override onNodeSuccess with requestLogPrefix delegate call to the old + // method + onNodeSuccess(request, latencyNanos, executionProfile, node); + } + /** * Invoked each time a request succeeds at the node level. Similar to {@link #onSuccess(Request, * long, DriverExecutionProfile, Node, String)} but at per node level. @@ -149,6 +226,7 @@ default void onNodeSuccess( * GenericType) session.execute} call until the result is made available to the client). * @param executionProfile the execution profile of this request. * @param node the node that returned the successful response. + * @param executionInfo the execution info containing the results of this request * @param requestLogPrefix the dedicated log prefix for this request */ default void onNodeSuccess( @@ -156,10 +234,10 @@ default void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String requestLogPrefix) { - // If client doesn't override onNodeSuccess with requestLogPrefix delegate call to the old - // method - onNodeSuccess(request, latencyNanos, executionProfile, node); + // delegate call to the old method + onNodeSuccess(request, latencyNanos, executionProfile, node, requestLogPrefix); } /** diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index e7e334d57d8..e1e749d33a0 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -340,12 +340,14 @@ private void setFinalResult( nodeLatencyNanos, callback.executionProfile, callback.node, + executionInfo, logPrefix); requestTracker.onSuccess( callback.statement, totalLatencyNanos, callback.executionProfile, callback.node, + executionInfo, logPrefix); } if (sessionMetricUpdater.isEnabled( @@ -433,27 +435,28 @@ public void onThrottleFailure(@NonNull RequestThrottlingException error) { private void setFinalError(Statement statement, Throwable error, Node node, int execution) { DriverExecutionProfile executionProfile = Conversions.resolveExecutionProfile(statement, context); + ExecutionInfo executionInfo = + new DefaultExecutionInfo( + statement, + node, + startedSpeculativeExecutionsCount.get(), + execution, + errors, + null, + null, + true, + session, + context, + executionProfile); if (error instanceof DriverException) { - ((DriverException) error) - .setExecutionInfo( - new DefaultExecutionInfo( - statement, - node, - startedSpeculativeExecutionsCount.get(), - execution, - errors, - null, - null, - true, - session, - context, - executionProfile)); + ((DriverException) error).setExecutionInfo(executionInfo); } if (result.completeExceptionally(error)) { cancelScheduledTasks(); if (!(requestTracker instanceof NoopRequestTracker)) { long latencyNanos = System.nanoTime() - startTimeNanos; - requestTracker.onError(statement, error, latencyNanos, executionProfile, node, logPrefix); + requestTracker.onError( + statement, error, latencyNanos, executionProfile, node, executionInfo, logPrefix); } if (error instanceof DriverTimeoutException) { throttler.signalTimeout(this); @@ -947,7 +950,21 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan nodeResponseTimeNanos = System.nanoTime(); } long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos; - requestTracker.onNodeError(statement, error, latencyNanos, executionProfile, node, logPrefix); + ExecutionInfo executionInfo = + new DefaultExecutionInfo( + statement, + node, + startedSpeculativeExecutionsCount.get(), + execution, + errors, + null, + null, + true, + session, + context, + executionProfile); + requestTracker.onNodeError( + statement, error, latencyNanos, executionProfile, node, executionInfo, logPrefix); } @Override diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java index 47edcdfe53e..42b3358dfa6 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java @@ -23,6 +23,7 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; @@ -240,6 +241,7 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { updateResponseTimes(node); } @@ -251,6 +253,7 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String logPrefix) { updateResponseTimes(node); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java index d4d20f3eb78..3173a56b1df 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java @@ -18,6 +18,7 @@ package com.datastax.oss.driver.internal.core.tracker; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; @@ -82,9 +83,12 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { invokeTrackers( - tracker -> tracker.onSuccess(request, latencyNanos, executionProfile, node, logPrefix), + tracker -> + tracker.onSuccess( + request, latencyNanos, executionProfile, node, executionInfo, logPrefix), logPrefix, "onSuccess"); } @@ -96,9 +100,12 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @Nullable Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String logPrefix) { invokeTrackers( - tracker -> tracker.onError(request, error, latencyNanos, executionProfile, node, logPrefix), + tracker -> + tracker.onError( + request, error, latencyNanos, executionProfile, node, executionInfo, logPrefix), logPrefix, "onError"); } @@ -109,9 +116,12 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { invokeTrackers( - tracker -> tracker.onNodeSuccess(request, latencyNanos, executionProfile, node, logPrefix), + tracker -> + tracker.onNodeSuccess( + request, latencyNanos, executionProfile, node, executionInfo, logPrefix), logPrefix, "onNodeSuccess"); } @@ -123,10 +133,12 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + ExecutionInfo executionInfo, @NonNull String logPrefix) { invokeTrackers( tracker -> - tracker.onNodeError(request, error, latencyNanos, executionProfile, node, logPrefix), + tracker.onNodeError( + request, error, latencyNanos, executionProfile, node, executionInfo, logPrefix), logPrefix, "onNodeError"); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java index 09ac27e5e75..921a1135c03 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java @@ -19,10 +19,12 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; import net.jcip.annotations.ThreadSafe; /** @@ -42,6 +44,7 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String requestPrefix) { // nothing to do } @@ -53,6 +56,7 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String requestPrefix) { // nothing to do } @@ -64,6 +68,7 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String requestPrefix) { // nothing to do } @@ -74,6 +79,7 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String requestPrefix) { // nothing to do } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java index 235ef051b40..fa51e281071 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java @@ -20,11 +20,13 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.SessionBuilder; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; import java.time.Duration; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; @@ -86,6 +88,7 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { boolean successEnabled = @@ -139,6 +142,7 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_ERROR_ENABLED, false)) { @@ -183,6 +187,7 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String logPrefix) { // Nothing to do } @@ -193,6 +198,7 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { // Nothing to do } diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java index a816183e9ee..29709e0b11d 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousCqlRequestHandlerTest.java @@ -31,6 +31,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.matches; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -498,6 +499,7 @@ public void should_invoke_request_tracker(DseProtocolVersion version) { anyLong(), any(DriverExecutionProfile.class), eq(node1), + nullable(ExecutionInfo.class), matches(LOG_PREFIX_PER_REQUEST)); verify(requestTracker) .onNodeSuccess( @@ -505,6 +507,7 @@ public void should_invoke_request_tracker(DseProtocolVersion version) { anyLong(), any(DriverExecutionProfile.class), eq(node2), + any(ExecutionInfo.class), matches(LOG_PREFIX_PER_REQUEST)); verify(requestTracker) .onSuccess( @@ -512,6 +515,7 @@ public void should_invoke_request_tracker(DseProtocolVersion version) { anyLong(), any(DriverExecutionProfile.class), eq(node2), + any(ExecutionInfo.class), matches(LOG_PREFIX_PER_REQUEST)); verifyNoMoreInteractions(requestTracker); }); diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java index 9f325003610..09713b4ac6e 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandlerTest.java @@ -56,6 +56,7 @@ import com.datastax.oss.driver.api.core.Version; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.datastax.oss.driver.internal.core.cql.Conversions; @@ -521,6 +522,7 @@ public void should_invoke_request_tracker_and_update_metrics( anyLong(), any(DriverExecutionProfile.class), eq(node), + any(ExecutionInfo.class), matches(LOG_PREFIX_PER_REQUEST)); verify(requestTracker) .onNodeSuccess( @@ -528,6 +530,7 @@ public void should_invoke_request_tracker_and_update_metrics( anyLong(), any(DriverExecutionProfile.class), eq(node), + any(ExecutionInfo.class), matches(LOG_PREFIX_PER_REQUEST)); verifyNoMoreInteractions(requestTracker); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java index ecc087fb8ac..c5a9d6d4cb2 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandlerTrackerTest.java @@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -29,6 +30,7 @@ import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.servererrors.BootstrappingException; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker; @@ -38,7 +40,6 @@ import org.junit.Test; public class CqlRequestHandlerTrackerTest extends CqlRequestHandlerTestBase { - @Test public void should_invoke_request_tracker() { try (RequestHandlerTestHarness harness = @@ -72,6 +73,7 @@ public void should_invoke_request_tracker() { anyLong(), any(DriverExecutionProfile.class), eq(node1), + nullable(ExecutionInfo.class), any(String.class)); verify(requestTracker) .onNodeSuccess( @@ -79,6 +81,7 @@ public void should_invoke_request_tracker() { anyLong(), any(DriverExecutionProfile.class), eq(node2), + any(ExecutionInfo.class), any(String.class)); verify(requestTracker) .onSuccess( @@ -86,6 +89,7 @@ public void should_invoke_request_tracker() { anyLong(), any(DriverExecutionProfile.class), eq(node2), + any(ExecutionInfo.class), any(String.class)); verifyNoMoreInteractions(requestTracker); }); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java index bcc6439a2a5..f2009b4fb9b 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyRequestTrackerTest.java @@ -22,6 +22,7 @@ import static org.mockito.BDDMockito.given; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; @@ -34,6 +35,7 @@ public class DefaultLoadBalancingPolicyRequestTrackerTest extends LoadBalancingP @Mock Request request; @Mock DriverExecutionProfile profile; + @Mock ExecutionInfo executionInfo; final String logPrefix = "lbp-test-log-prefix"; private DefaultLoadBalancingPolicy policy; @@ -65,7 +67,7 @@ public void should_record_first_response_time_on_node_success() { nextNanoTime = 123; // When - policy.onNodeSuccess(request, 0, profile, node1, logPrefix); + policy.onNodeSuccess(request, 0, profile, node1, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -83,7 +85,7 @@ public void should_record_second_response_time_on_node_success() { nextNanoTime = 456; // When - policy.onNodeSuccess(request, 0, profile, node1, logPrefix); + policy.onNodeSuccess(request, 0, profile, node1, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -107,8 +109,8 @@ public void should_record_further_response_times_on_node_success() { nextNanoTime = 789; // When - policy.onNodeSuccess(request, 0, profile, node1, logPrefix); - policy.onNodeSuccess(request, 0, profile, node2, logPrefix); + policy.onNodeSuccess(request, 0, profile, node1, executionInfo, logPrefix); + policy.onNodeSuccess(request, 0, profile, node2, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -133,7 +135,7 @@ public void should_record_first_response_time_on_node_error() { Throwable iae = new IllegalArgumentException(); // When - policy.onNodeError(request, iae, 0, profile, node1, logPrefix); + policy.onNodeError(request, iae, 0, profile, node1, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -152,7 +154,7 @@ public void should_record_second_response_time_on_node_error() { Throwable iae = new IllegalArgumentException(); // When - policy.onNodeError(request, iae, 0, profile, node1, logPrefix); + policy.onNodeError(request, iae, 0, profile, node1, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) @@ -177,8 +179,8 @@ public void should_record_further_response_times_on_node_error() { Throwable iae = new IllegalArgumentException(); // When - policy.onNodeError(request, iae, 0, profile, node1, logPrefix); - policy.onNodeError(request, iae, 0, profile, node2, logPrefix); + policy.onNodeError(request, iae, 0, profile, node1, executionInfo, logPrefix); + policy.onNodeError(request, iae, 0, profile, node2, executionInfo, logPrefix); // Then assertThat(policy.responseTimes) diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java index 8dcad99b459..adbad606712 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTrackerTest.java @@ -28,6 +28,7 @@ import ch.qos.logback.core.Appender; import com.datastax.oss.driver.api.core.DriverExecutionException; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.api.core.session.Session; @@ -51,6 +52,7 @@ public class MultiplexingRequestTrackerTest { @Mock private DriverExecutionProfile profile; @Mock private Node node; @Mock private Session session; + @Mock private ExecutionInfo executionInfo; @Mock private Appender appender; @Captor private ArgumentCaptor loggingEventCaptor; @@ -111,12 +113,12 @@ public void should_notify_onSuccess() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onSuccess(request, 123456L, profile, node, "test"); + .onSuccess(request, 123456L, profile, node, executionInfo, "test"); // when - tracker.onSuccess(request, 123456L, profile, node, "test"); + tracker.onSuccess(request, 123456L, profile, node, executionInfo, "test"); // then - verify(child1).onSuccess(request, 123456L, profile, node, "test"); - verify(child2).onSuccess(request, 123456L, profile, node, "test"); + verify(child1).onSuccess(request, 123456L, profile, node, executionInfo, "test"); + verify(child2).onSuccess(request, 123456L, profile, node, executionInfo, "test"); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( @@ -129,12 +131,12 @@ public void should_notify_onError() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onError(request, error, 123456L, profile, node, "test"); + .onError(request, error, 123456L, profile, node, executionInfo, "test"); // when - tracker.onError(request, error, 123456L, profile, node, "test"); + tracker.onError(request, error, 123456L, profile, node, executionInfo, "test"); // then - verify(child1).onError(request, error, 123456L, profile, node, "test"); - verify(child2).onError(request, error, 123456L, profile, node, "test"); + verify(child1).onError(request, error, 123456L, profile, node, executionInfo, "test"); + verify(child2).onError(request, error, 123456L, profile, node, executionInfo, "test"); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( @@ -147,12 +149,12 @@ public void should_notify_onNodeSuccess() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onNodeSuccess(request, 123456L, profile, node, "test"); + .onNodeSuccess(request, 123456L, profile, node, executionInfo, "test"); // when - tracker.onNodeSuccess(request, 123456L, profile, node, "test"); + tracker.onNodeSuccess(request, 123456L, profile, node, executionInfo, "test"); // then - verify(child1).onNodeSuccess(request, 123456L, profile, node, "test"); - verify(child2).onNodeSuccess(request, 123456L, profile, node, "test"); + verify(child1).onNodeSuccess(request, 123456L, profile, node, executionInfo, "test"); + verify(child2).onNodeSuccess(request, 123456L, profile, node, executionInfo, "test"); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( @@ -165,12 +167,12 @@ public void should_notify_onNodeError() { MultiplexingRequestTracker tracker = new MultiplexingRequestTracker(child1, child2); willThrow(new NullPointerException()) .given(child1) - .onNodeError(request, error, 123456L, profile, node, "test"); + .onNodeError(request, error, 123456L, profile, node, executionInfo, "test"); // when - tracker.onNodeError(request, error, 123456L, profile, node, "test"); + tracker.onNodeError(request, error, 123456L, profile, node, executionInfo, "test"); // then - verify(child1).onNodeError(request, error, 123456L, profile, node, "test"); - verify(child2).onNodeError(request, error, 123456L, profile, node, "test"); + verify(child1).onNodeError(request, error, 123456L, profile, node, executionInfo, "test"); + verify(child2).onNodeError(request, error, 123456L, profile, node, executionInfo, "test"); verify(appender).doAppend(loggingEventCaptor.capture()); assertThat(loggingEventCaptor.getAllValues().stream().map(ILoggingEvent::getFormattedMessage)) .contains( diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java index eae98339637..f22475b5aca 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java @@ -20,11 +20,13 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.cql.ExecutionInfo; import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.core.session.Request; import com.datastax.oss.driver.internal.core.tracker.RequestLogFormatter; import com.datastax.oss.driver.internal.core.tracker.RequestLogger; import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; public class RequestNodeLoggerExample extends RequestLogger { @@ -39,6 +41,7 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @Nullable ExecutionInfo executionInfo, @NonNull String logPrefix) { if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_ERROR_ENABLED)) { return; @@ -75,6 +78,7 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, + @NonNull ExecutionInfo executionInfo, @NonNull String logPrefix) { boolean successEnabled = executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_SUCCESS_ENABLED);