Skip to content

Commit 8228a70

Browse files
committed
load balancing resilience tests (DE-722)
1 parent 93acce6 commit 8228a70

File tree

5 files changed

+412
-2
lines changed

5 files changed

+412
-2
lines changed

resilience-tests/src/test/java/resilience/ClusterTest.java

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package resilience;
22

33
import com.arangodb.ArangoDB;
4+
import com.arangodb.ArangoDBAsync;
5+
import com.arangodb.Request;
6+
import com.fasterxml.jackson.databind.node.ObjectNode;
47
import resilience.utils.MemoryAppender;
58
import eu.rekawek.toxiproxy.Proxy;
69
import eu.rekawek.toxiproxy.ToxiproxyClient;
@@ -12,6 +15,7 @@
1215
import java.io.IOException;
1316
import java.util.Arrays;
1417
import java.util.List;
18+
import java.util.concurrent.ExecutionException;
1519

1620
@Tag("cluster")
1721
public abstract class ClusterTest {
@@ -34,6 +38,7 @@ static void beforeAll() throws IOException {
3438
p.delete();
3539
}
3640
endpoint.setProxy(client.createProxy(endpoint.getName(), endpoint.getHost() + ":" + endpoint.getPort(), endpoint.getUpstream()));
41+
initServerId(endpoint);
3742
}
3843
}
3944

@@ -63,13 +68,77 @@ protected static ArangoDB.Builder dbBuilder() {
6368
return builder.password(PASSWORD);
6469
}
6570

