Skip to content

Commit 9c38b58

Browse files
Remove duplicated request tracker callback arguments
1 parent c75a59b commit 9c38b58

File tree

21 files changed

+362
-525
lines changed

21 files changed

+362
-525
lines changed

core/src/main/java/com/datastax/dse/driver/api/core/graph/GraphExecutionInfo.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,12 @@ default DriverExecutionProfile getExecutionProfile() {
6969
*/
7070
int getSuccessfulExecutionIndex();
7171

72+
/** @return Exception raised by the driver to the application. */
73+
@Nullable
74+
default Throwable getDriverError() {
75+
return null;
76+
}
77+
7278
/**
7379
* The errors encountered on previous coordinators, if any.
7480
*

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java

Lines changed: 17 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,7 @@ public void operationComplete(@NonNull Future<java.lang.Void> future) {
628628
Throwable error = future.cause();
629629
if (error instanceof EncoderException
630630
&& error.getCause() instanceof FrameTooLongException) {
631-
trackNodeError(node, error.getCause(), null);
631+
trackNodeError(error.getCause(), null);
632632
lock.lock();
633633
try {
634634
abort(error.getCause(), false);
@@ -645,7 +645,7 @@ public void operationComplete(@NonNull Future<java.lang.Void> future) {
645645
.getMetricUpdater()
646646
.incrementCounter(DefaultNodeMetric.UNSENT_REQUESTS, executionProfile.getName());
647647
recordError(node, error);
648-
trackNodeError(node, error.getCause(), null);
648+
trackNodeError(error.getCause(), null);
649649
sendRequest(statement, null, executionIndex, retryCount, scheduleSpeculativeExecution);
650650
}
651651
} else {
@@ -766,11 +766,11 @@ public void onResponse(@NonNull Frame response) {
766766
} else {
767767
IllegalStateException error =
768768
new IllegalStateException("Unexpected response " + responseMessage);
769-
trackNodeError(node, error, response);
769+
trackNodeError(error, response);
770770
abort(error, false);
771771
}
772772
} catch (Throwable t) {
773-
trackNodeError(node, t, response);
773+
trackNodeError(t, response);
774774
abort(t, false);
775775
}
776776
} finally {
@@ -914,7 +914,7 @@ private void processErrorResponse(@NonNull Error errorMessage, @NonNull Frame fr
914914
if (error instanceof BootstrappingException) {
915915
LOG.trace("[{}] {} is bootstrapping, trying next node", logPrefix, node);
916916
recordError(node, error);
917-
trackNodeError(node, error, frame);
917+
trackNodeError(error, frame);
918918
sendRequest(statement, null, executionIndex, retryCount, false);
919919
} else if (error instanceof QueryValidationException
920920
|| error instanceof FunctionFailureException
@@ -926,7 +926,7 @@ private void processErrorResponse(@NonNull Error errorMessage, @NonNull Frame fr
926926
NodeMetricUpdater metricUpdater = ((DefaultNode) node).getMetricUpdater();
927927
metricUpdater.incrementCounter(
928928
DefaultNodeMetric.OTHER_ERRORS, executionProfile.getName());
929-
trackNodeError(node, error, frame);
929+
trackNodeError(error, frame);
930930
abort(error, true);
931931
} else {
932932
try {
@@ -1065,7 +1065,7 @@ private void processUnprepared(@NonNull Unprepared errorMessage) {
10651065
+ "This usually happens when you run a 'USE...' query after "
10661066
+ "the statement was prepared.",
10671067
Bytes.toHexString(idToReprepare), Bytes.toHexString(repreparedId)));
1068-
trackNodeError(node, illegalStateException, null);
1068+
trackNodeError(illegalStateException, null);
10691069
fatalError = illegalStateException;
10701070
} else {
10711071
LOG.trace(
@@ -1084,18 +1084,18 @@ private void processUnprepared(@NonNull Unprepared errorMessage) {
10841084
|| prepareError instanceof FunctionFailureException
10851085
|| prepareError instanceof ProtocolError) {
10861086
LOG.trace("[{}] Unrecoverable error on re-prepare, rethrowing", logPrefix);
1087-
trackNodeError(node, prepareError, null);
1087+
trackNodeError(prepareError, null);
10881088
fatalError = prepareError;
10891089
}
10901090
}
10911091
} else if (exception instanceof RequestThrottlingException) {
1092-
trackNodeError(node, exception, null);
1092+
trackNodeError(exception, null);
10931093
fatalError = exception;
10941094
}
10951095
if (fatalError == null) {
10961096
LOG.trace("[{}] Re-prepare failed, trying next node", logPrefix);
10971097
recordError(node, exception);
1098-
trackNodeError(node, exception, null);
1098+
trackNodeError(exception, null);
10991099
sendRequest(statement, null, executionIndex, retryCount, false);
11001100
}
11011101
}
@@ -1123,18 +1123,18 @@ private void processRetryVerdict(@NonNull RetryVerdict verdict, @NonNull Throwab
11231123
switch (verdict.getRetryDecision()) {
11241124
case RETRY_SAME:
11251125
recordError(node, error);
1126-
trackNodeError(node, error, null);
1126+
trackNodeError(error, null);
11271127
sendRequest(
11281128
verdict.getRetryRequest(statement), node, executionIndex, retryCount + 1, false);
11291129
break;
11301130
case RETRY_NEXT:
11311131
recordError(node, error);
1132-
trackNodeError(node, error, null);
1132+
trackNodeError(error, null);
11331133
sendRequest(
11341134
verdict.getRetryRequest(statement), null, executionIndex, retryCount + 1, false);
11351135
break;
11361136
case RETHROW:
1137-
trackNodeError(node, error, null);
1137+
trackNodeError(error, null);
11381138
abort(error, true);
11391139
break;
11401140
case IGNORE:
@@ -1447,18 +1447,13 @@ private void reenableAutoReadIfNeeded() {
14471447

14481448
// ERROR HANDLING
14491449

1450-
private void trackNodeError(
1451-
@NonNull Node node, @NonNull Throwable error, @Nullable Frame frame) {
1450+
private void trackNodeError(@NonNull Throwable error, @Nullable Frame frame) {
14521451
if (nodeErrorReported.compareAndSet(false, true)) {
14531452
long latencyNanos = System.nanoTime() - this.messageStartTimeNanos;
14541453
context
14551454
.getRequestTracker()
14561455
.onNodeError(
1457-
this.statement,
1458-
error,
14591456
latencyNanos,
1460-
executionProfile,
1461-
node,
14621457
createExecutionInfo(error).withServerResponse(frame).build(),
14631458
logPrefix);
14641459
}
@@ -1583,23 +1578,16 @@ private void completeResultSetFuture(
15831578
}
15841579

15851580
if (nodeSuccessReported.compareAndSet(false, true)) {
1586-
context
1587-
.getRequestTracker()
1588-
.onNodeSuccess(
1589-
statement, nodeLatencyNanos, executionProfile, node, executionInfo, logPrefix);
1581+
context.getRequestTracker().onNodeSuccess(nodeLatencyNanos, executionInfo, logPrefix);
15901582
}
1591-
context
1592-
.getRequestTracker()
1593-
.onSuccess(
1594-
statement, totalLatencyNanos, executionProfile, node, executionInfo, logPrefix);
1583+
context.getRequestTracker().onSuccess(totalLatencyNanos, executionInfo, logPrefix);
15951584
}
15961585
} else {
15971586
Throwable error = (Throwable) pageOrError;
15981587
if (future.completeExceptionally(error)) {
15991588
context
16001589
.getRequestTracker()
1601-
.onError(
1602-
statement, error, totalLatencyNanos, executionProfile, node, null, logPrefix);
1590+
.onError(totalLatencyNanos, createExecutionInfo(error).build(), logPrefix);
16031591
if (error instanceof DriverTimeoutException) {
16041592
throttler.signalTimeout(ContinuousRequestHandlerBase.this);
16051593
session

core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphExecutionInfoConverter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ public int getSuccessfulExecutionIndex() {
8484
return graphExecutionInfo.getSuccessfulExecutionIndex();
8585
}
8686

87+
@Nullable
88+
@Override
89+
public Throwable getDriverError() {
90+
return graphExecutionInfo.getDriverError();
91+
}
92+
8793
@NonNull
8894
@Override
8995
public List<Entry<Node, Throwable>> getErrors() {
@@ -172,6 +178,11 @@ public int getSuccessfulExecutionIndex() {
172178
return executionInfo.getSuccessfulExecutionIndex();
173179
}
174180

181+
@Override
182+
public Throwable getDriverError() {
183+
return executionInfo.getDriverError();
184+
}
185+
175186
@Override
176187
public List<Entry<Node, Throwable>> getErrors() {
177188
return executionInfo.getErrors();

core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -371,20 +371,8 @@ private void setFinalResult(
371371
completionTimeNanos = System.nanoTime();
372372
totalLatencyNanos = completionTimeNanos - startTimeNanos;
373373
long nodeLatencyNanos = completionTimeNanos - callback.nodeStartTimeNanos;
374-
requestTracker.onNodeSuccess(
375-
callback.statement,
376-
nodeLatencyNanos,
377-
executionProfile,
378-
callback.node,
379-
executionInfo,
380-
logPrefix);
381-
requestTracker.onSuccess(
382-
callback.statement,
383-
totalLatencyNanos,
384-
executionProfile,
385-
callback.node,
386-
executionInfo,
387-
logPrefix);
374+
requestTracker.onNodeSuccess(nodeLatencyNanos, executionInfo, logPrefix);
375+
requestTracker.onSuccess(totalLatencyNanos, executionInfo, logPrefix);
388376
}
389377
if (sessionMetricUpdater.isEnabled(
390378
DseSessionMetric.GRAPH_REQUESTS, executionProfile.getName())) {
@@ -470,8 +458,7 @@ private void setFinalError(
470458
cancelScheduledTasks();
471459
if (!(requestTracker instanceof NoopRequestTracker)) {
472460
long latencyNanos = System.nanoTime() - startTimeNanos;
473-
requestTracker.onError(
474-
statement, error, latencyNanos, executionProfile, node, executionInfo, logPrefix);
461+
requestTracker.onError(latencyNanos, executionInfo, logPrefix);
475462
}
476463
if (error instanceof DriverTimeoutException) {
477464
throttler.signalTimeout(this);
@@ -533,7 +520,7 @@ public void operationComplete(Future<java.lang.Void> future) {
533520
Throwable error = future.cause();
534521
if (error instanceof EncoderException
535522
&& error.getCause() instanceof FrameTooLongException) {
536-
trackNodeError(node, error.getCause(), NANOTIME_NOT_MEASURED_YET);
523+
trackNodeError(this, error.getCause(), NANOTIME_NOT_MEASURED_YET, null);
537524
setFinalError(statement, error.getCause(), node, execution);
538525
} else {
539526
LOG.trace(
@@ -542,7 +529,7 @@ public void operationComplete(Future<java.lang.Void> future) {
542529
channel,
543530
error);
544531
recordError(node, error);
545-
trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
532+
trackNodeError(this, error, NANOTIME_NOT_MEASURED_YET, null);
546533
((DefaultNode) node)
547534
.getMetricUpdater()
548535
.incrementCounter(DefaultNodeMetric.UNSENT_REQUESTS, executionProfile.getName());
@@ -650,38 +637,39 @@ public void onResponse(Frame responseFrame) {
650637
setFinalResult((Result) responseMessage, responseFrame, this);
651638
} else if (responseMessage instanceof Error) {
652639
LOG.trace("[{}] Got error response, processing", logPrefix);
653-
processErrorResponse((Error) responseMessage);
640+
processErrorResponse((Error) responseMessage, responseFrame);
654641
} else {
655642
trackNodeError(
656-
node,
643+
this,
657644
new IllegalStateException("Unexpected response " + responseMessage),
658-
nodeResponseTimeNanos);
645+
nodeResponseTimeNanos,
646+
responseFrame);
659647
setFinalError(
660648
statement,
661649
new IllegalStateException("Unexpected response " + responseMessage),
662650
node,
663651
execution);
664652
}
665653
} catch (Throwable t) {
666-
trackNodeError(node, t, nodeResponseTimeNanos);
654+
trackNodeError(this, t, nodeResponseTimeNanos, responseFrame);
667655
setFinalError(statement, t, node, execution);
668656
}
669657
}
670658

671-
private void processErrorResponse(Error errorMessage) {
659+
private void processErrorResponse(Error errorMessage, Frame responseFrame) {
672660
CoordinatorException error = Conversions.toThrowable(node, errorMessage, context);
673661
NodeMetricUpdater metricUpdater = ((DefaultNode) node).getMetricUpdater();
674662
if (error instanceof BootstrappingException) {
675663
LOG.trace("[{}] {} is bootstrapping, trying next node", logPrefix, node);
676664
recordError(node, error);
677-
trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
665+
trackNodeError(this, error, NANOTIME_NOT_MEASURED_YET, responseFrame);
678666
sendRequest(statement, null, queryPlan, execution, retryCount, false);
679667
} else if (error instanceof QueryValidationException
680668
|| error instanceof FunctionFailureException
681669
|| error instanceof ProtocolError) {
682670
LOG.trace("[{}] Unrecoverable error, rethrowing", logPrefix);
683671
metricUpdater.incrementCounter(DefaultNodeMetric.OTHER_ERRORS, executionProfile.getName());
684-
trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
672+
trackNodeError(this, error, NANOTIME_NOT_MEASURED_YET, responseFrame);
685673
setFinalError(statement, error, node, execution);
686674
} else {
687675
RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(context, executionProfile);
@@ -756,7 +744,7 @@ private void processRetryVerdict(RetryVerdict verdict, Throwable error) {
756744
switch (verdict.getRetryDecision()) {
757745
case RETRY_SAME:
758746
recordError(node, error);
759-
trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
747+
trackNodeError(this, error, NANOTIME_NOT_MEASURED_YET, null);
760748
sendRequest(
761749
verdict.getRetryRequest(statement),
762750
node,
@@ -767,7 +755,7 @@ private void processRetryVerdict(RetryVerdict verdict, Throwable error) {
767755
break;
768756
case RETRY_NEXT:
769757
recordError(node, error);
770-
trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
758+
trackNodeError(this, error, NANOTIME_NOT_MEASURED_YET, null);
771759
sendRequest(
772760
verdict.getRetryRequest(statement),
773761
null,
@@ -777,7 +765,7 @@ private void processRetryVerdict(RetryVerdict verdict, Throwable error) {
777765
false);
778766
break;
779767
case RETHROW:
780-
trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
768+
trackNodeError(this, error, NANOTIME_NOT_MEASURED_YET, null);
781769
setFinalError(statement, error, node, execution);
782770
break;
783771
case IGNORE:
@@ -856,16 +844,29 @@ void cancel() {
856844
* measured. If {@link #NANOTIME_NOT_MEASURED_YET}, it hasn't and we need to measure it now
857845
* (this is to avoid unnecessary calls to System.nanoTime)
858846
*/
859-
private void trackNodeError(Node node, Throwable error, long nodeResponseTimeNanos) {
847+
private void trackNodeError(
848+
NodeResponseCallback callback, Throwable error, long nodeResponseTimeNanos, Frame frame) {
860849
if (requestTracker instanceof NoopRequestTracker) {
861850
return;
862851
}
863852
if (nodeResponseTimeNanos == NANOTIME_NOT_MEASURED_YET) {
864853
nodeResponseTimeNanos = System.nanoTime();
865854
}
855+
ExecutionInfo executionInfo =
856+
DefaultExecutionInfo.builder(
857+
callback.statement,
858+
callback.node,
859+
startedSpeculativeExecutionsCount.get(),
860+
callback.execution,
861+
error,
862+
errors,
863+
session,
864+
context,
865+
callback.executionProfile)
866+
.withServerResponse(null, frame)
867+
.build();
866868
long latencyNanos = nodeResponseTimeNanos - this.nodeStartTimeNanos;
867-
requestTracker.onNodeError(
868-
statement, error, latencyNanos, executionProfile, node, null, logPrefix);
869+
requestTracker.onNodeError(latencyNanos, executionInfo, logPrefix);
869870
}
870871

871872
@Override

core/src/main/java/com/datastax/oss/driver/api/core/cql/ExecutionInfo.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ default DriverExecutionProfile getExecutionProfile() {
120120
@NonNull
121121
List<Map.Entry<Node, Throwable>> getErrors();
122122

123+
/** @return Exception raised by the driver to the application. */
123124
@Nullable
124125
default Throwable getDriverError() {
125126
return null;

0 commit comments

Comments
 (0)