@@ -63,7 +63,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
63
63
} )
64
64
65
65
this . _seedRouter = address
66
- this . _routingTables = { }
67
66
this . _rediscovery = new Rediscovery ( routingContext , address . toString ( ) )
68
67
this . _loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy (
69
68
this . _connectionPool
@@ -72,9 +71,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
72
71
this . _dnsResolver = new HostNameResolver ( )
73
72
this . _log = log
74
73
this . _useSeedRouter = true
75
- this . _routingTablePurgeDelay = routingTablePurgeDelay
76
- ? int ( routingTablePurgeDelay )
77
- : DEFAULT_ROUTING_TABLE_PURGE_DELAY
74
+ this . _routingTableRegistry = new RoutingTableRegistry (
75
+ routingTablePurgeDelay
76
+ ? int ( routingTablePurgeDelay )
77
+ : DEFAULT_ROUTING_TABLE_PURGE_DELAY
78
+ )
78
79
}
79
80
80
81
_createConnectionErrorHandler ( ) {
@@ -87,15 +88,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
87
88
this . _log . warn (
88
89
`Routing driver ${ this . _id } will forget ${ address } for database '${ database } ' because of an error ${ error . code } '${ error . message } '`
89
90
)
90
- this . forget ( address , database || '' )
91
+ this . forget ( address , database || DEFAULT_DB_NAME )
91
92
return error
92
93
}
93
94
94
95
_handleWriteFailure ( error , address , database ) {
95
96
this . _log . warn (
96
97
`Routing driver ${ this . _id } will forget writer ${ address } for database '${ database } ' because of an error ${ error . code } '${ error . message } '`
97
98
)
98
- this . forgetWriter ( address , database || '' )
99
+ this . forgetWriter ( address , database || DEFAULT_DB_NAME )
99
100
return newError (
100
101
'No longer possible to write to server at ' + address ,
101
102
SESSION_EXPIRED
@@ -206,36 +207,40 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
206
207
}
207
208
208
209
forget ( address , database ) {
209
- if ( database || database === '' ) {
210
- this . _routingTables [ database ] . forget ( address )
211
- } else {
212
- Object . values ( this . _routingTables ) . forEach ( routingTable =>
213
- routingTable . forget ( address )
214
- )
215
- }
210
+ this . _routingTableRegistry . apply ( database , {
211
+ applyWhenExists : routingTable => routingTable . forget ( address ) ,
212
+ applyWhenDontExists : ( ) => {
213
+ throw newError (
214
+ 'Could not forget address for a non existing routing table'
215
+ )
216
+ }
217
+ } )
216
218
217
219
// We're firing and forgetting this operation explicitly and listening for any
218
220
// errors to avoid unhandled promise rejection
219
221
this . _connectionPool . purge ( address ) . catch ( ( ) => { } )
220
222
}
221
223
222
224
forgetWriter ( address , database ) {
223
- if ( database || database === '' ) {
224
- this . _routingTables [ database ] . forgetWriter ( address )
225
- } else {
226
- Object . values ( this . _routingTables ) . forEach ( routingTable =>
227
- routingTable . forgetWriter ( address )
228
- )
229
- }
225
+ this . _routingTableRegistry . apply ( database , {
226
+ applyWhenExists : routingTable => routingTable . forgetWriter ( address ) ,
227
+ applyWhenDontExists : ( ) => {
228
+ throw newError (
229
+ 'Could not forget writer address for a non existing routing table'
230
+ )
231
+ }
232
+ } )
230
233
}
231
234
232
235
_acquireConnectionToServer ( address , serverName , routingTable ) {
233
236
return this . _connectionPool . acquire ( address )
234
237
}
235
238
236
239
_freshRoutingTable ( { accessMode, database, bookmark } = { } ) {
237
- const currentRoutingTable =
238
- this . _routingTables [ database ] || new RoutingTable ( { database } )
240
+ const currentRoutingTable = this . _routingTableRegistry . get (
241
+ database ,
242
+ ( ) => new RoutingTable ( { database } )
243
+ )
239
244
240
245
if ( ! currentRoutingTable . isStaleFor ( accessMode ) ) {
241
246
return currentRoutingTable
@@ -481,16 +486,11 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
481
486
async _updateRoutingTable ( newRoutingTable ) {
482
487
// close old connections to servers not present in the new routing table
483
488
await this . _connectionPool . keepAll ( newRoutingTable . allServers ( ) )
484
-
485
- // filter out expired to purge (expired for a pre-configured amount of time) routing table entries
486
- Object . values ( this . _routingTables ) . forEach ( value => {
487
- if ( value . isExpiredFor ( this . _routingTablePurgeDelay ) ) {
488
- delete this . _routingTables [ value . database ]
489
- }
490
- } )
491
-
492
- // make this driver instance aware of the new table
493
- this . _routingTables [ newRoutingTable . database ] = newRoutingTable
489
+ this . _routingTableRegistry . removeExpired ( )
490
+ this . _routingTableRegistry . register (
491
+ newRoutingTable . database ,
492
+ newRoutingTable
493
+ )
494
494
this . _log . info ( `Updated routing table ${ newRoutingTable } ` )
495
495
}
496
496
@@ -501,3 +501,96 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
501
501
}
502
502
}
503
503
}
504
+
505
+ /**
506
+ * Responsible for keeping track of the existing routing tables
507
+ */
508
+ class RoutingTableRegistry {
509
+ /**
510
+ * Constructor
511
+ * @param {int } routingTablePurgeDelay The routing table purge delay
512
+ */
513
+ constructor ( routingTablePurgeDelay ) {
514
+ this . _tables = new Map ( )
515
+ this . _routingTablePurgeDelay = routingTablePurgeDelay
516
+ }
517
+
518
+ /**
519
+ * Put a routing table in the registry
520
+ *
521
+ * @param {string } database The database name
522
+ * @param {RoutingTable } table The routing table
523
+ * @returns {RoutingTableRegistry } this
524
+ */
525
+ register ( database , table ) {
526
+ this . _tables . set ( database , table )
527
+ return this
528
+ }
529
+
530
+ /**
531
+ * Apply function in the routing table for an specific database. If the database name is not defined, the function will
532
+ * be applied for each element
533
+ *
534
+ * @param {string } database The database name
535
+ * @param {object } callbacks The actions
536
+ * @param {function (RoutingTable) } callbacks.applyWhenExists Call when the db exists or when the database property is not informed
537
+ * @param {function () } callbacks.applyWhenDontExists Call when the database doesn't have the routing table registred
538
+ * @returns {RoutingTableRegistry } this
539
+ */
540
+ apply ( database , { applyWhenExists, applyWhenDontExists = ( ) => { } } = { } ) {
541
+ if ( this . _tables . has ( database ) ) {
542
+ applyWhenExists ( this . _tables . get ( database ) )
543
+ } else if ( typeof database === 'string' || database === null ) {
544
+ applyWhenDontExists ( )
545
+ } else {
546
+ this . _forEach ( applyWhenExists )
547
+ }
548
+ return this
549
+ }
550
+
551
+ /**
552
+ * Retrieves a routing table from a given database name
553
+ * @param {string } database The database name
554
+ * @param {function()|RoutingTable } defaultSupplier The routing table supplier, if it's not a function or not exists, it will return itself as default value
555
+ * @returns {RoutingTable } The routing table for the respective database
556
+ */
557
+ get ( database , defaultSupplier ) {
558
+ if ( this . _tables . has ( database ) ) {
559
+ return this . _tables . get ( database )
560
+ }
561
+ return typeof defaultSupplier === 'function'
562
+ ? defaultSupplier ( )
563
+ : defaultSupplier
564
+ }
565
+
566
+ /**
567
+ * Remove the routing table which is already expired
568
+ * @returns {RoutingTableRegistry } this
569
+ */
570
+ removeExpired ( ) {
571
+ return this . _removeIf ( value =>
572
+ value . isExpiredFor ( this . _routingTablePurgeDelay )
573
+ )
574
+ }
575
+
576
+ _forEach ( apply ) {
577
+ for ( const [ , value ] of this . _tables ) {
578
+ apply ( value )
579
+ }
580
+ return this
581
+ }
582
+
583
+ _remove ( key ) {
584
+ this . _tables . delete ( key )
585
+ return this
586
+ }
587
+
588
+ _removeIf ( predicate ) {
589
+ for ( const [ key , value ] of this . _tables ) {
590
+ if ( predicate ( value ) ) {
591
+ this . _remove ( key )
592
+ }
593
+ }
594
+ return this
595
+ }
596
+ }
0 commit comments