Skip to content

Commit a3bd191

Browse files
committed
JAVA-3142: Ability to specify ordering of remote local dc's via new configuration for graceful automatic failovers
patch by Nitin Chhabra; reviewed by Alexandre Dutra, Andy Tolbert, and Bret McGuire for JAVA-3142
1 parent ea2e475 commit a3bd191

File tree

6 files changed

+271
-22
lines changed

6 files changed

+271
-22
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -982,7 +982,13 @@ public enum DefaultDriverOption implements DriverOption {
982982
* <p>Value-type: {@link java.time.Duration}
983983
*/
984984
SSL_KEYSTORE_RELOAD_INTERVAL("advanced.ssl-engine-factory.keystore-reload-interval"),
985-
;
985+
/**
986+
* Ordered preference list of remote dcs optionally supplied for automatic failover.
987+
*
988+
* <p>Value type: {@link java.util.List List}&#60;{@link String}&#62;
989+
*/
990+
LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS(
991+
"advanced.load-balancing-policy.dc-failover.preferred-remote-dcs");
986992

987993
private final String path;
988994

core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,8 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
381381
map.put(TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_MAX_NODES_PER_REMOTE_DC, 0);
382382
map.put(TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS, false);
383383
map.put(TypedDriverOption.METRICS_GENERATE_AGGREGABLE_HISTOGRAMS, true);
384+
map.put(
385+
TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS, ImmutableList.of(""));
384386
}
385387

386388
@Immutable

core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,16 @@ public String toString() {
892892
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS,
893893
GenericType.BOOLEAN);
894894

