Skip to content

Commit 43db6f6

Browse files
Andrew Weaverlukasz-antoniak
Andrew Weaver
authored andcommitted
Add ExecutionInfo to RequestTracker methods
1 parent a17f7be commit 43db6f6

File tree

14 files changed

+299
-98
lines changed

14 files changed

+299
-98
lines changed

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
import com.datastax.dse.driver.api.core.DseProtocolVersion;
2121
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousAsyncResultSet;
22+
import com.datastax.dse.driver.api.core.graph.AsyncGraphResultSet;
2223
import com.datastax.dse.driver.internal.core.DseProtocolFeature;
2324
import com.datastax.dse.driver.internal.core.cql.DseConversions;
2425
import com.datastax.dse.protocol.internal.request.Revise;
2526
import com.datastax.dse.protocol.internal.response.result.DseRowsMetadata;
2627
import com.datastax.oss.driver.api.core.AllNodesFailedException;
28+
import com.datastax.oss.driver.api.core.AsyncPagingIterable;
2729
import com.datastax.oss.driver.api.core.CqlIdentifier;
2830
import com.datastax.oss.driver.api.core.DriverTimeoutException;
2931
import com.datastax.oss.driver.api.core.NodeUnavailableException;
@@ -626,7 +628,7 @@ public void operationComplete(@NonNull Future<java.lang.Void> future) {
626628
Throwable error = future.cause();
627629
if (error instanceof EncoderException
628630
&& error.getCause() instanceof FrameTooLongException) {
629-
trackNodeError(node, error.getCause());
631+
trackNodeError(node, error.getCause(), null);
630632
lock.lock();
631633
try {
632634
abort(error.getCause(), false);
@@ -643,7 +645,7 @@ public void operationComplete(@NonNull Future<java.lang.Void> future) {
643645
.getMetricUpdater()
644646
.incrementCounter(DefaultNodeMetric.UNSENT_REQUESTS, executionProfile.getName());
645647
recordError(node, error);
646-
trackNodeError(node, error.getCause());
648+
trackNodeError(node, error.getCause(), null);
647649
sendRequest(statement, null, executionIndex, retryCount, scheduleSpeculativeExecution);
648650
}
649651
} else {
@@ -738,7 +740,8 @@ private void onPageTimeout(int expectedPage) {
738740
* Invoked when a continuous paging response is received, either a successful or failed one.
739741
*
740742
* <p>Delegates further processing to appropriate methods: {@link #processResultResponse(Result,
741-
* Frame)} if the response was successful, or {@link #processErrorResponse(Error)} if it wasn't.
743+
* Frame)} if the response was successful, or {@link #processErrorResponse(Error, Frame)} if it
744+
* wasn't.
742745
*
743746
* @param response the received {@link Frame}.
744747
*/
@@ -759,15 +762,15 @@ public void onResponse(@NonNull Frame response) {
759762
processResultResponse((Result) responseMessage, response);
760763
} else if (responseMessage instanceof Error) {
761764
LOG.trace("[{}] Got error response", logPrefix);
762-
processErrorResponse((Error) responseMessage);
765+
processErrorResponse((Error) responseMessage, response);
763766
} else {
764767
IllegalStateException error =
765768
new IllegalStateException("Unexpected response " + responseMessage);
766-
trackNodeError(node, error);
769+
trackNodeError(node, error, response);
767770
abort(error, false);
768771
}
769772
} catch (Throwable t) {
770-
trackNodeError(node, t);
773+
trackNodeError(node, t, response);
771774
abort(t, false);
772775
}
773776
} finally {
@@ -901,7 +904,7 @@ private void processResultResponse(@NonNull Result result, @Nullable Frame frame
901904
* @param errorMessage the error message received.
902905
*/
903906
@SuppressWarnings("GuardedBy") // this method is only called with the lock held
904-
private void processErrorResponse(@NonNull Error errorMessage) {
907+
private void processErrorResponse(@NonNull Error errorMessage, @NonNull Frame frame) {
905908
assert lock.isHeldByCurrentThread();
906909
if (errorMessage instanceof Unprepared) {
907910
processUnprepared((Unprepared) errorMessage);
@@ -910,7 +913,7 @@ private void processErrorResponse(@NonNull Error errorMessage) {
910913
if (error instanceof BootstrappingException) {
911914
LOG.trace("[{}] {} is bootstrapping, trying next node", logPrefix, node);
912915
recordError(node, error);
913-
trackNodeError(node, error);
916+
trackNodeError(node, error, frame);
914917
sendRequest(statement, null, executionIndex, retryCount, false);
915918
} else if (error instanceof QueryValidationException
916919
|| error instanceof FunctionFailureException
@@ -922,7 +925,7 @@ private void processErrorResponse(@NonNull Error errorMessage) {
922925
NodeMetricUpdater metricUpdater = ((DefaultNode) node).getMetricUpdater();
923926
metricUpdater.incrementCounter(
924927
DefaultNodeMetric.OTHER_ERRORS, executionProfile.getName());
925-
trackNodeError(node, error);
928+
trackNodeError(node, error, frame);
926929
abort(error, true);
927930
} else {
928931
try {
@@ -1061,7 +1064,7 @@ private void processUnprepared(@NonNull Unprepared errorMessage) {
10611064
+ "This usually happens when you run a 'USE...' query after "
10621065
+ "the statement was prepared.",
10631066
Bytes.toHexString(idToReprepare), Bytes.toHexString(repreparedId)));
1064-
trackNodeError(node, illegalStateException);
1067+
trackNodeError(node, illegalStateException, null);
10651068
fatalError = illegalStateException;
10661069
} else {
10671070
LOG.trace(
@@ -1080,18 +1083,18 @@ private void processUnprepared(@NonNull Unprepared errorMessage) {
10801083
|| prepareError instanceof FunctionFailureException
10811084
|| prepareError instanceof ProtocolError) {
10821085
LOG.trace("[{}] Unrecoverable error on re-prepare, rethrowing", logPrefix);
1083-
trackNodeError(node, prepareError);
1086+
trackNodeError(node, prepareError, null);
10841087
fatalError = prepareError;
10851088
}
10861089
}
10871090
} else if (exception instanceof RequestThrottlingException) {
1088-
trackNodeError(node, exception);
1091+
trackNodeError(node, exception, null);
10891092
fatalError = exception;
10901093
}
10911094
if (fatalError == null) {
10921095
LOG.trace("[{}] Re-prepare failed, trying next node", logPrefix);
10931096
recordError(node, exception);
1094-
trackNodeError(node, exception);
1097+
trackNodeError(node, exception, null);
10951098
sendRequest(statement, null, executionIndex, retryCount, false);
10961099
}
10971100
}
@@ -1119,18 +1122,18 @@ private void processRetryVerdict(@NonNull RetryVerdict verdict, @NonNull Throwab
11191122
switch (verdict.getRetryDecision()) {
11201123
case RETRY_SAME:
11211124
recordError(node, error);
1122-
trackNodeError(node, error);
1125+
trackNodeError(node, error, null);
11231126
sendRequest(
11241127
verdict.getRetryRequest(statement), node, executionIndex, retryCount + 1, false);
11251128
break;
11261129
case RETRY_NEXT:
11271130
recordError(node, error);
1128-
trackNodeError(node, error);
1131+
trackNodeError(node, error, null);
11291132
sendRequest(
11301133
verdict.getRetryRequest(statement), null, executionIndex, retryCount + 1, false);
11311134
break;
11321135
case RETHROW:
1133-
trackNodeError(node, error);
1136+
trackNodeError(node, error, null);
11341137
abort(error, true);
11351138
break;
11361139
case IGNORE:
@@ -1443,12 +1446,20 @@ private void reenableAutoReadIfNeeded() {
14431446

14441447
// ERROR HANDLING
14451448

1446-
private void trackNodeError(@NonNull Node node, @NonNull Throwable error) {
1449+
private void trackNodeError(
1450+
@NonNull Node node, @NonNull Throwable error, @Nullable Frame frame) {
14471451
if (nodeErrorReported.compareAndSet(false, true)) {
14481452
long latencyNanos = System.nanoTime() - this.messageStartTimeNanos;
14491453
context
14501454
.getRequestTracker()
1451-
.onNodeError(this.statement, error, latencyNanos, executionProfile, node, logPrefix);
1455+
.onNodeError(
1456+
this.statement,
1457+
error,
1458+
latencyNanos,
1459+
executionProfile,
1460+
node,
1461+
createExecutionInfo(frame),
1462+
logPrefix);
14521463
}
14531464
}
14541465

@@ -1562,21 +1573,32 @@ private void completeResultSetFuture(
15621573
if (resultSetClass.isInstance(pageOrError)) {
15631574
if (future.complete(resultSetClass.cast(pageOrError))) {
15641575
throttler.signalSuccess(ContinuousRequestHandlerBase.this);
1576+
1577+
ExecutionInfo executionInfo = null;
1578+
if (pageOrError instanceof AsyncPagingIterable) {
1579+
executionInfo = ((AsyncPagingIterable) pageOrError).getExecutionInfo();
1580+
} else if (pageOrError instanceof AsyncGraphResultSet) {
1581+
executionInfo = ((AsyncGraphResultSet) pageOrError).getRequestExecutionInfo();
1582+
}
1583+
15651584
if (nodeSuccessReported.compareAndSet(false, true)) {
15661585
context
15671586
.getRequestTracker()
1568-
.onNodeSuccess(statement, nodeLatencyNanos, executionProfile, node, logPrefix);
1587+
.onNodeSuccess(
1588+
statement, nodeLatencyNanos, executionProfile, node, executionInfo, logPrefix);
15691589
}
15701590
context
15711591
.getRequestTracker()
1572-
.onSuccess(statement, totalLatencyNanos, executionProfile, node, logPrefix);
1592+
.onSuccess(
1593+
statement, totalLatencyNanos, executionProfile, node, executionInfo, logPrefix);
15731594
}
15741595
} else {
15751596
Throwable error = (Throwable) pageOrError;
15761597
if (future.completeExceptionally(error)) {
15771598
context
15781599
.getRequestTracker()
1579-
.onError(statement, error, totalLatencyNanos, executionProfile, node, logPrefix);
1600+
.onError(
1601+
statement, error, totalLatencyNanos, executionProfile, node, null, logPrefix);
15801602
if (error instanceof DriverTimeoutException) {
15811603
throttler.signalTimeout(ContinuousRequestHandlerBase.this);
15821604
session
@@ -1607,6 +1629,22 @@ private ExecutionInfo createExecutionInfo(@NonNull Result result, @Nullable Fram
16071629
executionProfile);
16081630
}
16091631

1632+
@NonNull
1633+
private ExecutionInfo createExecutionInfo(@Nullable Frame response) {
1634+
return new DefaultExecutionInfo(
1635+
statement,
1636+
node,
1637+
startedSpeculativeExecutionsCount.get(),
1638+
executionIndex,
1639+
errors,
1640+
null,
1641+
response,
1642+
true,
1643+
session,
1644+
context,
1645+
executionProfile);
1646+
}
1647+
16101648
private void logTimeoutSchedulingError(IllegalStateException timeoutError) {
16111649
// If we're racing with session shutdown, the timer might be stopped already. We don't want
16121650
// to schedule more executions anyway, so swallow the error.

core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -360,9 +360,19 @@ private void setFinalResult(
360360
totalLatencyNanos = completionTimeNanos - startTimeNanos;
361361
long nodeLatencyNanos = completionTimeNanos - callback.nodeStartTimeNanos;
362362
requestTracker.onNodeSuccess(
363-
callback.statement, nodeLatencyNanos, executionProfile, callback.node, logPrefix);
363+
callback.statement,
364+
nodeLatencyNanos,
365+
executionProfile,
366+
callback.node,
367+
executionInfo,
368+
logPrefix);
364369
requestTracker.onSuccess(
365-
callback.statement, totalLatencyNanos, executionProfile, callback.node, logPrefix);
370+
callback.statement,
371+
totalLatencyNanos,
372+
executionProfile,
373+
callback.node,
374+
executionInfo,
375+
logPrefix);
366376
}
367377
if (sessionMetricUpdater.isEnabled(
368378
DseSessionMetric.GRAPH_REQUESTS, executionProfile.getName())) {
@@ -446,27 +456,28 @@ private void setFinalError(
446456
GraphStatement<?> statement, Throwable error, Node node, int execution) {
447457
DriverExecutionProfile executionProfile =
448458
Conversions.resolveExecutionProfile(statement, context);
459+
ExecutionInfo executionInfo =
460+
new DefaultExecutionInfo(
461+
statement,
462+
node,
463+
startedSpeculativeExecutionsCount.get(),
464+
execution,
465+
errors,
466+
null,
467+
null,
468+
true,
469+
session,
470+
context,
471+
executionProfile);
449472
if (error instanceof DriverException) {
450-
((DriverException) error)
451-
.setExecutionInfo(
452-
new DefaultExecutionInfo(
453-
statement,
454-
node,
455-
startedSpeculativeExecutionsCount.get(),
456-
execution,
457-
errors,
458-
null,
459-
null,
460-
true,
461-
session,
462-
context,
463-
executionProfile));
473+
((DriverException) error).setExecutionInfo(executionInfo);
464474
}
465475
if (result.completeExceptionally(error)) {
466476
cancelScheduledTasks();
467477
if (!(requestTracker instanceof NoopRequestTracker)) {
468478
long latencyNanos = System.nanoTime() - startTimeNanos;
469-
requestTracker.onError(statement, error, latencyNanos, executionProfile, node, logPrefix);
479+
requestTracker.onError(
480+
statement, error, latencyNanos, executionProfile, node, executionInfo, logPrefix);
470481
}
471482
if (error instanceof DriverTimeoutException) {
472483
throttler.signalTimeout(this);
@@ -859,7 +870,8 @@ private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNan
859870
nodeResponseTimeNanos = System.nanoTime();
860871
}
861872
long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos;
862-
requestTracker.onNodeError(statement, error, latencyNanos, executionProfile, node, logPrefix);
873+
requestTracker.onNodeError(
874+
statement, error, latencyNanos, executionProfile, node, null, logPrefix);
863875
}
864876

865877
@Override

0 commit comments

Comments
 (0)