@@ -23,7 +23,8 @@ import {
23
23
error ,
24
24
Integer ,
25
25
int ,
26
- internal
26
+ internal ,
27
+ ServerInfo
27
28
} from 'neo4j-driver-core'
28
29
import { RoutingTable } from '../../src/rediscovery/'
29
30
import { Pool } from '../../src/pool'
@@ -2474,6 +2475,234 @@ describe('#unit RoutingConnectionProvider', () => {
2474
2475
} )
2475
2476
2476
2477
} )
2478
+
2479
+ describe . each ( [
2480
+ [ undefined , READ ] ,
2481
+ [ undefined , WRITE ] ,
2482
+ [ '' , READ ] ,
2483
+ [ '' , WRITE ] ,
2484
+ [ 'databaseA' , READ ] ,
2485
+ [ 'databaseA' , WRITE ] ,
2486
+ ] ) ( '.verifyConnectivityAndGetServeInfo({ database: %s, accessMode: %s })' , ( database , accessMode ) => {
2487
+ describe ( 'when connection is available in the pool' , ( ) => {
2488
+ it ( 'should return the server info' , async ( ) => {
2489
+ const { connectionProvider, server, protocolVersion } = setup ( )
2490
+
2491
+ const serverInfo = await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2492
+
2493
+ expect ( serverInfo ) . toEqual ( new ServerInfo ( server , protocolVersion ) )
2494
+ } )
2495
+
2496
+ it ( 'should acquire, resetAndFlush and release connections for sever with the selected access mode' , async ( ) => {
2497
+ const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup ( )
2498
+ const acquireSpy = jest . spyOn ( pool , 'acquire' )
2499
+
2500
+ await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2501
+
2502
+ const targetServers = accessMode === WRITE ? routingTable . writers : routingTable . readers
2503
+ for ( const address of targetServers ) {
2504
+ expect ( acquireSpy ) . toHaveBeenCalledWith ( address )
2505
+
2506
+ const connections = seenConnectionsPerAddress . get ( address )
2507
+
2508
+ expect ( connections . length ) . toBe ( 1 )
2509
+ expect ( connections [ 0 ] . resetAndFlush ) . toHaveBeenCalled ( )
2510
+ expect ( connections [ 0 ] . _release ) . toHaveBeenCalled ( )
2511
+ expect ( connections [ 0 ] . _release . mock . invocationCallOrder [ 0 ] )
2512
+ . toBeGreaterThan ( connections [ 0 ] . resetAndFlush . mock . invocationCallOrder [ 0 ] )
2513
+ }
2514
+ } )
2515
+
2516
+ it ( 'should not acquire, resetAndFlush and release connections for sever with the other access mode' , async ( ) => {
2517
+ const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup ( )
2518
+ const acquireSpy = jest . spyOn ( pool , 'acquire' )
2519
+
2520
+ await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2521
+
2522
+ const targetServers = accessMode === WRITE ? routingTable . readers : routingTable . writers
2523
+ for ( const address of targetServers ) {
2524
+ expect ( acquireSpy ) . not . toHaveBeenCalledWith ( address )
2525
+ expect ( seenConnectionsPerAddress . get ( address ) ) . toBeUndefined ( )
2526
+ }
2527
+ } )
2528
+
2529
+ describe ( 'when the reset and flush fails for at least one the address' , ( ) => {
2530
+ it ( 'should fails with the reset and flush error' , async ( ) => {
2531
+ const error = newError ( 'Error' )
2532
+ let i = 0
2533
+ const resetAndFlush = jest . fn ( ( ) => i ++ % 2 == 0 ? Promise . reject ( error ) : Promise . resolve ( ) )
2534
+ const { connectionProvider } = setup ( { resetAndFlush } )
2535
+
2536
+ try {
2537
+ await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2538
+ expect ( ) . toBe ( 'Not reached' )
2539
+ } catch ( e ) {
2540
+ expect ( e ) . toBe ( error )
2541
+ }
2542
+ } )
2543
+
2544
+ it ( 'should release the connection' , async ( ) => {
2545
+ const error = newError ( 'Error' )
2546
+ let i = 0
2547
+ const resetAndFlush = jest . fn ( ( ) => i ++ % 2 == 0 ? Promise . reject ( error ) : Promise . resolve ( ) )
2548
+ const { connectionProvider, seenConnectionsPerAddress, routingTable } = setup ( { resetAndFlush } )
2549
+
2550
+ try {
2551
+ await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2552
+ } catch ( e ) {
2553
+ } finally {
2554
+ const targetServers = accessMode === WRITE ? routingTable . writers : routingTable . readers
2555
+ for ( const address of targetServers ) {
2556
+ const connections = seenConnectionsPerAddress . get ( address )
2557
+
2558
+ expect ( connections . length ) . toBe ( 1 )
2559
+ expect ( connections [ 0 ] . resetAndFlush ) . toHaveBeenCalled ( )
2560
+ expect ( connections [ 0 ] . _release ) . toHaveBeenCalled ( )
2561
+ }
2562
+ }
2563
+ } )
2564
+
2565
+ describe ( 'and the release fails' , ( ) => {
2566
+ it ( 'should fails with the release error' , async ( ) => {
2567
+ const error = newError ( 'Error' )
2568
+ const releaseError = newError ( 'Release error' )
2569
+ let i = 0
2570
+ const resetAndFlush = jest . fn ( ( ) => i ++ % 2 == 0 ? Promise . reject ( error ) : Promise . resolve ( ) )
2571
+ const releaseMock = jest . fn ( ( ) => Promise . reject ( releaseError ) )
2572
+ const { connectionProvider } = setup ( { resetAndFlush, releaseMock } )
2573
+
2574
+ try {
2575
+ await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2576
+ expect ( ) . toBe ( 'Not reached' )
2577
+ } catch ( e ) {
2578
+ expect ( e ) . toBe ( releaseError )
2579
+ }
2580
+ } )
2581
+ } )
2582
+
2583
+ } )
2584
+
2585
+ describe ( 'when the release for at least one the address' , ( ) => {
2586
+ it ( 'should fails with the reset and flush error' , async ( ) => {
2587
+ const error = newError ( 'Error' )
2588
+ let i = 0
2589
+ const releaseMock = jest . fn ( ( ) => i ++ % 2 == 0 ? Promise . reject ( error ) : Promise . resolve ( ) )
2590
+ const { connectionProvider } = setup ( { releaseMock } )
2591
+
2592
+ try {
2593
+ await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2594
+ expect ( ) . toBe ( 'Not reached' )
2595
+ } catch ( e ) {
2596
+ expect ( e ) . toBe ( error )
2597
+ }
2598
+ } )
2599
+ } )
2600
+
2601
+ function setup ( { resetAndFlush, releaseMock } = { } ) {
2602
+ const routingTable = newRoutingTable (
2603
+ database || null ,
2604
+ [ server1 , server2 ] ,
2605
+ [ server3 , server4 ] ,
2606
+ [ server5 , server6 ]
2607
+ )
2608
+ const protocolVersion = 4.4
2609
+ const server = { address : 'localhost:123' , version : 'neo4j/1234' }
2610
+
2611
+ const seenConnectionsPerAddress = new Map ( )
2612
+
2613
+ const pool = newPool ( {
2614
+ create : ( address , release ) => {
2615
+ if ( ! seenConnectionsPerAddress . has ( address ) ) {
2616
+ seenConnectionsPerAddress . set ( address , [ ] )
2617
+ }
2618
+ const connection = new FakeConnection ( address , release , 'version' , protocolVersion , server )
2619
+ if ( resetAndFlush ) {
2620
+ connection . resetAndFlush = resetAndFlush
2621
+ }
2622
+ if ( releaseMock ) {
2623
+ connection . _release = releaseMock
2624
+ }
2625
+ seenConnectionsPerAddress . get ( address ) . push ( connection )
2626
+ return connection
2627
+ }
2628
+ } )
2629
+ const connectionProvider = newRoutingConnectionProvider (
2630
+ [
2631
+ routingTable
2632
+ ] ,
2633
+ pool
2634
+ )
2635
+ return { connectionProvider, routingTable, seenConnectionsPerAddress, server, protocolVersion, pool }
2636
+ }
2637
+ } )
2638
+
2639
+ describe ( 'when at least the one of the servers is not available' , ( ) => {
2640
+ it ( 'should reject with acquistion timeout error' , async ( ) => {
2641
+ const routingTable = newRoutingTable (
2642
+ database || null ,
2643
+ [ server1 , server2 ] ,
2644
+ [ server3 , server4 ] ,
2645
+ [ server5 , server6 ]
2646
+ )
2647
+
2648
+ const pool = newPool ( {
2649
+ config : {
2650
+ acquisitionTimeout : 0 ,
2651
+ }
2652
+ } )
2653
+
2654
+ const connectionProvider = newRoutingConnectionProvider (
2655
+ [
2656
+ routingTable
2657
+ ] ,
2658
+ pool
2659
+ )
2660
+
2661
+ try {
2662
+ connectionProvider = await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2663
+ expect ( ) . toBe ( 'not reached' )
2664
+ } catch ( e ) {
2665
+ expect ( e ) . toBeDefined ( )
2666
+ }
2667
+ } )
2668
+ } )
2669
+
2670
+ describe ( 'when at least the one of the connections could not be created' , ( ) => {
2671
+ it ( 'should reject with acquistion timeout error' , async ( ) => {
2672
+ let i = 0
2673
+ const error = new Error ( 'Connection creation error' )
2674
+ const routingTable = newRoutingTable (
2675
+ database || null ,
2676
+ [ server1 , server2 ] ,
2677
+ [ server3 , server4 ] ,
2678
+ [ server5 , server6 ]
2679
+ )
2680
+
2681
+ const pool = newPool ( {
2682
+ create : ( address , release ) => {
2683
+ if ( i ++ % 2 === 0 ) {
2684
+ return new FakeConnection ( address , release , 'version' , 4.4 , { } )
2685
+ }
2686
+ throw error
2687
+ }
2688
+ } )
2689
+
2690
+ const connectionProvider = newRoutingConnectionProvider (
2691
+ [
2692
+ routingTable
2693
+ ] ,
2694
+ pool
2695
+ )
2696
+
2697
+ try {
2698
+ connectionProvider = await connectionProvider . verifyConnectivityAndGetServerInfo ( { database, accessMode } )
2699
+ expect ( ) . toBe ( 'not reached' )
2700
+ } catch ( e ) {
2701
+ expect ( e ) . toBe ( error )
2702
+ }
2703
+ } )
2704
+ } )
2705
+ } )
2477
2706
} )
2478
2707
2479
2708
function newRoutingConnectionProvider (
@@ -2567,10 +2796,20 @@ function setupRoutingConnectionProviderToRememberRouters (
2567
2796
connectionProvider . _fetchRoutingTable = rememberingFetch
2568
2797
}
2569
2798
2570
- function newPool ( ) {
2799
+ function newPool ( { create, config } = { } ) {
2800
+ const _create = ( address , release ) => {
2801
+ if ( create ) {
2802
+ try {
2803
+ return Promise . resolve ( create ( address , release ) )
2804
+ } catch ( e ) {
2805
+ return Promise . reject ( e )
2806
+ }
2807
+ }
2808
+ return Promise . resolve ( new FakeConnection ( address , release , 'version' , 4.0 ) )
2809
+ }
2571
2810
return new Pool ( {
2572
- create : ( address , release ) =>
2573
- Promise . resolve ( new FakeConnection ( address , release , 'version' , 4.0 ) )
2811
+ config ,
2812
+ create : ( address , release ) => _create ( address , release ) ,
2574
2813
} )
2575
2814
}
2576
2815
@@ -2605,13 +2844,16 @@ function expectPoolToNotContain (pool, addresses) {
2605
2844
}
2606
2845
2607
2846
class FakeConnection extends Connection {
2608
- constructor ( address , release , version , protocolVersion ) {
2847
+ constructor ( address , release , version , protocolVersion , server ) {
2609
2848
super ( null )
2610
2849
2611
2850
this . _address = address
2612
2851
this . _version = version || VERSION_IN_DEV . toString ( )
2613
2852
this . _protocolVersion = protocolVersion
2614
2853
this . release = release
2854
+ this . _release = jest . fn ( ( ) => release ( address , this ) )
2855
+ this . resetAndFlush = jest . fn ( ( ) => Promise . resolve ( ) )
2856
+ this . _server = server
2615
2857
}
2616
2858
2617
2859
get address ( ) {
@@ -2622,6 +2864,10 @@ class FakeConnection extends Connection {
2622
2864
return this . _version
2623
2865
}
2624
2866
2867
+ get server ( ) {
2868
+ return this . _server
2869
+ }
2870
+
2625
2871
protocol ( ) {
2626
2872
return {
2627
2873
version : this . _protocolVersion
0 commit comments