895+
/**
896+
* Ordered preference list of remote dcs optionally supplied for automatic failover and included
897+
* in query plan. This feature is enabled only when max-nodes-per-remote-dc is greater than 0.
898+
*/
899+
public static final TypedDriverOption<List<String>>
900+
LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS =
901+
new TypedDriverOption<>(
902+
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS,
903+
GenericType.listOf(String.class));
904+
895905
private static Iterable<TypedDriverOption<?>> introspectBuiltInValues() {
896906
try {
897907
ImmutableList.Builder<TypedDriverOption<?>> result = ImmutableList.builder();

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java

Lines changed: 63 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,14 @@
4545
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
4646
import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan;
4747
import com.datastax.oss.driver.shaded.guava.common.base.Predicates;
48+
import com.datastax.oss.driver.shaded.guava.common.collect.Lists;
49+
import com.datastax.oss.driver.shaded.guava.common.collect.Sets;
4850
import edu.umd.cs.findbugs.annotations.NonNull;
4951
import edu.umd.cs.findbugs.annotations.Nullable;
5052
import java.nio.ByteBuffer;
5153
import java.util.Collections;
54+
import java.util.LinkedHashSet;
55+
import java.util.List;
5256
import java.util.Map;
5357
import java.util.Objects;
5458
import java.util.Optional;
@@ -117,6 +121,7 @@ public class BasicLoadBalancingPolicy implements LoadBalancingPolicy {
117121
private volatile NodeDistanceEvaluator nodeDistanceEvaluator;
118122
private volatile String localDc;
119123
private volatile NodeSet liveNodes;
124+
private final LinkedHashSet<String> preferredRemoteDcs;
120125

121126
public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
122127
this.context = (InternalDriverContext) context;
@@ -131,6 +136,11 @@ public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String
131136
this.context
132137
.getConsistencyLevelRegistry()
133138
.nameToLevel(profile.getString(DefaultDriverOption.REQUEST_CONSISTENCY));
139+
140+
preferredRemoteDcs =
141+
new LinkedHashSet<>(
142+
profile.getStringList(
143+
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS));
134144
}
135145

136146
/**
@@ -320,27 +330,59 @@ protected Queue<Node> maybeAddDcFailover(@Nullable Request request, @NonNull Que
320330
return local;
321331
}
322332
}
323-
QueryPlan remote =
324-
new LazyQueryPlan() {
325-
326-
@Override
327-
protected Object[] computeNodes() {
328-
Object[] remoteNodes =
329-
liveNodes.dcs().stream()
330-
.filter(Predicates.not(Predicates.equalTo(localDc)))
331-
.flatMap(dc -> liveNodes.dc(dc).stream().limit(maxNodesPerRemoteDc))
332-
.toArray();
333-
334-
int remoteNodesLength = remoteNodes.length;
335-
if (remoteNodesLength == 0) {
336-
return EMPTY_NODES;
337-
}
338-
shuffleHead(remoteNodes, remoteNodesLength);
339-
return remoteNodes;
340-
}
341-
};
342-
343-
return new CompositeQueryPlan(local, remote);
333+
if (preferredRemoteDcs.isEmpty()) {
334+
return new CompositeQueryPlan(local, buildRemoteQueryPlanAll());
335+
}
336+
return new CompositeQueryPlan(local, buildRemoteQueryPlanPreferred());
337+
}
338+
339+
private QueryPlan buildRemoteQueryPlanAll() {
340+
341+
return new LazyQueryPlan() {
342+
@Override
343+
protected Object[] computeNodes() {
344+
345+
Object[] remoteNodes =
346+
liveNodes.dcs().stream()
347+
.filter(Predicates.not(Predicates.equalTo(localDc)))
348+
.flatMap(dc -> liveNodes.dc(dc).stream().limit(maxNodesPerRemoteDc))
349+
.toArray();
350+
if (remoteNodes.length == 0) {
351+
return EMPTY_NODES;
352+
}
353+
shuffleHead(remoteNodes, remoteNodes.length);
354+
return remoteNodes;
355+
}
356+
};
357+
}
358+
359+
private QueryPlan buildRemoteQueryPlanPreferred() {
360+
361+
Set<String> dcs = liveNodes.dcs();
362+
List<String> orderedDcs = Lists.newArrayListWithCapacity(dcs.size());
363+
orderedDcs.addAll(preferredRemoteDcs);
364+
orderedDcs.addAll(Sets.difference(dcs, preferredRemoteDcs));
365+
366+
QueryPlan[] queryPlans =
367+
orderedDcs.stream()
368+
.filter(Predicates.not(Predicates.equalTo(localDc)))
369+
.map(
370+
(dc) -> {
371+
return new LazyQueryPlan() {
372+
@Override
373+
protected Object[] computeNodes() {
374+
Object[] rv = liveNodes.dc(dc).stream().limit(maxNodesPerRemoteDc).toArray();
375+
if (rv.length == 0) {
376+
return EMPTY_NODES;
377+
}
378+
shuffleHead(rv, rv.length);
379+
return rv;
380+
}
381+
};
382+
})
383+
.toArray(QueryPlan[]::new);
384+
385+
return new CompositeQueryPlan(queryPlans);
344386
}
345387

346388
/** Exposed as a protected method so that it can be accessed by tests */

core/src/main/resources/reference.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,11 @@ datastax-java-driver {
574574
# Modifiable at runtime: no
575575
# Overridable in a profile: yes
576576
allow-for-local-consistency-levels = false
577+
# Ordered preference list of remote dc's (in order) optionally supplied for automatic failover. While building a query plan, the driver uses the DC's supplied in order together with max-nodes-per-remote-dc
578+
# Required: no
579+
# Modifiable at runtime: no
580+
# Overridable in a profile: no
581+
preferred-remote-dcs = [""]
577582
}
578583
}
579584

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.datastax.oss.driver.internal.core.loadbalancing;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.eq;
23+
import static org.mockito.Mockito.atLeast;
24+
import static org.mockito.Mockito.never;
25+
import static org.mockito.Mockito.spy;
26+
import static org.mockito.Mockito.times;
27+
import static org.mockito.Mockito.verify;
28+
import static org.mockito.Mockito.when;
29+
30+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
31+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
32+
import com.datastax.oss.driver.api.core.metadata.Node;
33+
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
34+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
35+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
36+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
37+
import java.util.Map;
38+
import java.util.UUID;
39+
import org.junit.Test;
40+
import org.mockito.Mock;
41+
42+
public class BasicLoadBalancingPolicyPreferredRemoteDcsTest
43+
extends BasicLoadBalancingPolicyDcFailoverTest {
44+
@Mock protected DefaultNode node10;
45+
@Mock protected DefaultNode node11;
46+
@Mock protected DefaultNode node12;
47+
@Mock protected DefaultNode node13;
48+
@Mock protected DefaultNode node14;
49+
50+
@Override
51+
@Test
52+
public void should_prioritize_single_replica() {
53+
when(request.getRoutingKeyspace()).thenReturn(KEYSPACE);
54+
when(request.getRoutingKey()).thenReturn(ROUTING_KEY);
55+
when(tokenMap.getReplicas(KEYSPACE, ROUTING_KEY)).thenReturn(ImmutableSet.of(node3));
56+
57+
// node3 always first, round-robin on the rest
58+
assertThat(policy.newQueryPlan(request, session))
59+
.containsExactly(
60+
node3, node1, node2, node4, node5, node9, node10, node6, node7, node12, node13);
61+
assertThat(policy.newQueryPlan(request, session))
62+
.containsExactly(
63+
node3, node2, node4, node5, node1, node9, node10, node6, node7, node12, node13);
64+
assertThat(policy.newQueryPlan(request, session))
65+
.containsExactly(
66+
node3, node4, node5, node1, node2, node9, node10, node6, node7, node12, node13);
67+
assertThat(policy.newQueryPlan(request, session))
68+
.containsExactly(
69+
node3, node5, node1, node2, node4, node9, node10, node6, node7, node12, node13);
70+
71+
// Should not shuffle replicas since there is only one
72+
verify(policy, never()).shuffleHead(any(), eq(1));
73+
// But should shuffle remote nodes
74+
verify(policy, times(12)).shuffleHead(any(), eq(2));
75+
}
76+
77+
@Override
78+
@Test
79+
public void should_prioritize_and_shuffle_replicas() {
80+
when(request.getRoutingKeyspace()).thenReturn(KEYSPACE);
81+
when(request.getRoutingKey()).thenReturn(ROUTING_KEY);
82+
when(tokenMap.getReplicas(KEYSPACE, ROUTING_KEY))
83+
.thenReturn(ImmutableSet.of(node1, node2, node3, node6, node9));
84+
85+
// node 6 and 9 being in a remote DC, they don't get a boost for being a replica
86+
assertThat(policy.newQueryPlan(request, session))
87+
.containsExactly(
88+
node1, node2, node3, node4, node5, node9, node10, node6, node7, node12, node13);
89+
assertThat(policy.newQueryPlan(request, session))
90+
.containsExactly(
91+
node1, node2, node3, node5, node4, node9, node10, node6, node7, node12, node13);
92+
93+
// should shuffle replicas
94+
verify(policy, times(2)).shuffleHead(any(), eq(3));
95+
// should shuffle remote nodes
96+
verify(policy, times(6)).shuffleHead(any(), eq(2));
97+
// No power of two choices with only two replicas
98+
verify(session, never()).getPools();
99+
}
100+
101+
@Override
102+
protected void assertRoundRobinQueryPlans() {
103+
for (int i = 0; i < 3; i++) {
104+
assertThat(policy.newQueryPlan(request, session))
105+
.containsExactly(
106+
node1, node2, node3, node4, node5, node9, node10, node6, node7, node12, node13);
107+
assertThat(policy.newQueryPlan(request, session))
108+
.containsExactly(
109+
node2, node3, node4, node5, node1, node9, node10, node6, node7, node12, node13);
110+
assertThat(policy.newQueryPlan(request, session))
111+
.containsExactly(
112+
node3, node4, node5, node1, node2, node9, node10, node6, node7, node12, node13);
113+
assertThat(policy.newQueryPlan(request, session))
114+
.containsExactly(
115+
node4, node5, node1, node2, node3, node9, node10, node6, node7, node12, node13);
116+
assertThat(policy.newQueryPlan(request, session))
117+
.containsExactly(
118+
node5, node1, node2, node3, node4, node9, node10, node6, node7, node12, node13);
119+
}
120+
121+
verify(policy, atLeast(15)).shuffleHead(any(), eq(2));
122+
}
123+
124+
@Override
125+
protected BasicLoadBalancingPolicy createAndInitPolicy() {
126+
when(node4.getDatacenter()).thenReturn("dc1");
127+
when(node5.getDatacenter()).thenReturn("dc1");
128+
when(node6.getDatacenter()).thenReturn("dc2");
129+
when(node7.getDatacenter()).thenReturn("dc2");
130+
when(node8.getDatacenter()).thenReturn("dc2");
131+
when(node9.getDatacenter()).thenReturn("dc3");
132+
when(node10.getDatacenter()).thenReturn("dc3");
133+
when(node11.getDatacenter()).thenReturn("dc3");
134+
when(node12.getDatacenter()).thenReturn("dc4");
135+
when(node13.getDatacenter()).thenReturn("dc4");
136+
when(node14.getDatacenter()).thenReturn("dc4");
137+
138+
// Accept 2 nodes per remote DC
139+
when(defaultProfile.getInt(
140+
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_MAX_NODES_PER_REMOTE_DC))
141+
.thenReturn(2);
142+
when(defaultProfile.getBoolean(
143+
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS))
144+
.thenReturn(false);
145+
146+
when(defaultProfile.getStringList(
147+
DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS))
148+
.thenReturn(ImmutableList.of("dc3", "dc2"));
149+
150+
// Use a subclass to disable shuffling, we just spy to make sure that the shuffling method was
151+
// called (makes tests easier)
152+
BasicLoadBalancingPolicy policy =
153+
spy(
154+
new BasicLoadBalancingPolicy(context, DriverExecutionProfile.DEFAULT_NAME) {
155+
@Override
156+
protected void shuffleHead(Object[] currentNodes, int headLength) {
157+
// nothing (keep in same order)
158+
}
159+
});
160+
Map<UUID, Node> nodes =
161+
ImmutableMap.<UUID, Node>builder()
162+
.put(UUID.randomUUID(), node1)
163+
.put(UUID.randomUUID(), node2)
164+
.put(UUID.randomUUID(), node3)
165+
.put(UUID.randomUUID(), node4)
166+
.put(UUID.randomUUID(), node5)
167+
.put(UUID.randomUUID(), node6)
168+
.put(UUID.randomUUID(), node7)
169+
.put(UUID.randomUUID(), node8)
170+
.put(UUID.randomUUID(), node9)
171+
.put(UUID.randomUUID(), node10)
172+
.put(UUID.randomUUID(), node11)
173+
.put(UUID.randomUUID(), node12)
174+
.put(UUID.randomUUID(), node13)
175+
.put(UUID.randomUUID(), node14)
176+
.build();
177+
policy.init(nodes, distanceReporter);
178+
assertThat(policy.getLiveNodes().dc("dc1")).containsExactly(node1, node2, node3, node4, node5);
179+
assertThat(policy.getLiveNodes().dc("dc2")).containsExactly(node6, node7); // only 2 allowed
180+
assertThat(policy.getLiveNodes().dc("dc3")).containsExactly(node9, node10); // only 2 allowed
181+
assertThat(policy.getLiveNodes().dc("dc4")).containsExactly(node12, node13); // only 2 allowed
182+
return policy;
183+
}
184+
}

0 commit comments

Comments
 (0)