Skip to content

Commit 5f8472b

Browse files
committed
Switch AtomicBoolean to volatile for closed flag
1 parent 9b6a232 commit 5f8472b

File tree

1 file changed

+7
-8
lines changed

1 file changed

+7
-8
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import edu.umd.cs.findbugs.annotations.Nullable;
2929
import java.util.Deque;
3030
import java.util.concurrent.ConcurrentLinkedDeque;
31-
import java.util.concurrent.atomic.AtomicBoolean;
3231
import java.util.concurrent.atomic.AtomicInteger;
3332
import net.jcip.annotations.ThreadSafe;
3433
import org.slf4j.Logger;
@@ -64,7 +63,7 @@ public class ConcurrencyLimitingRequestThrottler implements RequestThrottler {
6463
private final AtomicInteger concurrentRequests = new AtomicInteger(0);
6564
private final Deque<Throttled> queue = new ConcurrentLinkedDeque<>();
6665
private final AtomicInteger queueSize = new AtomicInteger(0);
67-
private final AtomicBoolean closed = new AtomicBoolean(false);
66+
private volatile boolean closed = false;
6867

6968
public ConcurrencyLimitingRequestThrottler(DriverContext context) {
7069
this.logPrefix = context.getSessionName();
@@ -81,7 +80,7 @@ public ConcurrencyLimitingRequestThrottler(DriverContext context) {
8180

8281
@Override
8382
public void register(@NonNull Throttled request) {
84-
if (closed.get()) {
83+
if (closed) {
8584
LOG.trace("[{}] Rejecting request after shutdown", logPrefix);
8685
fail(request, "The session is shutting down");
8786
return;
@@ -114,7 +113,7 @@ public void register(@NonNull Throttled request) {
114113
// Double-check that we were still supposed to be enqueued; it is possible
115114
// that the session was closed while we were enqueuing, it's also possible
116115
// that it is right now removing the request, so we need to check both
117-
if (closed.get()) {
116+
if (closed) {
118117
if (queue.remove(request)) {
119118
queueSize.decrementAndGet();
120119
LOG.trace("[{}] Rejecting late request after shutdown", logPrefix);
@@ -148,7 +147,7 @@ public void signalError(@NonNull Throttled request, @NonNull Throwable error) {
148147
@Override
149148
public void signalTimeout(@NonNull Throttled request) {
150149
Throttled nextRequest = null;
151-
if (!closed.get()) {
150+
if (!closed) {
152151
if (queue.remove(request)) { // The request timed out before it was active
153152
queueSize.decrementAndGet();
154153
LOG.trace("[{}] Removing timed out request from the queue", logPrefix);
@@ -165,7 +164,7 @@ public void signalTimeout(@NonNull Throttled request) {
165164
@Override
166165
public void signalCancel(@NonNull Throttled request) {
167166
Throttled nextRequest = null;
168-
if (!closed.get()) {
167+
if (!closed) {
169168
if (queue.remove(request)) { // The request has been cancelled before it was active
170169
queueSize.decrementAndGet();
171170
LOG.trace("[{}] Removing cancelled request from the queue", logPrefix);
@@ -181,7 +180,7 @@ public void signalCancel(@NonNull Throttled request) {
181180

182181
@Nullable
183182
private Throttled onRequestDoneAndDequeNext() {
184-
if (!closed.get()) {
183+
if (!closed) {
185184
Throttled nextRequest = queue.poll();
186185
if (nextRequest == null) {
187186
concurrentRequests.decrementAndGet();
@@ -198,7 +197,7 @@ private Throttled onRequestDoneAndDequeNext() {
198197

199198
@Override
200199
public void close() {
201-
closed.set(true);
200+
closed = true;
202201

203202
LOG.debug("[{}] Rejecting {} queued requests after shutdown", logPrefix, queueSize.get());
204203
Throttled request;

0 commit comments

Comments
 (0)