@@ -21,6 +21,7 @@ import neo4j from '../../src/v1';
21
21
import { READ , WRITE } from '../../src/v1/driver' ;
22
22
import parallelLimit from 'async/parallelLimit' ;
23
23
import _ from 'lodash' ;
24
+ import { ServerVersion , VERSION_3_2_0 } from '../../src/v1/internal/server-version' ;
24
25
import sharedNeo4j from '../internal/shared-neo4j' ;
25
26
26
27
describe ( 'stress tests' , ( ) => {
@@ -78,8 +79,8 @@ describe('stress tests', () => {
78
79
done . fail ( error ) ;
79
80
}
80
81
81
- verifyServers ( context ) ;
82
- verifyNodeCount ( context )
82
+ verifyServers ( context )
83
+ . then ( ( ) => verifyNodeCount ( context ) )
83
84
. then ( ( ) => done ( ) )
84
85
. catch ( error => done . fail ( error ) ) ;
85
86
} ) ;
@@ -245,13 +246,98 @@ describe('stress tests', () => {
245
246
246
247
function verifyServers ( context ) {
247
248
const routing = DATABASE_URI . indexOf ( 'bolt+routing' ) === 0 ;
248
- const seenServers = context . seenServers ( ) ;
249
249
250
- if ( routing && seenServers . length <= 1 ) {
251
- throw new Error ( `Routing driver used too few servers: ${ seenServers } ` ) ;
252
- } else if ( ! routing && seenServers . length !== 1 ) {
253
- throw new Error ( `Direct driver used too many servers: ${ seenServers } ` ) ;
250
+ if ( routing ) {
251
+ return verifyCausalClusterMembers ( context ) ;
254
252
}
253
+ return verifySingleInstance ( context ) ;
254
+ }
255
+
256
+ function verifySingleInstance ( context ) {
257
+ return new Promise ( resolve => {
258
+ const readServerAddresses = context . readServerAddresses ( ) ;
259
+ const writeServerAddresses = context . writeServerAddresses ( ) ;
260
+
261
+ expect ( readServerAddresses . length ) . toEqual ( 1 ) ;
262
+ expect ( writeServerAddresses . length ) . toEqual ( 1 ) ;
263
+ expect ( readServerAddresses ) . toEqual ( writeServerAddresses ) ;
264
+
265
+ const address = readServerAddresses [ 0 ] ;
266
+ expect ( context . readServersWithQueryCount [ address ] ) . toBeGreaterThan ( 1 ) ;
267
+ expect ( context . writeServersWithQueryCount [ address ] ) . toBeGreaterThan ( 1 ) ;
268
+
269
+ resolve ( ) ;
270
+ } ) ;
271
+ }
272
+
273
+ function verifyCausalClusterMembers ( context ) {
274
+ return fetchClusterAddresses ( context ) . then ( clusterAddresses => {
275
+ // before 3.2.0 only read replicas serve reads
276
+ const readsOnFollowersEnabled = context . serverVersion . compareTo ( VERSION_3_2_0 ) >= 0 ;
277
+
278
+ if ( readsOnFollowersEnabled ) {
279
+ // expect all followers to serve more than zero read queries
280
+ assertAllAddressesServedReadQueries ( clusterAddresses . followers , context . readServersWithQueryCount ) ;
281
+ }
282
+
283
+ // expect all read replicas to serve more than zero read queries
284
+ assertAllAddressesServedReadQueries ( clusterAddresses . readReplicas , context . readServersWithQueryCount ) ;
285
+
286
+ if ( readsOnFollowersEnabled ) {
287
+ // expect all followers to serve same order of magnitude read queries
288
+ assertAllAddressesServedSimilarAmountOfReadQueries ( clusterAddresses . followers , context . readServersWithQueryCount ) ;
289
+ }
290
+
291
+ // expect all read replicas to serve same order of magnitude read queries
292
+ assertAllAddressesServedSimilarAmountOfReadQueries ( clusterAddresses . readReplicas ,
293
+ context . readServersWithQueryCount ) ;
294
+ } ) ;
295
+ }
296
+
297
+ function fetchClusterAddresses ( context ) {
298
+ const session = context . driver . session ( ) ;
299
+ return session . run ( 'CALL dbms.cluster.overview()' ) . then ( result => {
300
+ session . close ( ) ;
301
+ const records = result . records ;
302
+
303
+ const followers = addressesWithRole ( records , 'FOLLOWER' ) ;
304
+ const readReplicas = addressesWithRole ( records , 'READ_REPLICA' ) ;
305
+
306
+ return new ClusterAddresses ( followers , readReplicas ) ;
307
+ } ) ;
308
+ }
309
+
310
+ function addressesWithRole ( records , role ) {
311
+ return _ . uniq ( records . filter ( record => record . get ( 'role' ) === role )
312
+ . map ( record => record . get ( 'addresses' ) [ 0 ] . replace ( 'bolt://' , '' ) ) ) ;
313
+ }
314
+
315
+ function assertAllAddressesServedReadQueries ( addresses , readQueriesByServer ) {
316
+ addresses . forEach ( address => {
317
+ const queries = readQueriesByServer [ address ] ;
318
+ expect ( queries ) . toBeGreaterThan ( 0 ) ;
319
+ } ) ;
320
+ }
321
+
322
+ function assertAllAddressesServedSimilarAmountOfReadQueries ( addresses , readQueriesByServer ) {
323
+ const expectedOrderOfMagnitude = orderOfMagnitude ( readQueriesByServer [ addresses [ 0 ] ] ) ;
324
+
325
+ addresses . forEach ( address => {
326
+ const queries = readQueriesByServer [ address ] ;
327
+ const currentOrderOfMagnitude = orderOfMagnitude ( queries ) ;
328
+
329
+ expect ( currentOrderOfMagnitude ) . not . toBeLessThan ( expectedOrderOfMagnitude - 1 ) ;
330
+ expect ( currentOrderOfMagnitude ) . not . toBeGreaterThan ( expectedOrderOfMagnitude + 1 ) ;
331
+ } ) ;
332
+ }
333
+
334
+ function orderOfMagnitude ( number ) {
335
+ let result = 1 ;
336
+ while ( number >= 10 ) {
337
+ number /= 10 ;
338
+ result ++ ;
339
+ }
340
+ return result ;
255
341
}
256
342
257
343
function randomParams ( ) {
@@ -310,25 +396,41 @@ describe('stress tests', () => {
310
396
this . createdNodesCount = 0 ;
311
397
this . _commandIdCouter = 0 ;
312
398
this . _loggingEnabled = loggingEnabled ;
313
- this . _seenServers = new Set ( ) ;
399
+ this . readServersWithQueryCount = { } ;
400
+ this . writeServersWithQueryCount = { } ;
401
+ this . serverVersion = null ;
314
402
}
315
403
316
404
queryCompleted ( result , accessMode , bookmark ) {
405
+ const serverInfo = result . summary . server ;
406
+
407
+ if ( ! this . serverVersion ) {
408
+ this . serverVersion = ServerVersion . fromString ( serverInfo . version ) ;
409
+ }
410
+
411
+ const serverAddress = serverInfo . address ;
317
412
if ( accessMode === WRITE ) {
318
413
this . createdNodesCount ++ ;
414
+ this . writeServersWithQueryCount [ serverAddress ] = ( this . writeServersWithQueryCount [ serverAddress ] || 0 ) + 1 ;
415
+ } else {
416
+ this . readServersWithQueryCount [ serverAddress ] = ( this . readServersWithQueryCount [ serverAddress ] || 0 ) + 1 ;
319
417
}
418
+
320
419
if ( bookmark ) {
321
420
this . bookmark = bookmark ;
322
421
}
323
- this . _seenServers . add ( result . summary . server . address ) ;
324
422
}
325
423
326
424
nextCommandId ( ) {
327
425
return this . _commandIdCouter ++ ;
328
426
}
329
427
330
- seenServers ( ) {
331
- return Array . from ( this . _seenServers ) ;
428
+ readServerAddresses ( ) {
429
+ return Object . keys ( this . readServersWithQueryCount ) ;
430
+ }
431
+
432
+ writeServerAddresses ( ) {
433
+ return Object . keys ( this . writeServersWithQueryCount ) ;
332
434
}
333
435
334
436
log ( commandId , message ) {
@@ -338,4 +440,12 @@ describe('stress tests', () => {
338
440
}
339
441
}
340
442
443
+ class ClusterAddresses {
444
+
445
+ constructor ( followers , readReplicas ) {
446
+ this . followers = followers ;
447
+ this . readReplicas = readReplicas ;
448
+ }
449
+ }
450
+
341
451
} ) ;
0 commit comments