Skip to content

Commit 26781c5

Browse files
committed
Add load balancing assertion to the stress test
To make sure both followers and read replicas serve similar amount of read queries.
1 parent 576d9b4 commit 26781c5

File tree

1 file changed

+121
-11
lines changed

1 file changed

+121
-11
lines changed

test/v1/stress.test.js

Lines changed: 121 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import neo4j from '../../src/v1';
2121
import {READ, WRITE} from '../../src/v1/driver';
2222
import parallelLimit from 'async/parallelLimit';
2323
import _ from 'lodash';
24+
import {ServerVersion, VERSION_3_2_0} from '../../src/v1/internal/server-version';
2425
import sharedNeo4j from '../internal/shared-neo4j';
2526

2627
describe('stress tests', () => {
@@ -78,8 +79,8 @@ describe('stress tests', () => {
7879
done.fail(error);
7980
}
8081

81-
verifyServers(context);
82-
verifyNodeCount(context)
82+
verifyServers(context)
83+
.then(() => verifyNodeCount(context))
8384
.then(() => done())
8485
.catch(error => done.fail(error));
8586
});
@@ -245,13 +246,98 @@ describe('stress tests', () => {
245246

246247
function verifyServers(context) {
247248
const routing = DATABASE_URI.indexOf('bolt+routing') === 0;
248-
const seenServers = context.seenServers();
249249

250-
if (routing && seenServers.length <= 1) {
251-
throw new Error(`Routing driver used too few servers: ${seenServers}`);
252-
} else if (!routing && seenServers.length !== 1) {
253-
throw new Error(`Direct driver used too many servers: ${seenServers}`);
250+
if (routing) {
251+
return verifyCausalClusterMembers(context);
254252
}
253+
return verifySingleInstance(context);
254+
}
255+
256+
function verifySingleInstance(context) {
257+
return new Promise(resolve => {
258+
const readServerAddresses = context.readServerAddresses();
259+
const writeServerAddresses = context.writeServerAddresses();
260+
261+
expect(readServerAddresses.length).toEqual(1);
262+
expect(writeServerAddresses.length).toEqual(1);
263+
expect(readServerAddresses).toEqual(writeServerAddresses);
264+
265+
const address = readServerAddresses[0];
266+
expect(context.readServersWithQueryCount[address]).toBeGreaterThan(1);
267+
expect(context.writeServersWithQueryCount[address]).toBeGreaterThan(1);
268+
269+
resolve();
270+
});
271+
}
272+
273+
function verifyCausalClusterMembers(context) {
274+
return fetchClusterAddresses(context).then(clusterAddresses => {
275+
// before 3.2.0 only read replicas serve reads
276+
const readsOnFollowersEnabled = context.serverVersion.compareTo(VERSION_3_2_0) >= 0;
277+
278+
if (readsOnFollowersEnabled) {
279+
// expect all followers to serve more than zero read queries
280+
assertAllAddressesServedReadQueries(clusterAddresses.followers, context.readServersWithQueryCount);
281+
}
282+
283+
// expect all read replicas to serve more than zero read queries
284+
assertAllAddressesServedReadQueries(clusterAddresses.readReplicas, context.readServersWithQueryCount);
285+
286+
if (readsOnFollowersEnabled) {
287+
// expect all followers to serve same order of magnitude read queries
288+
assertAllAddressesServedSimilarAmountOfReadQueries(clusterAddresses.followers, context.readServersWithQueryCount);
289+
}
290+
291+
// expect all read replicas to serve same order of magnitude read queries
292+
assertAllAddressesServedSimilarAmountOfReadQueries(clusterAddresses.readReplicas,
293+
context.readServersWithQueryCount);
294+
});
295+
}
296+
297+
function fetchClusterAddresses(context) {
298+
const session = context.driver.session();
299+
return session.run('CALL dbms.cluster.overview()').then(result => {
300+
session.close();
301+
const records = result.records;
302+
303+
const followers = addressesWithRole(records, 'FOLLOWER');
304+
const readReplicas = addressesWithRole(records, 'READ_REPLICA');
305+
306+
return new ClusterAddresses(followers, readReplicas);
307+
});
308+
}
309+
310+
function addressesWithRole(records, role) {
311+
return _.uniq(records.filter(record => record.get('role') === role)
312+
.map(record => record.get('addresses')[0].replace('bolt://', '')));
313+
}
314+
315+
function assertAllAddressesServedReadQueries(addresses, readQueriesByServer) {
316+
addresses.forEach(address => {
317+
const queries = readQueriesByServer[address];
318+
expect(queries).toBeGreaterThan(0);
319+
});
320+
}
321+
322+
function assertAllAddressesServedSimilarAmountOfReadQueries(addresses, readQueriesByServer) {
323+
const expectedOrderOfMagnitude = orderOfMagnitude(readQueriesByServer[addresses[0]]);
324+
325+
addresses.forEach(address => {
326+
const queries = readQueriesByServer[address];
327+
const currentOrderOfMagnitude = orderOfMagnitude(queries);
328+
329+
expect(currentOrderOfMagnitude).not.toBeLessThan(expectedOrderOfMagnitude - 1);
330+
expect(currentOrderOfMagnitude).not.toBeGreaterThan(expectedOrderOfMagnitude + 1);
331+
});
332+
}
333+
334+
function orderOfMagnitude(number) {
335+
let result = 1;
336+
while (number >= 10) {
337+
number /= 10;
338+
result++;
339+
}
340+
return result;
255341
}
256342

257343
function randomParams() {
@@ -310,25 +396,41 @@ describe('stress tests', () => {
310396
this.createdNodesCount = 0;
311397
this._commandIdCouter = 0;
312398
this._loggingEnabled = loggingEnabled;
313-
this._seenServers = new Set();
399+
this.readServersWithQueryCount = {};
400+
this.writeServersWithQueryCount = {};
401+
this.serverVersion = null;
314402
}
315403

316404
queryCompleted(result, accessMode, bookmark) {
405+
const serverInfo = result.summary.server;
406+
407+
if (!this.serverVersion) {
408+
this.serverVersion = ServerVersion.fromString(serverInfo.version);
409+
}
410+
411+
const serverAddress = serverInfo.address;
317412
if (accessMode === WRITE) {
318413
this.createdNodesCount++;
414+
this.writeServersWithQueryCount[serverAddress] = (this.writeServersWithQueryCount[serverAddress] || 0) + 1;
415+
} else {
416+
this.readServersWithQueryCount[serverAddress] = (this.readServersWithQueryCount[serverAddress] || 0) + 1;
319417
}
418+
320419
if (bookmark) {
321420
this.bookmark = bookmark;
322421
}
323-
this._seenServers.add(result.summary.server.address);
324422
}
325423

326424
nextCommandId() {
327425
return this._commandIdCouter++;
328426
}
329427

330-
seenServers() {
331-
return Array.from(this._seenServers);
428+
readServerAddresses() {
429+
return Object.keys(this.readServersWithQueryCount);
430+
}
431+
432+
writeServerAddresses() {
433+
return Object.keys(this.writeServersWithQueryCount);
332434
}
333435

334436
log(commandId, message) {
@@ -338,4 +440,12 @@ describe('stress tests', () => {
338440
}
339441
}
340442

443+
class ClusterAddresses {
444+
445+
constructor(followers, readReplicas) {
446+
this.followers = followers;
447+
this.readReplicas = readReplicas;
448+
}
449+
}
450+
341451
});

0 commit comments

Comments
 (0)