diff --git a/core/revapi.json b/core/revapi.json index 5aa46a3ccad..a2f4c16e651 100644 --- a/core/revapi.json +++ b/core/revapi.json @@ -7386,6 +7386,11 @@ "old": "method com.datastax.oss.driver.api.core.type.reflect.GenericType> com.datastax.oss.driver.api.core.type.reflect.GenericType::vectorOf(java.lang.Class)", "new": "method com.datastax.oss.driver.api.core.type.reflect.GenericType> com.datastax.oss.driver.api.core.type.reflect.GenericType::vectorOf(java.lang.Class)", "justification": "JAVA-3143: Extend driver vector support to arbitrary subtypes and fix handling of variable length types (OSS C* 5.0)" + }, + { + "code": "java.method.addedToInterface", + "new": "method com.datastax.oss.driver.api.core.tracker.RequestIdGenerator com.datastax.oss.driver.api.core.context.DriverContext::getRequestIdGenerator()", + "justification": "CASSJAVA-97: Let users inject an ID for each request and write to the custom payload" } ] } diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java index 6ffd51d86ef..06cfe286819 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java @@ -994,7 +994,14 @@ public enum DefaultDriverOption implements DriverOption { * *

Value-type: boolean */ - SSL_ALLOW_DNS_REVERSE_LOOKUP_SAN("advanced.ssl-engine-factory.allow-dns-reverse-lookup-san"); + SSL_ALLOW_DNS_REVERSE_LOOKUP_SAN("advanced.ssl-engine-factory.allow-dns-reverse-lookup-san"), + + /** + * The class of session-wide component that generates request IDs. + * + *

Value-type: {@link String} + */ + REQUEST_ID_GENERATOR_CLASS("advanced.request-id.generator.class"); private final String path; diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java index 93e2b468461..997f0a09272 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java @@ -281,6 +281,10 @@ public String toString() { new TypedDriverOption<>( DefaultDriverOption.REQUEST_TRACKER_CLASSES, GenericType.listOf(String.class)); + /** The class of a session-wide component that generates request IDs. */ + public static final TypedDriverOption REQUEST_ID_GENERATOR_CLASS = + new TypedDriverOption<>(DefaultDriverOption.REQUEST_ID_GENERATOR_CLASS, GenericType.STRING); + /** Whether to log successful requests. */ public static final TypedDriverOption REQUEST_LOGGER_SUCCESS_ENABLED = new TypedDriverOption<>( diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/context/DriverContext.java b/core/src/main/java/com/datastax/oss/driver/api/core/context/DriverContext.java index 5b32389e362..6f0afd3df8a 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/context/DriverContext.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/context/DriverContext.java @@ -33,6 +33,7 @@ import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy; import com.datastax.oss.driver.api.core.ssl.SslEngineFactory; import com.datastax.oss.driver.api.core.time.TimestampGenerator; +import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import edu.umd.cs.findbugs.annotations.NonNull; import java.util.Map; @@ -139,6 +140,10 @@ default SpeculativeExecutionPolicy getSpeculativeExecutionPolicy(@NonNull String @NonNull RequestTracker getRequestTracker(); + /** @return The driver's request ID generator; never {@code null}. */ + @NonNull + Optional getRequestIdGenerator(); + /** @return The driver's request throttler; never {@code null}. */ @NonNull RequestThrottler getRequestThrottler(); diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/session/ProgrammaticArguments.java b/core/src/main/java/com/datastax/oss/driver/api/core/session/ProgrammaticArguments.java index 4e08bd5434c..5e10fb4d915 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/session/ProgrammaticArguments.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/session/ProgrammaticArguments.java @@ -23,6 +23,7 @@ import com.datastax.oss.driver.api.core.metadata.NodeStateListener; import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener; import com.datastax.oss.driver.api.core.ssl.SslEngineFactory; +import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import com.datastax.oss.driver.api.core.type.codec.TypeCodec; import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry; @@ -59,6 +60,7 @@ public static Builder builder() { private final NodeStateListener nodeStateListener; private final SchemaChangeListener schemaChangeListener; private final RequestTracker requestTracker; + private final RequestIdGenerator requestIdGenerator; private final Map localDatacenters; private final Map> nodeFilters; private final Map nodeDistanceEvaluators; @@ -77,6 +79,7 @@ private ProgrammaticArguments( @Nullable NodeStateListener nodeStateListener, @Nullable SchemaChangeListener schemaChangeListener, @Nullable RequestTracker requestTracker, + @Nullable RequestIdGenerator requestIdGenerator, @NonNull Map localDatacenters, @NonNull Map> nodeFilters, @NonNull Map nodeDistanceEvaluators, @@ -94,6 +97,7 @@ private ProgrammaticArguments( this.nodeStateListener = nodeStateListener; this.schemaChangeListener = schemaChangeListener; this.requestTracker = requestTracker; + this.requestIdGenerator = requestIdGenerator; this.localDatacenters = localDatacenters; this.nodeFilters = nodeFilters; this.nodeDistanceEvaluators = nodeDistanceEvaluators; @@ -128,6 +132,11 @@ public RequestTracker getRequestTracker() { return requestTracker; } + @Nullable + public RequestIdGenerator getRequestIdGenerator() { + return requestIdGenerator; + } + @NonNull public Map getLocalDatacenters() { return localDatacenters; @@ -196,6 +205,7 @@ public static class Builder { private NodeStateListener nodeStateListener; private SchemaChangeListener schemaChangeListener; private RequestTracker requestTracker; + private RequestIdGenerator requestIdGenerator; private ImmutableMap.Builder localDatacentersBuilder = ImmutableMap.builder(); private final ImmutableMap.Builder> nodeFiltersBuilder = ImmutableMap.builder(); @@ -294,6 +304,12 @@ public Builder addRequestTracker(@NonNull RequestTracker requestTracker) { return this; } + @NonNull + public Builder withRequestIdGenerator(@Nullable RequestIdGenerator requestIdGenerator) { + this.requestIdGenerator = requestIdGenerator; + return this; + } + @NonNull public Builder withLocalDatacenter( @NonNull String profileName, @NonNull String localDatacenter) { @@ -417,6 +433,7 @@ public ProgrammaticArguments build() { nodeStateListener, schemaChangeListener, requestTracker, + requestIdGenerator, localDatacentersBuilder.build(), nodeFiltersBuilder.build(), nodeDistanceEvaluatorsBuilder.build(), diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/session/SessionBuilder.java b/core/src/main/java/com/datastax/oss/driver/api/core/session/SessionBuilder.java index cbf896a0873..3d829c1a17f 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/session/SessionBuilder.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/session/SessionBuilder.java @@ -35,6 +35,7 @@ import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener; import com.datastax.oss.driver.api.core.ssl.ProgrammaticSslEngineFactory; import com.datastax.oss.driver.api.core.ssl.SslEngineFactory; +import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import com.datastax.oss.driver.api.core.type.codec.TypeCodec; import com.datastax.oss.driver.api.core.type.codec.registry.MutableCodecRegistry; @@ -318,6 +319,17 @@ public SelfT addRequestTracker(@NonNull RequestTracker requestTracker) { return self; } + /** + * Registers a request ID generator. The driver will use the generated ID in the logs and + * optionally add to the custom payload so that users can correlate logs about the same request + * from the Cassandra side. + */ + @NonNull + public SelfT withRequestIdGenerator(@NonNull RequestIdGenerator requestIdGenerator) { + this.programmaticArgumentsBuilder.withRequestIdGenerator(requestIdGenerator); + return self; + } + /** * Registers an authentication provider to use with the session. * diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java new file mode 100644 index 00000000000..58e567f4246 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestIdGenerator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.api.core.tracker; + +import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.api.core.session.Request; +import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public interface RequestIdGenerator { + /** + * Generates a unique identifier for the session request. This will be the identifier for the + * entire `session.execute()` call. This identifier will be added to logs, and propagated to + * request trackers. + * + * @param statement the statement to be executed + * @return a unique identifier for the session request + */ + String getSessionRequestId(@NonNull Request statement); + + /** + * Generates a unique identifier for the node request. This will be the identifier for the CQL + * request against a particular node. There can be one or more node requests for a single session + * request, due to retries or speculative executions. This identifier will be added to logs, and + * propagated to request trackers. + * + * @param statement the statement to be executed + * @param sessionRequestId the session request identifier + * @param executionCount the number of previous node requests for this session request, due to + * retries or speculative executions + * @return a unique identifier for the node request + */ + String getNodeRequestId( + @NonNull Request statement, @NonNull String sessionRequestId, int executionCount); + + default Statement getDecoratedStatement( + @NonNull Statement statement, @NonNull String nodeRequestId) { + Map customPayload = + NullAllowingImmutableMap.builder() + .putAll(statement.getCustomPayload()) + .put("request-id", ByteBuffer.wrap(nodeRequestId.getBytes(StandardCharsets.UTF_8))) + .build(); + return statement.setCustomPayload(customPayload); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java index d29ee48d352..065b41e496a 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/tracker/RequestTracker.java @@ -47,21 +47,22 @@ default void onSuccess( @NonNull Node node) {} /** - * Invoked each time a request succeeds. + * Invoked each time a session request succeeds. A session request is a `session.execute()` call * * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, * GenericType) session.execute} call until the result is made available to the client). * @param executionProfile the execution profile of this request. * @param node the node that returned the successful response. - * @param requestLogPrefix the dedicated log prefix for this request + * @param sessionRequestLogPrefix the dedicated log prefix for this request */ default void onSuccess( @NonNull Request request, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestLogPrefix) { - // If client doesn't override onSuccess with requestLogPrefix delegate call to the old method + @NonNull String sessionRequestLogPrefix) { + // If client doesn't override onSuccess with sessionRequestLogPrefix delegate call to the old + // method onSuccess(request, latencyNanos, executionProfile, node); } @@ -78,13 +79,13 @@ default void onError( @Nullable Node node) {} /** - * Invoked each time a request fails. + * Invoked each time a session request fails. A session request is a `session.execute()` call * * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, * GenericType) session.execute} call until the error is propagated to the client). * @param executionProfile the execution profile of this request. * @param node the node that returned the error response, or {@code null} if the error occurred - * @param requestLogPrefix the dedicated log prefix for this request + * @param sessionRequestLogPrefix the dedicated log prefix for this request */ default void onError( @NonNull Request request, @@ -92,8 +93,9 @@ default void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @Nullable Node node, - @NonNull String requestLogPrefix) { - // If client doesn't override onError with requestLogPrefix delegate call to the old method + @NonNull String sessionRequestLogPrefix) { + // If client doesn't override onError with sessionRequestLogPrefix delegate call to the old + // method onError(request, error, latencyNanos, executionProfile, node); } @@ -110,14 +112,15 @@ default void onNodeError( @NonNull Node node) {} /** - * Invoked each time a request fails at the node level. Similar to {@link #onError(Request, - * Throwable, long, DriverExecutionProfile, Node, String)} but at a per node level. + * Invoked each time a node request fails. A node request is a CQL request sent to a particular + * node. There can be one or more node requests for a single session request, due to retries or + * speculative executions. * * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, * GenericType) session.execute} call until the error is propagated to the client). * @param executionProfile the execution profile of this request. * @param node the node that returned the error response. - * @param requestLogPrefix the dedicated log prefix for this request + * @param nodeRequestLogPrefix the dedicated log prefix for this request */ default void onNodeError( @NonNull Request request, @@ -125,8 +128,9 @@ default void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestLogPrefix) { - // If client doesn't override onNodeError with requestLogPrefix delegate call to the old method + @NonNull String nodeRequestLogPrefix) { + // If client doesn't override onNodeError with nodeRequestLogPrefix delegate call to the old + // method onNodeError(request, error, latencyNanos, executionProfile, node); } @@ -142,22 +146,23 @@ default void onNodeSuccess( @NonNull Node node) {} /** - * Invoked each time a request succeeds at the node level. Similar to {@link #onSuccess(Request, - * long, DriverExecutionProfile, Node, String)} but at per node level. + * Invoked each time a node request succeeds. A node request is a CQL request sent to a particular + * node. There can be one or more node requests for a single session request, due to retries or + * speculative executions. * * @param latencyNanos the overall execution time (from the {@link Session#execute(Request, * GenericType) session.execute} call until the result is made available to the client). * @param executionProfile the execution profile of this request. * @param node the node that returned the successful response. - * @param requestLogPrefix the dedicated log prefix for this request + * @param nodeRequestLogPrefix the dedicated log prefix for this request */ default void onNodeSuccess( @NonNull Request request, long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestLogPrefix) { - // If client doesn't override onNodeSuccess with requestLogPrefix delegate call to the old + @NonNull String nodeRequestLogPrefix) { + // If client doesn't override onNodeSuccess with nodeRequestLogPrefix delegate call to the old // method onNodeSuccess(request, latencyNanos, executionProfile, node); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java index a24b632f640..56d9e0f4eac 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/DefaultDriverContext.java @@ -44,6 +44,7 @@ import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy; import com.datastax.oss.driver.api.core.ssl.SslEngineFactory; import com.datastax.oss.driver.api.core.time.TimestampGenerator; +import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import com.datastax.oss.driver.api.core.type.codec.TypeCodec; import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; @@ -221,6 +222,7 @@ public class DefaultDriverContext implements InternalDriverContext { private final LazyReference nodeStateListenerRef; private final LazyReference schemaChangeListenerRef; private final LazyReference requestTrackerRef; + private final LazyReference> requestIdGeneratorRef; private final LazyReference> authProviderRef; private final LazyReference> lifecycleListenersRef = new LazyReference<>("lifecycleListeners", this::buildLifecycleListeners, cycleDetector); @@ -282,6 +284,11 @@ public DefaultDriverContext( this.requestTrackerRef = new LazyReference<>( "requestTracker", () -> buildRequestTracker(requestTrackerFromBuilder), cycleDetector); + this.requestIdGeneratorRef = + new LazyReference<>( + "requestIdGenerator", + () -> buildRequestIdGenerator(programmaticArguments.getRequestIdGenerator()), + cycleDetector); this.sslEngineFactoryRef = new LazyReference<>( "sslEngineFactory", @@ -709,6 +716,17 @@ protected RequestTracker buildRequestTracker(RequestTracker requestTrackerFromBu } } + protected Optional buildRequestIdGenerator( + RequestIdGenerator requestIdGenerator) { + return (requestIdGenerator != null) + ? Optional.of(requestIdGenerator) + : Reflection.buildFromConfig( + this, + DefaultDriverOption.REQUEST_ID_GENERATOR_CLASS, + RequestIdGenerator.class, + "com.datastax.oss.driver.internal.core.tracker"); + } + protected Optional buildAuthProvider(AuthProvider authProviderFromBuilder) { return (authProviderFromBuilder != null) ? Optional.of(authProviderFromBuilder) @@ -973,6 +991,12 @@ public RequestTracker getRequestTracker() { return requestTrackerRef.get(); } + @NonNull + @Override + public Optional getRequestIdGenerator() { + return requestIdGeneratorRef.get(); + } + @Nullable @Override public String getLocalDatacenter(@NonNull String profileName) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java index 0808bdce63f..5cb0ebacd18 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java @@ -44,6 +44,7 @@ import com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException; import com.datastax.oss.driver.api.core.session.throttling.RequestThrottler; import com.datastax.oss.driver.api.core.session.throttling.Throttled; +import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator; import com.datastax.oss.driver.api.core.tracker.RequestTracker; import com.datastax.oss.driver.internal.core.adminrequest.ThrottledAdminRequestHandler; import com.datastax.oss.driver.internal.core.adminrequest.UnexpectedResponseException; @@ -82,6 +83,7 @@ import java.util.AbstractMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Queue; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; @@ -125,6 +127,7 @@ public class CqlRequestHandler implements Throttled { private final List inFlightCallbacks; private final RequestThrottler throttler; private final RequestTracker requestTracker; + private final Optional requestIdGenerator; private final SessionMetricUpdater sessionMetricUpdater; private final DriverExecutionProfile executionProfile; @@ -139,7 +142,11 @@ protected CqlRequestHandler( String sessionLogPrefix) { this.startTimeNanos = System.nanoTime(); - this.logPrefix = sessionLogPrefix + "|" + this.hashCode(); + this.requestIdGenerator = context.getRequestIdGenerator(); + this.logPrefix = + this.requestIdGenerator.isPresent() + ? this.requestIdGenerator.get().getSessionRequestId(statement) + : sessionLogPrefix + "|" + this.hashCode(); LOG.trace("[{}] Creating new handler for request {}", logPrefix, statement); this.initialStatement = statement; @@ -267,6 +274,16 @@ private void sendRequest( setFinalError(statement, AllNodesFailedException.fromErrors(this.errors), null, -1); } } else { + String nodeLogPrefix; + if (this.requestIdGenerator.isPresent()) { + nodeLogPrefix = + this.requestIdGenerator + .get() + .getNodeRequestId(statement, logPrefix, currentExecutionIndex); + statement = this.requestIdGenerator.get().getDecoratedStatement(statement, nodeLogPrefix); + } else { + nodeLogPrefix = logPrefix + "|" + currentExecutionIndex; + } NodeResponseCallback nodeResponseCallback = new NodeResponseCallback( statement, @@ -276,7 +293,7 @@ private void sendRequest( currentExecutionIndex, retryCount, scheduleNextExecution, - logPrefix); + nodeLogPrefix); Message message = Conversions.toMessage(statement, executionProfile, context); channel .write(message, statement.isTracing(), statement.getCustomPayload(), nodeResponseCallback) @@ -489,7 +506,7 @@ private NodeResponseCallback( this.execution = execution; this.retryCount = retryCount; this.scheduleNextExecution = scheduleNextExecution; - this.logPrefix = logPrefix + "|" + execution; + this.logPrefix = logPrefix; } // this gets invoked once the write completes. diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java index 0f03cbb3643..ea2342314e5 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java @@ -245,7 +245,7 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String nodeRequestLogPrefix) { updateResponseTimes(node); } @@ -256,7 +256,7 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String nodeRequestLogPrefix) { updateResponseTimes(node); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java index d4d20f3eb78..6fe2ba059bd 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/MultiplexingRequestTracker.java @@ -82,10 +82,12 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String sessionRequestLogPrefix) { invokeTrackers( - tracker -> tracker.onSuccess(request, latencyNanos, executionProfile, node, logPrefix), - logPrefix, + tracker -> + tracker.onSuccess( + request, latencyNanos, executionProfile, node, sessionRequestLogPrefix), + sessionRequestLogPrefix, "onSuccess"); } @@ -96,10 +98,12 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @Nullable Node node, - @NonNull String logPrefix) { + @NonNull String sessionRequestLogPrefix) { invokeTrackers( - tracker -> tracker.onError(request, error, latencyNanos, executionProfile, node, logPrefix), - logPrefix, + tracker -> + tracker.onError( + request, error, latencyNanos, executionProfile, node, sessionRequestLogPrefix), + sessionRequestLogPrefix, "onError"); } @@ -109,10 +113,12 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String nodeRequestLogPrefix) { invokeTrackers( - tracker -> tracker.onNodeSuccess(request, latencyNanos, executionProfile, node, logPrefix), - logPrefix, + tracker -> + tracker.onNodeSuccess( + request, latencyNanos, executionProfile, node, nodeRequestLogPrefix), + nodeRequestLogPrefix, "onNodeSuccess"); } @@ -123,11 +129,12 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String nodeRequestLogPrefix) { invokeTrackers( tracker -> - tracker.onNodeError(request, error, latencyNanos, executionProfile, node, logPrefix), - logPrefix, + tracker.onNodeError( + request, error, latencyNanos, executionProfile, node, nodeRequestLogPrefix), + nodeRequestLogPrefix, "onNodeError"); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java index 09ac27e5e75..3821c6ace2d 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/NoopRequestTracker.java @@ -42,7 +42,7 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestPrefix) { + @NonNull String sessionRequestLogPrefix) { // nothing to do } @@ -53,7 +53,7 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, Node node, - @NonNull String requestPrefix) { + @NonNull String sessionRequestLogPrefix) { // nothing to do } @@ -64,7 +64,7 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestPrefix) { + @NonNull String nodeRequestLogPrefix) { // nothing to do } @@ -74,7 +74,7 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String requestPrefix) { + @NonNull String nodeRequestLogPrefix) { // nothing to do } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java index 235ef051b40..f242ff89c54 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/RequestLogger.java @@ -86,7 +86,7 @@ public void onSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String sessionRequestLogPrefix) { boolean successEnabled = executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_SUCCESS_ENABLED, false); @@ -129,7 +129,7 @@ public void onSuccess( showValues, maxValues, maxValueLength, - logPrefix); + sessionRequestLogPrefix); } @Override @@ -139,7 +139,7 @@ public void onError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, Node node, - @NonNull String logPrefix) { + @NonNull String sessionRequestLogPrefix) { if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_ERROR_ENABLED, false)) { return; @@ -173,7 +173,7 @@ public void onError( maxValues, maxValueLength, showStackTraces, - logPrefix); + sessionRequestLogPrefix); } @Override @@ -183,7 +183,7 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String nodeRequestLogPrefix) { // Nothing to do } @@ -193,7 +193,7 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String nodeRequestLogPrefix) { // Nothing to do } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/UuidRequestIdGenerator.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/UuidRequestIdGenerator.java new file mode 100644 index 00000000000..cafe278cf85 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/UuidRequestIdGenerator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.tracker; + +import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.session.Request; +import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator; +import com.datastax.oss.driver.api.core.uuid.Uuids; +import edu.umd.cs.findbugs.annotations.NonNull; + +public class UuidRequestIdGenerator implements RequestIdGenerator { + public UuidRequestIdGenerator(DriverContext context) {} + + /** Generates a random v4 UUID. */ + @Override + public String getSessionRequestId(@NonNull Request statement) { + return Uuids.random().toString(); + } + + /** + * {session-request-id}-{random-uuid} All node requests for a session request will have the same + * session request id + */ + @Override + public String getNodeRequestId( + @NonNull Request statement, @NonNull String sessionRequestId, int executionCount) { + return sessionRequestId + "-" + Uuids.random(); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/W3CContextRequestIdGenerator.java b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/W3CContextRequestIdGenerator.java new file mode 100644 index 00000000000..71a895e9c4e --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/tracker/W3CContextRequestIdGenerator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.tracker; + +import com.datastax.oss.driver.api.core.context.DriverContext; +import com.datastax.oss.driver.api.core.session.Request; +import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator; +import com.datastax.oss.driver.shaded.guava.common.io.BaseEncoding; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.security.SecureRandom; +import java.util.Random; + +public class W3CContextRequestIdGenerator implements RequestIdGenerator { + Random random = new SecureRandom(); + BaseEncoding baseEncoding = BaseEncoding.base16().lowerCase(); + + public W3CContextRequestIdGenerator(DriverContext context) {} + + /** Random 16 bytes, e.g. "4bf92f3577b34da6a3ce929d0e0e4736" */ + @Override + public String getSessionRequestId(@NonNull Request statement) { + byte[] bytes = new byte[16]; + random.nextBytes(bytes); + return baseEncoding.encode(bytes); + } + + /** + * Following the format of W3C "traceparent" spec, + * https://www.w3.org/TR/trace-context/#traceparent-header-field-values e.g. + * "00-4bf92f3577b34da6a3ce929d0e0e4736-a3ce929d0e0e4736-01" All node requests in the same session + * request share the same "trace-id" field value + */ + @Override + public String getNodeRequestId( + @NonNull Request statement, @NonNull String sessionRequestId, int executionCount) { + byte[] bytes = new byte[8]; + random.nextBytes(bytes); + return String.format("00-%s-%s-00", sessionRequestId, baseEncoding.encode(bytes)); + } +} diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index f09ffd18a10..24dff6a052a 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -918,6 +918,13 @@ datastax-java-driver { } } + advanced.request-id { + generator { + # The component that generates a unique identifier for each CQL request, and possibly write the id to the custom payload . + // class = W3CContextRequestIdGenerator + } + } + # A session-wide component that controls the rate at which requests are executed. # # Implementations vary, but throttlers generally track a metric that represents the level of diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/RequestIdGeneratorTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/RequestIdGeneratorTest.java new file mode 100644 index 00000000000..448aaaa45d2 --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/tracker/RequestIdGeneratorTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.tracker; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.Strict.class) +public class RequestIdGeneratorTest { + @Mock private InternalDriverContext context; + @Mock private Statement statement; + + @Test + public void uuid_generator_should_generate() { + // given + UuidRequestIdGenerator generator = new UuidRequestIdGenerator(context); + // when + String sessionRequestId = generator.getSessionRequestId(statement); + String nodeRequestId = generator.getNodeRequestId(statement, sessionRequestId, 1); + // then + // e.g. "550e8400-e29b-41d4-a716-446655440000", which is 36 characters long + assertThat(sessionRequestId.length()).isEqualTo(36); + // e.g. "550e8400-e29b-41d4-a716-446655440000-550e8400-e29b-41d4-a716-446655440000", which is 73 + // characters long + assertThat(nodeRequestId.length()).isEqualTo(73); + } + + @Test + public void w3c_generator_should_generate() { + // given + W3CContextRequestIdGenerator generator = new W3CContextRequestIdGenerator(context); + // when + String sessionRequestId = generator.getSessionRequestId(statement); + String nodeRequestId = generator.getNodeRequestId(statement, sessionRequestId, 1); + // then + // e.g. "4bf92f3577b34da6a3ce929d0e0e4736", which is 32 characters long + assertThat(sessionRequestId.length()).isEqualTo(32); + // According to W3C "traceparent" spec, + // https://www.w3.org/TR/trace-context/#traceparent-header-field-values + // e.g. "00-4bf92f3577b34da6a3ce929d0e0e4736-a3ce929d0e0e4736-01", which 55 characters long + assertThat(nodeRequestId.length()).isEqualTo(55); + } +} diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java new file mode 100644 index 00000000000..d5bb80c5b46 --- /dev/null +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestIdGeneratorIT.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.core.tracker; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Statement; +import com.datastax.oss.driver.api.core.session.Request; +import com.datastax.oss.driver.api.core.tracker.RequestIdGenerator; +import com.datastax.oss.driver.api.testinfra.ccm.CcmRule; +import com.datastax.oss.driver.api.testinfra.session.SessionUtils; +import com.datastax.oss.driver.categories.ParallelizableTests; +import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +@Category(ParallelizableTests.class) +public class RequestIdGeneratorIT { + private CcmRule ccmRule = CcmRule.getInstance(); + + @Rule public TestRule chain = RuleChain.outerRule(ccmRule); + + @Test + public void should_write_uuid_to_custom_payload_with_key() { + DriverConfigLoader loader = + SessionUtils.configLoaderBuilder() + .withString(DefaultDriverOption.REQUEST_ID_GENERATOR_CLASS, "UuidRequestIdGenerator") + .build(); + try (CqlSession session = SessionUtils.newSession(ccmRule, loader)) { + String query = "SELECT * FROM system.local"; + ResultSet rs = session.execute(query); + ByteBuffer id = rs.getExecutionInfo().getRequest().getCustomPayload().get("request-id"); + assertThat(id.remaining()).isEqualTo(73); + } + } + + @Test + public void should_write_w3c_context_to_custom_payload_with_key() { + DriverConfigLoader loader = + SessionUtils.configLoaderBuilder() + .withString( + DefaultDriverOption.REQUEST_ID_GENERATOR_CLASS, "W3CContextRequestIdGenerator") + .build(); + try (CqlSession session = SessionUtils.newSession(ccmRule, loader)) { + String query = "SELECT * FROM system.local"; + ResultSet rs = session.execute(query); + ByteBuffer id = rs.getExecutionInfo().getRequest().getCustomPayload().get("request-id"); + assertThat(id.remaining()).isEqualTo(55); + } + } + + @Test + public void should_use_customized_request_id_generator() { + RequestIdGenerator myRequestIdGenerator = + new RequestIdGenerator() { + @Override + public String getSessionRequestId(@NonNull Request statement) { + return "123"; + } + + @Override + public String getNodeRequestId( + @NonNull Request statement, @NonNull String sessionRequestId, int executionCount) { + return "456"; + } + + @Override + public Statement getDecoratedStatement( + @NonNull Statement statement, @NonNull String nodeRequestId) { + Map customPayload = + NullAllowingImmutableMap.builder() + .putAll(statement.getCustomPayload()) + .put( + "trace_key", + ByteBuffer.wrap(nodeRequestId.getBytes(StandardCharsets.UTF_8))) + .build(); + return statement.setCustomPayload(customPayload); + } + }; + try (CqlSession session = + (CqlSession) + SessionUtils.baseBuilder() + .addContactEndPoints(ccmRule.getContactPoints()) + .withRequestIdGenerator(myRequestIdGenerator) + .build()) { + String query = "SELECT * FROM system.local"; + ResultSet rs = session.execute(query); + ByteBuffer id = rs.getExecutionInfo().getRequest().getCustomPayload().get("trace_key"); + assertThat(id).isEqualTo(ByteBuffer.wrap("456".getBytes(StandardCharsets.UTF_8))); + } + } + + @Test + public void should_not_write_id_to_custom_payload_when_key_is_not_set() { + DriverConfigLoader loader = SessionUtils.configLoaderBuilder().build(); + try (CqlSession session = SessionUtils.newSession(ccmRule, loader)) { + String query = "SELECT * FROM system.local"; + ResultSet rs = session.execute(query); + assertThat(rs.getExecutionInfo().getRequest().getCustomPayload().get("trace_key")).isNull(); + } + } +} diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java index eae98339637..8eb2fb80a73 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/tracker/RequestNodeLoggerExample.java @@ -39,7 +39,7 @@ public void onNodeError( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String nodeRequestLogPrefix) { if (!executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_ERROR_ENABLED)) { return; } @@ -66,7 +66,7 @@ public void onNodeError( maxValues, maxValueLength, showStackTraces, - logPrefix); + nodeRequestLogPrefix); } @Override @@ -75,7 +75,7 @@ public void onNodeSuccess( long latencyNanos, @NonNull DriverExecutionProfile executionProfile, @NonNull Node node, - @NonNull String logPrefix) { + @NonNull String nodeRequestLogPrefix) { boolean successEnabled = executionProfile.getBoolean(DefaultDriverOption.REQUEST_LOGGER_SUCCESS_ENABLED); boolean slowEnabled = @@ -114,6 +114,6 @@ public void onNodeSuccess( showValues, maxValues, maxValueLength, - logPrefix); + nodeRequestLogPrefix); } } diff --git a/manual/core/request_id/README.md b/manual/core/request_id/README.md new file mode 100644 index 00000000000..a766a4419af --- /dev/null +++ b/manual/core/request_id/README.md @@ -0,0 +1,48 @@ + + +## Request Id + +### Quick overview + +Users can inject an identifier for each individual CQL request, and such ID can be written in to the [custom payload](https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v5.spec) to +correlate a request across the driver and the Apache Cassandra server. + +A request ID generator needs to generate both: +- Session request ID: an identifier for an entire session.execute() call +- Node request ID: an identifier for the execution of a CQL statement against a particular node. There can be one or more node requests for a single session request, due to retries or speculative executions. + +Usage: +* Inject ID generator: set the desired `RequestIdGenerator` in `advanced.request-id.generator.class`. +* Add ID to custom payload: the default behavior of a `RequestIdGenerator` is to add the request ID into the custom payload with the key `request-id`. Override `RequestIdGenerator.getDecoratedStatement` to customize the behavior. + +### Request Id Generator Configuration + +Request ID generator can be declared in the [configuration](../configuration/) as follows: + +``` +datastax-java-driver.advanced.request-id.generator { + class = com.example.app.MyGenerator +} +``` + +To register your own request ID generator, specify the name of the class +that implements `RequestIdGenerator`. + +The generated ID will be added to the log message of `CqlRequestHandler`, and propagated to other classes, e.g. the request trackers. \ No newline at end of file