Skip to content

Commit 7877837

Browse files
SiyaoIsHidingabsurdfarce
authored andcommitted
JAVA-3051: Memory leak
patch by Jane He; reviewed by Alexandre Dutra and Bret McGuire for JAVA-3051
1 parent dfe11a8 commit 7877837

File tree

8 files changed

+126
-84
lines changed

8 files changed

+126
-84
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,8 @@ private class SingleThreaded {
253253
private final Reconnection reconnection;
254254
private DriverChannelOptions channelOptions;
255255
// The last events received for each node
256-
private final Map<Node, DistanceEvent> lastDistanceEvents = new WeakHashMap<>();
257-
private final Map<Node, NodeStateEvent> lastStateEvents = new WeakHashMap<>();
256+
private final Map<Node, NodeDistance> lastNodeDistance = new WeakHashMap<>();
257+
private final Map<Node, NodeState> lastNodeState = new WeakHashMap<>();
258258

259259
private SingleThreaded(InternalDriverContext context) {
260260
this.context = context;
@@ -366,8 +366,8 @@ private void connect(
366366
.whenCompleteAsync(
367367
(channel, error) -> {
368368
try {
369-
DistanceEvent lastDistanceEvent = lastDistanceEvents.get(node);
370-
NodeStateEvent lastStateEvent = lastStateEvents.get(node);
369+
NodeDistance lastDistance = lastNodeDistance.get(node);
370+
NodeState lastState = lastNodeState.get(node);
371371
if (error != null) {
372372
if (closeWasCalled || initFuture.isCancelled()) {
373373
onSuccess.run(); // abort, we don't really care about the result
@@ -406,18 +406,17 @@ private void connect(
406406
channel);
407407
channel.forceClose();
408408
onSuccess.run();
409-
} else if (lastDistanceEvent != null
410-
&& lastDistanceEvent.distance == NodeDistance.IGNORED) {
409+
} else if (lastDistance == NodeDistance.IGNORED) {
411410
LOG.debug(
412411
"[{}] New channel opened ({}) but node became ignored, "
413412
+ "closing and trying next node",
414413
logPrefix,
415414
channel);
416415
channel.forceClose();
417416
connect(nodes, errors, onSuccess, onFailure);
418-
} else if (lastStateEvent != null
419-
&& (lastStateEvent.newState == null /*(removed)*/
420-
|| lastStateEvent.newState == NodeState.FORCED_DOWN)) {
417+
} else if (lastNodeState.containsKey(node)
418+
&& (lastState == null /*(removed)*/
419+
|| lastState == NodeState.FORCED_DOWN)) {
421420
LOG.debug(
422421
"[{}] New channel opened ({}) but node was removed or forced down, "
423422
+ "closing and trying next node",
@@ -534,7 +533,7 @@ private void reconnectNow() {
534533

535534
private void onDistanceEvent(DistanceEvent event) {
536535
assert adminExecutor.inEventLoop();
537-
this.lastDistanceEvents.put(event.node, event);
536+
this.lastNodeDistance.put(event.node, event.distance);
538537
if (event.distance == NodeDistance.IGNORED
539538
&& channel != null
540539
&& !channel.closeFuture().isDone()
@@ -549,7 +548,7 @@ private void onDistanceEvent(DistanceEvent event) {
549548

550549
private void onStateEvent(NodeStateEvent event) {
551550
assert adminExecutor.inEventLoop();
552-
this.lastStateEvents.put(event.node, event);
551+
this.lastNodeState.put(event.node, event.newState);
553552
if ((event.newState == null /*(removed)*/ || event.newState == NodeState.FORCED_DOWN)
554553
&& channel != null
555554
&& !channel.closeFuture().isDone()

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java

Lines changed: 64 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,19 @@
3333
import com.datastax.oss.driver.internal.core.util.ArrayUtils;
3434
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
3535
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
36+
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
37+
import com.datastax.oss.driver.shaded.guava.common.collect.MapMaker;
3638
import edu.umd.cs.findbugs.annotations.NonNull;
3739
import edu.umd.cs.findbugs.annotations.Nullable;
3840
import java.util.BitSet;
3941
import java.util.Map;
4042
import java.util.Optional;
43+
import java.util.OptionalLong;
4144
import java.util.Queue;
4245
import java.util.Set;
4346
import java.util.UUID;
4447
import java.util.concurrent.ConcurrentHashMap;
48+
import java.util.concurrent.ConcurrentMap;
4549
import java.util.concurrent.ThreadLocalRandom;
4650
import java.util.concurrent.atomic.AtomicLongArray;
4751
import net.jcip.annotations.ThreadSafe;
@@ -96,14 +100,15 @@ public class DefaultLoadBalancingPolicy extends BasicLoadBalancingPolicy impleme
96100
private static final int MAX_IN_FLIGHT_THRESHOLD = 10;
97101
private static final long RESPONSE_COUNT_RESET_INTERVAL_NANOS = MILLISECONDS.toNanos(200);
98102

99-
protected final Map<Node, AtomicLongArray> responseTimes = new ConcurrentHashMap<>();
103+
protected final ConcurrentMap<Node, NodeResponseRateSample> responseTimes;
100104
protected final Map<Node, Long> upTimes = new ConcurrentHashMap<>();
101105
private final boolean avoidSlowReplicas;
102106

103107
public DefaultLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
104108
super(context, profileName);
105109
this.avoidSlowReplicas =
106110
profile.getBoolean(DefaultDriverOption.LOAD_BALANCING_POLICY_SLOW_AVOIDANCE, true);
111+
this.responseTimes = new MapMaker().weakKeys().makeMap();
107112
}
108113

109114
@NonNull
@@ -274,40 +279,19 @@ protected boolean isBusy(@NonNull Node node, @NonNull Session session) {
274279
}
275280

276281
protected boolean isResponseRateInsufficient(@NonNull Node node, long now) {
277-
// response rate is considered insufficient when less than 2 responses were obtained in
278-
// the past interval delimited by RESPONSE_COUNT_RESET_INTERVAL_NANOS.
279-
if (responseTimes.containsKey(node)) {
280-
AtomicLongArray array = responseTimes.get(node);
281-
if (array.length() == 2) {
282-
long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
283-
long leastRecent = array.get(0);
284-
return leastRecent - threshold < 0;
285-
}
286-
}
287-
return true;
282+
NodeResponseRateSample sample = responseTimes.get(node);
283+
return !(sample == null || sample.hasSufficientResponses(now));
288284
}
289285

286+
/**
287+
* Synchronously updates the response times for the given node. It is synchronous because the
288+
* {@link #DefaultLoadBalancingPolicy(com.datastax.oss.driver.api.core.context.DriverContext,
289+
* java.lang.String) CacheLoader.load} assigned is synchronous.
290+
*
291+
* @param node The node to update.
292+
*/
290293
protected void updateResponseTimes(@NonNull Node node) {
291-
responseTimes.compute(
292-
node,
293-
(n, array) -> {
294-
// The array stores at most two timestamps, since we don't need more;
295-
// the first one is always the least recent one, and hence the one to inspect.
296-
long now = nanoTime();
297-
if (array == null) {
298-
array = new AtomicLongArray(1);
299-
array.set(0, now);
300-
} else if (array.length() == 1) {
301-
long previous = array.get(0);
302-
array = new AtomicLongArray(2);
303-
array.set(0, previous);
304-
array.set(1, now);
305-
} else {
306-
array.set(0, array.get(1));
307-
array.set(1, now);
308-
}
309-
return array;
310-
});
294+
this.responseTimes.compute(node, (k, v) -> v == null ? new NodeResponseRateSample() : v.next());
311295
}
312296

313297
protected int getInFlight(@NonNull Node node, @NonNull Session session) {
@@ -318,4 +302,52 @@ protected int getInFlight(@NonNull Node node, @NonNull Session session) {
318302
// processing them).
319303
return (pool == null) ? 0 : pool.getInFlight();
320304
}
305+
306+
protected class NodeResponseRateSample {
307+
308+
@VisibleForTesting protected final long oldest;
309+
@VisibleForTesting protected final OptionalLong newest;
310+
311+
private NodeResponseRateSample() {
312+
long now = nanoTime();
313+
this.oldest = now;
314+
this.newest = OptionalLong.empty();
315+
}
316+
317+
private NodeResponseRateSample(long oldestSample) {
318+
this(oldestSample, nanoTime());
319+
}
320+
321+
private NodeResponseRateSample(long oldestSample, long newestSample) {
322+
this.oldest = oldestSample;
323+
this.newest = OptionalLong.of(newestSample);
324+
}
325+
326+
@VisibleForTesting
327+
protected NodeResponseRateSample(AtomicLongArray times) {
328+
assert times.length() >= 1;
329+
this.oldest = times.get(0);
330+
this.newest = (times.length() > 1) ? OptionalLong.of(times.get(1)) : OptionalLong.empty();
331+
}
332+
333+
// Our newest sample becomes the oldest in the next generation
334+
private NodeResponseRateSample next() {
335+
return new NodeResponseRateSample(this.getNewestValidSample(), nanoTime());
336+
}
337+
338+
// If we have a pair of values return the newest, otherwise we have just one value... so just
339+
// return it
340+
private long getNewestValidSample() {
341+
return this.newest.orElse(this.oldest);
342+
}
343+
344+
// response rate is considered insufficient when less than 2 responses were obtained in
345+
// the past interval delimited by RESPONSE_COUNT_RESET_INTERVAL_NANOS.
346+
private boolean hasSufficientResponses(long now) {
347+
// If we only have one sample it's an automatic failure
348+
if (!this.newest.isPresent()) return true;
349+
long threshold = now - RESPONSE_COUNT_RESET_INTERVAL_NANOS;
350+
return this.oldest - threshold >= 0;
351+
}
352+
}
321353
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.Map;
3939
import java.util.Queue;
4040
import java.util.Set;
41+
import java.util.WeakHashMap;
4142
import java.util.concurrent.ConcurrentLinkedQueue;
4243
import java.util.concurrent.atomic.AtomicReference;
4344
import java.util.concurrent.locks.Lock;
@@ -105,7 +106,7 @@ public LoadBalancingPolicyWrapper(
105106
// Just an alias to make the rest of the code more readable
106107
this.policies = reporters.keySet();
107108

108-
this.distances = new HashMap<>();
109+
this.distances = new WeakHashMap<>();
109110

110111
this.logPrefix = context.getSessionName();
111112
context.getEventBus().register(NodeStateEvent.class, this::onNodeStateEvent);
@@ -172,6 +173,7 @@ private void onNodeStateEvent(NodeStateEvent event) {
172173

173174
// once it has gone through the filter
174175
private void processNodeStateEvent(NodeStateEvent event) {
176+
DefaultNode node = event.node;
175177
switch (stateRef.get()) {
176178
case BEFORE_INIT:
177179
case DURING_INIT:
@@ -181,13 +183,13 @@ private void processNodeStateEvent(NodeStateEvent event) {
181183
case RUNNING:
182184
for (LoadBalancingPolicy policy : policies) {
183185
if (event.newState == NodeState.UP) {
184-
policy.onUp(event.node);
186+
policy.onUp(node);
185187
} else if (event.newState == NodeState.DOWN || event.newState == NodeState.FORCED_DOWN) {
186-
policy.onDown(event.node);
188+
policy.onDown(node);
187189
} else if (event.newState == NodeState.UNKNOWN) {
188-
policy.onAdd(event.node);
190+
policy.onAdd(node);
189191
} else if (event.newState == null) {
190-
policy.onRemove(event.node);
192+
policy.onRemove(node);
191193
} else {
192194
LOG.warn("[{}] Unsupported event: {}", logPrefix, event);
193195
}

core/src/main/java/com/datastax/oss/driver/internal/core/metrics/AbstractMetricUpdater.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,8 @@ protected Timeout newTimeout() {
173173
.getTimer()
174174
.newTimeout(
175175
t -> {
176-
if (t.isExpired()) {
177-
clearMetrics();
178-
}
176+
clearMetrics();
177+
cancelMetricsExpirationTimeout();
179178
},
180179
expireAfter.toNanos(),
181180
TimeUnit.NANOSECONDS);

core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -527,14 +527,18 @@ private void notifyListeners() {
527527

528528
private void onNodeStateChanged(NodeStateEvent event) {
529529
assert adminExecutor.inEventLoop();
530-
if (event.newState == null) {
531-
context.getNodeStateListener().onRemove(event.node);
530+
DefaultNode node = event.node;
531+
if (node == null) {
532+
LOG.debug(
533+
"[{}] Node for this event was removed, ignoring state change: {}", logPrefix, event);
534+
} else if (event.newState == null) {
535+
context.getNodeStateListener().onRemove(node);
532536
} else if (event.oldState == null && event.newState == NodeState.UNKNOWN) {
533-
context.getNodeStateListener().onAdd(event.node);
537+
context.getNodeStateListener().onAdd(node);
534538
} else if (event.newState == NodeState.UP) {
535-
context.getNodeStateListener().onUp(event.node);
539+
context.getNodeStateListener().onUp(node);
536540
} else if (event.newState == NodeState.DOWN || event.newState == NodeState.FORCED_DOWN) {
537-
context.getNodeStateListener().onDown(event.node);
541+
context.getNodeStateListener().onDown(node);
538542
}
539543
}
540544

core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/ReplayingEventFilter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public void markReady() {
8282
consumer.accept(event);
8383
}
8484
} finally {
85+
recordedEvents.clear();
8586
stateLock.writeLock().unlock();
8687
}
8788
}

core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicyQueryPlanTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,10 @@ public void should_prioritize_and_shuffle_3_or_more_replicas_when_first_unhealth
203203
given(pool3.getInFlight()).willReturn(0);
204204
given(pool5.getInFlight()).willReturn(0);
205205

206-
dsePolicy.responseTimes.put(node1, new AtomicLongArray(new long[] {T0, T0})); // unhealthy
206+
dsePolicy.responseTimes.put(
207+
node1,
208+
dsePolicy
209+
.new NodeResponseRateSample(new AtomicLongArray(new long[] {T0, T0}))); // unhealthy
207210

208211
// When
209212
Queue<Node> plan1 = dsePolicy.newQueryPlan(request, session);
@@ -232,7 +235,9 @@ public void should_prioritize_and_shuffle_3_or_more_replicas_when_first_unhealth
232235
given(pool3.getInFlight()).willReturn(0);
233236
given(pool5.getInFlight()).willReturn(0);
234237

235-
dsePolicy.responseTimes.put(node1, new AtomicLongArray(new long[] {T1, T1})); // healthy
238+
dsePolicy.responseTimes.put(
239+
node1,
240+
dsePolicy.new NodeResponseRateSample(new AtomicLongArray(new long[] {T1, T1}))); // healthy
236241

237242
// When
238243
Queue<Node> plan1 = dsePolicy.newQueryPlan(request, session);

0 commit comments

Comments
 (0)