Skip to content

Commit 77805f5

Browse files
lukasz-antoniaktolbertam
authored andcommitted
JAVA-3149: Support request cancellation in request throttler
patch by Lukasz Antoniak; reviewed by Andy Tolbert and Chris Lohfink for JAVA-3149
1 parent c961012 commit 77805f5

File tree

11 files changed

+89
-8
lines changed

11 files changed

+89
-8
lines changed

core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,7 @@ public void cancel() {
410410

411411
cancelScheduledTasks(null);
412412
cancelGlobalTimeout();
413+
throttler.signalCancel(this);
413414
}
414415

415416
private void cancelGlobalTimeout() {

core/src/main/java/com/datastax/dse/driver/internal/core/graph/GraphRequestHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ public class GraphRequestHandler implements Throttled {
153153
try {
154154
if (t instanceof CancellationException) {
155155
cancelScheduledTasks();
156+
context.getRequestThrottler().signalCancel(this);
156157
}
157158
} catch (Throwable t2) {
158159
Loggers.warnWithException(LOG, "[{}] Uncaught exception", logPrefix, t2);

core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,12 @@ public interface RequestThrottler extends Closeable {
5656
* perform time-based eviction on pending requests.
5757
*/
5858
void signalTimeout(@NonNull Throttled request);
59+
60+
/**
61+
* Signals that a request has been cancelled. This indicates to the throttler that another request
62+
* might be started.
63+
*/
64+
default void signalCancel(@NonNull Throttled request) {
65+
// no-op for backward compatibility purposes
66+
}
5967
}

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlPrepareHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ protected CqlPrepareHandler(
124124
try {
125125
if (t instanceof CancellationException) {
126126
cancelTimeout();
127+
context.getRequestThrottler().signalCancel(this);
127128
}
128129
} catch (Throwable t2) {
129130
Loggers.warnWithException(LOG, "[{}] Uncaught exception", logPrefix, t2);

core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ protected CqlRequestHandler(
152152
try {
153153
if (t instanceof CancellationException) {
154154
cancelScheduledTasks();
155+
context.getRequestThrottler().signalCancel(this);
155156
}
156157
} catch (Throwable t2) {
157158
Loggers.warnWithException(LOG, "[{}] Uncaught exception", logPrefix, t2);

core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,22 @@ public void signalTimeout(@NonNull Throttled request) {
145145
}
146146
}
147147

148+
@Override
149+
public void signalCancel(@NonNull Throttled request) {
150+
lock.lock();
151+
try {
152+
if (!closed) {
153+
if (queue.remove(request)) { // The request has been cancelled before it was active
154+
LOG.trace("[{}] Removing cancelled request from the queue", logPrefix);
155+
} else {
156+
onRequestDone();
157+
}
158+
}
159+
} finally {
160+
lock.unlock();
161+
}
162+
}
163+
148164
@SuppressWarnings("GuardedBy") // this method is only called with the lock held
149165
private void onRequestDone() {
150166
assert lock.isHeldByCurrentThread();

core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/PassThroughRequestThrottler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ public void signalTimeout(@NonNull Throttled request) {
6969
// nothing to do
7070
}
7171

72+
@Override
73+
public void signalCancel(@NonNull Throttled request) {
74+
// nothing to do
75+
}
76+
7277
@Override
7378
public void close() throws IOException {
7479
// nothing to do

core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,18 @@ public void signalTimeout(@NonNull Throttled request) {
198198
}
199199
}
200200

201+
@Override
202+
public void signalCancel(@NonNull Throttled request) {
203+
lock.lock();
204+
try {
205+
if (!closed && queue.remove(request)) { // The request has been cancelled before it was active
206+
LOG.trace("[{}] Removing cancelled request from the queue", logPrefix);
207+
}
208+
} finally {
209+
lock.unlock();
210+
}
211+
}
212+
201213
@Override
202214
public void close() {
203215
lock.lock();

core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottlerTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ public void should_allow_new_request_when_active_one_times_out() {
8888
should_allow_new_request_when_active_one_completes(throttler::signalTimeout);
8989
}
9090

91+
@Test
92+
public void should_allow_new_request_when_active_one_canceled() {
93+
should_allow_new_request_when_active_one_completes(throttler::signalCancel);
94+
}
95+
9196
private void should_allow_new_request_when_active_one_completes(
9297
Consumer<Throttled> completeCallback) {
9398
// Given

core/src/test/java/com/datastax/oss/driver/internal/core/session/throttling/RateLimitingRequestThrottlerTest.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2626
import com.datastax.oss.driver.api.core.config.DriverConfig;
2727
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
28+
import com.datastax.oss.driver.api.core.session.throttling.Throttled;
2829
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
2930
import com.datastax.oss.driver.internal.core.context.NettyOptions;
3031
import com.datastax.oss.driver.internal.core.util.concurrent.ScheduledTaskCapturingEventLoop;
@@ -33,6 +34,7 @@
3334
import java.time.Duration;
3435
import java.util.List;
3536
import java.util.concurrent.TimeUnit;
37+
import java.util.function.Consumer;
3638
import org.junit.Before;
3739
import org.junit.Test;
3840
import org.junit.runner.RunWith;
@@ -164,6 +166,15 @@ public void should_reject_when_queue_is_full() {
164166

165167
@Test
166168
public void should_remove_timed_out_request_from_queue() {
169+
testRemoveInvalidEventFromQueue(throttler::signalTimeout);
170+
}
171+
172+
@Test
173+
public void should_remove_cancel_request_from_queue() {
174+
testRemoveInvalidEventFromQueue(throttler::signalCancel);
175+
}
176+
177+
private void testRemoveInvalidEventFromQueue(Consumer<Throttled> completeCallback) {
167178
// Given
168179
for (int i = 0; i < 5; i++) {
169180
throttler.register(new MockThrottled());
@@ -174,7 +185,7 @@ public void should_remove_timed_out_request_from_queue() {
174185
throttler.register(queued2);
175186

176187
// When
177-
throttler.signalTimeout(queued1);
188+
completeCallback.accept(queued1);
178189

179190
// Then
180191
assertThatStage(queued2.started).isNotDone();

integration-tests/src/test/java/com/datastax/oss/driver/core/throttling/ThrottlingIT.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@
2424
import com.datastax.oss.driver.api.core.RequestThrottlingException;
2525
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2626
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
27+
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
2728
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
2829
import com.datastax.oss.driver.api.testinfra.simulacron.SimulacronRule;
2930
import com.datastax.oss.driver.categories.ParallelizableTests;
3031
import com.datastax.oss.driver.internal.core.session.throttling.ConcurrencyLimitingRequestThrottler;
3132
import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
3233
import com.datastax.oss.simulacron.common.stubbing.PrimeDsl;
34+
import java.util.concurrent.CompletionStage;
3335
import java.util.concurrent.TimeUnit;
36+
import org.junit.Before;
3437
import org.junit.Rule;
3538
import org.junit.Test;
3639
import org.junit.experimental.categories.Category;
@@ -39,21 +42,20 @@
3942
public class ThrottlingIT {
4043

4144
private static final String QUERY = "select * from foo";
45+
private static final int maxConcurrentRequests = 10;
46+
private static final int maxQueueSize = 10;
4247

4348
@Rule public SimulacronRule simulacron = new SimulacronRule(ClusterSpec.builder().withNodes(1));
4449

45-
@Test
46-
public void should_reject_request_when_throttling_by_concurrency() {
50+
private DriverConfigLoader loader = null;
4751

52+
@Before
53+
public void setUp() {
4854
// Add a delay so that requests don't complete during the test
4955
simulacron
5056
.cluster()
5157
.prime(PrimeDsl.when(QUERY).then(PrimeDsl.noRows()).delay(5, TimeUnit.SECONDS));
52-
53-
int maxConcurrentRequests = 10;
54-
int maxQueueSize = 10;
55-
56-
DriverConfigLoader loader =
58+
loader =
5759
SessionUtils.configLoaderBuilder()
5860
.withClass(
5961
DefaultDriverOption.REQUEST_THROTTLER_CLASS,
@@ -63,7 +65,10 @@ public void should_reject_request_when_throttling_by_concurrency() {
6365
maxConcurrentRequests)
6466
.withInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE, maxQueueSize)
6567
.build();
68+
}
6669

70+
@Test
71+
public void should_reject_request_when_throttling_by_concurrency() {
6772
try (CqlSession session = SessionUtils.newSession(simulacron, loader)) {
6873

6974
// Saturate the session and fill the queue
@@ -81,4 +86,19 @@ public void should_reject_request_when_throttling_by_concurrency() {
8186
+ "(concurrent requests: 10, queue size: 10)");
8287
}
8388
}
89+
90+
@Test
91+
public void should_propagate_cancel_to_throttler() {
92+
try (CqlSession session = SessionUtils.newSession(simulacron, loader)) {
93+
94+
// Try to saturate the session and fill the queue
95+
for (int i = 0; i < maxConcurrentRequests + maxQueueSize; i++) {
96+
CompletionStage<AsyncResultSet> future = session.executeAsync(QUERY);
97+
future.toCompletableFuture().cancel(true);
98+
}
99+
100+
// The next query should be successful, because the previous queries were cancelled
101+
session.execute(QUERY);
102+
}
103+
}
84104
}

0 commit comments

Comments
 (0)