66-
protected void enableAllEndpoints(){
71+
protected static String serverIdGET(ArangoDB adb) {
72+
return adb.execute(Request.builder()
73+
.method(Request.Method.GET)
74+
.path("/_admin/status")
75+
.build(), ObjectNode.class)
76+
.getBody()
77+
.get("serverInfo")
78+
.get("serverId")
79+
.textValue();
80+
}
81+
82+
protected static String serverIdGET(ArangoDBAsync adb) {
83+
try {
84+
return adb.execute(Request.builder()
85+
.method(Request.Method.GET)
86+
.path("/_admin/status")
87+
.build(), ObjectNode.class)
88+
.get()
89+
.getBody()
90+
.get("serverInfo")
91+
.get("serverId")
92+
.textValue();
93+
} catch (InterruptedException | ExecutionException e) {
94+
throw new RuntimeException(e);
95+
}
96+
}
97+
98+
protected static String serverIdPOST(ArangoDB adb) {
99+
return adb.execute(Request.builder()
100+
.method(Request.Method.POST)
101+
.path("/_admin/status")
102+
.build(), ObjectNode.class)
103+
.getBody()
104+
.get("serverInfo")
105+
.get("serverId")
106+
.textValue();
107+
}
108+
109+
protected static String serverIdPOST(ArangoDBAsync adb) {
110+
try {
111+
return adb.execute(Request.builder()
112+
.method(Request.Method.POST)
113+
.path("/_admin/status")
114+
.build(), ObjectNode.class)
115+
.get()
116+
.getBody()
117+
.get("serverInfo")
118+
.get("serverId")
119+
.textValue();
120+
} catch (InterruptedException | ExecutionException e) {
121+
throw new RuntimeException(e);
122+
}
123+
}
124+
125+
private static void initServerId(Endpoint endpoint) {
126+
ArangoDB adb = new ArangoDB.Builder()
127+
.host(endpoint.getHost(), endpoint.getPort())
128+
.password(PASSWORD)
129+
.build();
130+
String serverId = serverIdGET(adb);
131+
endpoint.setServerId(serverId);
132+
adb.shutdown();
133+
}
134+
135+
protected void enableAllEndpoints() {
67136
for (Endpoint endpoint : endpoints) {
68137
endpoint.enable();
69138
}
70139
}
71140

72-
protected void disableAllEndpoints(){
141+
protected void disableAllEndpoints() {
73142
for (Endpoint endpoint : endpoints) {
74143
endpoint.disable();
75144
}

resilience-tests/src/test/java/resilience/Endpoint.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public class Endpoint {
1313
private final int port;
1414
private final String upstream;
1515
private Proxy proxy;
16+
private String serverId;
1617

1718
public Endpoint(String name, String host, int port, String upstream) {
1819
this.name = name;
@@ -45,6 +46,14 @@ public void setProxy(Proxy proxy) {
4546
this.proxy = proxy;
4647
}
4748

49+
public String getServerId() {
50+
return serverId;
51+
}
52+
53+
public void setServerId(String serverId) {
54+
this.serverId = serverId;
55+
}
56+
4857
public void enable() {
4958
try {
5059
getProxy().enable();
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package resilience.connection;
2+
3+
import com.arangodb.ArangoDB;
4+
import com.arangodb.Protocol;
5+
import com.arangodb.entity.LoadBalancingStrategy;
6+
import org.junit.jupiter.params.ParameterizedTest;
7+
import org.junit.jupiter.params.provider.EnumSource;
8+
import resilience.ClusterTest;
9+
import resilience.Endpoint;
10+
11+
import java.util.HashSet;
12+
import java.util.Set;
13+
import java.util.stream.Collectors;
14+
15+
import static org.assertj.core.api.Assertions.assertThat;
16+
17+
public class AcquireHostListTest extends ClusterTest {
18+
19+
@ParameterizedTest(name = "{index}")
20+
@EnumSource(Protocol.class)
21+
void acquireHostList(Protocol protocol) {
22+
ArangoDB adb = new ArangoDB.Builder()
23+
.host("172.28.0.1", 8529)
24+
.password("test")
25+
.acquireHostList(true)
26+
.protocol(protocol)
27+
.loadBalancingStrategy(LoadBalancingStrategy.ROUND_ROBIN)
28+
.build();
29+
30+
Set<String> serverIds = getEndpoints().stream()
31+
.map(Endpoint::getServerId)
32+
.collect(Collectors.toSet());
33+
Set<String> retrievedIds = new HashSet<>();
34+
35+
for (int i = 0; i < serverIds.size(); i++) {
36+
retrievedIds.add(serverIdGET(adb));
37+
}
38+
39+
assertThat(retrievedIds).containsExactlyElementsOf(serverIds);
40+
}
41+
42+
43+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package resilience.loadbalance;
2+
3+
import com.arangodb.ArangoDB;
4+
import com.arangodb.ArangoDBAsync;
5+
import com.arangodb.ArangoDBException;
6+
import com.arangodb.Protocol;
7+
import com.arangodb.entity.LoadBalancingStrategy;
8+
import eu.rekawek.toxiproxy.model.ToxicDirection;
9+
import eu.rekawek.toxiproxy.model.toxic.Latency;
10+
import org.junit.jupiter.api.Disabled;
11+
import org.junit.jupiter.params.ParameterizedTest;
12+
import org.junit.jupiter.params.provider.MethodSource;
13+
import resilience.ClusterTest;
14+
import resilience.Endpoint;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
import java.util.concurrent.Executors;
19+
import java.util.concurrent.ScheduledExecutorService;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.stream.Stream;
22+
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
import static org.assertj.core.api.Assertions.catchThrowable;
25+
26+
public class LoadBalanceNoneClusterTest extends ClusterTest {
27+
28+
static Stream<ArangoDB> arangoProvider() {
29+
return Stream.of(
30+
dbBuilder().loadBalancingStrategy(LoadBalancingStrategy.NONE).protocol(Protocol.VST).build(),
31+
dbBuilder().loadBalancingStrategy(LoadBalancingStrategy.NONE).protocol(Protocol.HTTP_VPACK).build(),
32+
dbBuilder().loadBalancingStrategy(LoadBalancingStrategy.NONE).protocol(Protocol.HTTP2_JSON).build()
33+
);
34+
}
35+
36+
static Stream<ArangoDBAsync> asyncArangoProvider() {
37+
return arangoProvider().map(ArangoDB::async);
38+
}
39+
40+
@ParameterizedTest(name = "{index}")
41+
@MethodSource("arangoProvider")
42+
void loadBalancing(ArangoDB arangoDB) {
43+
List<Endpoint> endpoints = getEndpoints();
44+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
45+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
46+
}
47+
48+
@ParameterizedTest(name = "{index}")
49+
@MethodSource("asyncArangoProvider")
50+
void loadBalancingAsync(ArangoDBAsync arangoDB) {
51+
List<Endpoint> endpoints = getEndpoints();
52+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
53+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
54+
}
55+
56+
@ParameterizedTest(name = "{index}")
57+
@MethodSource("arangoProvider")
58+
void failover(ArangoDB arangoDB) throws IOException {
59+
List<Endpoint> endpoints = getEndpoints();
60+
61+
endpoints.get(0).getProxy().disable();
62+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(1).getServerId());
63+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(1).getServerId());
64+
enableAllEndpoints();
65+
66+
endpoints.get(1).getProxy().disable();
67+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(2).getServerId());
68+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(2).getServerId());
69+
enableAllEndpoints();
70+
71+
endpoints.get(2).getProxy().disable();
72+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
73+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
74+
enableAllEndpoints();
75+
}
76+
77+
@ParameterizedTest(name = "{index}")
78+
@MethodSource("asyncArangoProvider")
79+
void failoverAsymc(ArangoDBAsync arangoDB) throws IOException {
80+
List<Endpoint> endpoints = getEndpoints();
81+
82+
endpoints.get(0).getProxy().disable();
83+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(1).getServerId());
84+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(1).getServerId());
85+
enableAllEndpoints();
86+
87+
endpoints.get(1).getProxy().disable();
88+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(2).getServerId());
89+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(2).getServerId());
90+
enableAllEndpoints();
91+
92+
endpoints.get(2).getProxy().disable();
93+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
94+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
95+
enableAllEndpoints();
96+
}
97+
98+
99+
// FIXME: this fails for VST
100+
@Disabled
101+
@ParameterizedTest(name = "{index}")
102+
@MethodSource("arangoProvider")
103+
void retryGET(ArangoDB arangoDB) throws IOException, InterruptedException {
104+
List<Endpoint> endpoints = getEndpoints();
105+
106+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
107+
108+
// slow down the driver connection
109+
Latency toxic = getEndpoints().get(0).getProxy().toxics().latency("latency", ToxicDirection.DOWNSTREAM, 10_000);
110+
Thread.sleep(100);
111+
112+
ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
113+
es.schedule(() -> getEndpoints().get(0).disable(), 300, TimeUnit.MILLISECONDS);
114+
115+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(1).getServerId());
116+
assertThat(serverIdGET(arangoDB)).isEqualTo(endpoints.get(1).getServerId());
117+
118+
toxic.remove();
119+
enableAllEndpoints();
120+
es.shutdown();
121+
}
122+
123+
124+
@ParameterizedTest(name = "{index}")
125+
@MethodSource("arangoProvider")
126+
void retryPOST(ArangoDB arangoDB) throws IOException, InterruptedException {
127+
List<Endpoint> endpoints = getEndpoints();
128+
129+
assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(0).getServerId());
130+
131+
// slow down the driver connection
132+
Latency toxic = getEndpoints().get(0).getProxy().toxics().latency("latency", ToxicDirection.DOWNSTREAM, 10_000);
133+
Thread.sleep(100);
134+
135+
ScheduledExecutorService es = Executors.newSingleThreadScheduledExecutor();
136+
es.schedule(() -> getEndpoints().get(0).disable(), 300, TimeUnit.MILLISECONDS);
137+
138+
Throwable thrown = catchThrowable(() -> serverIdPOST(arangoDB));
139+
assertThat(thrown).isInstanceOf(ArangoDBException.class);
140+
assertThat(thrown.getCause()).isInstanceOf(IOException.class);
141+
142+
assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(1).getServerId());
143+
assertThat(serverIdPOST(arangoDB)).isEqualTo(endpoints.get(1).getServerId());
144+
145+
toxic.remove();
146+
enableAllEndpoints();
147+
es.shutdown();
148+
}
149+
150+
}

0 commit comments

Comments
 (0)