@@ -63,7 +63,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
63
63
} )
64
64
65
65
this . _seedRouter = address
66
- this . _routingTables = { }
67
66
this . _rediscovery = new Rediscovery ( routingContext , address . toString ( ) )
68
67
this . _loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy (
69
68
this . _connectionPool
@@ -72,9 +71,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
72
71
this . _dnsResolver = new HostNameResolver ( )
73
72
this . _log = log
74
73
this . _useSeedRouter = true
75
- this . _routingTablePurgeDelay = routingTablePurgeDelay
76
- ? int ( routingTablePurgeDelay )
77
- : DEFAULT_ROUTING_TABLE_PURGE_DELAY
74
+ this . _routingTableRegistry = new RoutingTableRegistry (
75
+ routingTablePurgeDelay
76
+ ? int ( routingTablePurgeDelay )
77
+ : DEFAULT_ROUTING_TABLE_PURGE_DELAY
78
+ )
78
79
}
79
80
80
81
_createConnectionErrorHandler ( ) {
@@ -87,15 +88,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
87
88
this . _log . warn (
88
89
`Routing driver ${ this . _id } will forget ${ address } for database '${ database } ' because of an error ${ error . code } '${ error . message } '`
89
90
)
90
- this . forget ( address , database || '' )
91
+ this . forget ( address , database || DEFAULT_DB_NAME )
91
92
return error
92
93
}
93
94
94
95
_handleWriteFailure ( error , address , database ) {
95
96
this . _log . warn (
96
97
`Routing driver ${ this . _id } will forget writer ${ address } for database '${ database } ' because of an error ${ error . code } '${ error . message } '`
97
98
)
98
- this . forgetWriter ( address , database || '' )
99
+ this . forgetWriter ( address , database || DEFAULT_DB_NAME )
99
100
return newError (
100
101
'No longer possible to write to server at ' + address ,
101
102
SESSION_EXPIRED
@@ -206,36 +207,30 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
206
207
}
207
208
208
209
forget ( address , database ) {
209
- if ( database || database === '' ) {
210
- this . _routingTables [ database ] . forget ( address )
211
- } else {
212
- Object . values ( this . _routingTables ) . forEach ( routingTable =>
213
- routingTable . forget ( address )
214
- )
215
- }
210
+ this . _routingTableRegistry . apply ( database , {
211
+ applyWhenExists : routingTable => routingTable . forget ( address )
212
+ } )
216
213
217
214
// We're firing and forgetting this operation explicitly and listening for any
218
215
// errors to avoid unhandled promise rejection
219
216
this . _connectionPool . purge ( address ) . catch ( ( ) => { } )
220
217
}
221
218
222
219
forgetWriter ( address , database ) {
223
- if ( database || database === '' ) {
224
- this . _routingTables [ database ] . forgetWriter ( address )
225
- } else {
226
- Object . values ( this . _routingTables ) . forEach ( routingTable =>
227
- routingTable . forgetWriter ( address )
228
- )
229
- }
220
+ this . _routingTableRegistry . apply ( database , {
221
+ applyWhenExists : routingTable => routingTable . forgetWriter ( address )
222
+ } )
230
223
}
231
224
232
225
_acquireConnectionToServer ( address , serverName , routingTable ) {
233
226
return this . _connectionPool . acquire ( address )
234
227
}
235
228
236
229
_freshRoutingTable ( { accessMode, database, bookmark } = { } ) {
237
- const currentRoutingTable =
238
- this . _routingTables [ database ] || new RoutingTable ( { database } )
230
+ const currentRoutingTable = this . _routingTableRegistry . get (
231
+ database ,
232
+ ( ) => new RoutingTable ( { database } )
233
+ )
239
234
240
235
if ( ! currentRoutingTable . isStaleFor ( accessMode ) ) {
241
236
return currentRoutingTable
@@ -481,16 +476,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
481
476
async _updateRoutingTable ( newRoutingTable ) {
482
477
// close old connections to servers not present in the new routing table
483
478
await this . _connectionPool . keepAll ( newRoutingTable . allServers ( ) )
484
-
485
- // filter out expired to purge (expired for a pre-configured amount of time) routing table entries
486
- Object . values ( this . _routingTables ) . forEach ( value => {
487
- if ( value . isExpiredFor ( this . _routingTablePurgeDelay ) ) {
488
- delete this . _routingTables [ value . database ]
489
- }
490
- } )
491
-
492
- // make this driver instance aware of the new table
493
- this . _routingTables [ newRoutingTable . database ] = newRoutingTable
479
+ this . _routingTableRegistry . removeExpired ( )
480
+ this . _routingTableRegistry . register (
481
+ newRoutingTable . database ,
482
+ newRoutingTable
483
+ )
494
484
this . _log . info ( `Updated routing table ${ newRoutingTable } ` )
495
485
}
496
486
@@ -501,3 +491,96 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
501
491
}
502
492
}
503
493
}
494
+
495
+ /**
496
+ * Responsible for keeping track of the existing routing tables
497
+ */
498
+ class RoutingTableRegistry {
499
+ /**
500
+ * Constructor
501
+ * @param {int } routingTablePurgeDelay The routing table purge delay
502
+ */
503
+ constructor ( routingTablePurgeDelay ) {
504
+ this . _tables = new Map ( )
505
+ this . _routingTablePurgeDelay = routingTablePurgeDelay
506
+ }
507
+
508
+ /**
509
+ * Put a routing table in the registry
510
+ *
511
+ * @param {string } database The database name
512
+ * @param {RoutingTable } table The routing table
513
+ * @returns {RoutingTableRegistry } this
514
+ */
515
+ register ( database , table ) {
516
+ this . _tables . set ( database , table )
517
+ return this
518
+ }
519
+
520
+ /**
521
+ * Apply function in the routing table for an specific database. If the database name is not defined, the function will
522
+ * be applied for each element
523
+ *
524
+ * @param {string } database The database name
525
+ * @param {object } callbacks The actions
526
+ * @param {function (RoutingTable) } callbacks.applyWhenExists Call when the db exists or when the database property is not informed
527
+ * @param {function () } callbacks.applyWhenDontExists Call when the database doesn't have the routing table registred
528
+ * @returns {RoutingTableRegistry } this
529
+ */
530
+ apply ( database , { applyWhenExists, applyWhenDontExists = ( ) => { } } = { } ) {
531
+ if ( this . _tables . has ( database ) ) {
532
+ applyWhenExists ( this . _tables . get ( database ) )
533
+ } else if ( typeof database === 'string' || database === null ) {
534
+ applyWhenDontExists ( )
535
+ } else {
536
+ this . _forEach ( applyWhenExists )
537
+ }
538
+ return this
539
+ }
540
+
541
+ /**
542
+ * Retrieves a routing table from a given database name
543
+ * @param {string } database The database name
544
+ * @param {function()|RoutingTable } defaultSupplier The routing table supplier, if it's not a function or not exists, it will return itself as default value
545
+ * @returns {RoutingTable } The routing table for the respective database
546
+ */
547
+ get ( database , defaultSupplier ) {
548
+ if ( this . _tables . has ( database ) ) {
549
+ return this . _tables . get ( database )
550
+ }
551
+ return typeof defaultSupplier === 'function'
552
+ ? defaultSupplier ( )
553
+ : defaultSupplier
554
+ }
555
+
556
+ /**
557
+ * Remove the routing table which is already expired
558
+ * @returns {RoutingTableRegistry } this
559
+ */
560
+ removeExpired ( ) {
561
+ return this . _removeIf ( value =>
562
+ value . isExpiredFor ( this . _routingTablePurgeDelay )
563
+ )
564
+ }
565
+
566
+ _forEach ( apply ) {
567
+ for ( const [ , value ] of this . _tables ) {
568
+ apply ( value )
569
+ }
570
+ return this
571
+ }
572
+
573
+ _remove ( key ) {
574
+ this . _tables . delete ( key )
575
+ return this
576
+ }
577
+
578
+ _removeIf ( predicate ) {
579
+ for ( const [ key , value ] of this . _tables ) {
580
+ if ( predicate ( value ) ) {
581
+ this . _remove ( key )
582
+ }
583
+ }
584
+ return this
585
+ }
586
+ }
0 commit comments