Skip to content

Commit 32ac207

Browse files
bigmontzrobsdedude
andcommitted
Reduce number of ROUTE requests when routing table is stale (#1119)
The driver was starting a rediscovery process of each acquireConnection causing more load than needed in the cluster. Keeping track of the ongoing requests and use it when possible reduces the load in the process and speeds up the connection acquisition. Co-authored-by: Robsdedude <dev@rouvenbauer.de>
1 parent 0436752 commit 32ac207

File tree

4 files changed

+283
-38
lines changed

4 files changed

+283
-38
lines changed

packages/bolt-connection/src/connection-provider/connection-provider-routing.js

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import {
2828
ConnectionErrorHandler,
2929
DelegateConnection
3030
} from '../connection'
31+
import { functional } from '../lang'
3132

3233
const { SERVICE_UNAVAILABLE, SESSION_EXPIRED } = error
3334
const {
@@ -85,6 +86,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
8586
? int(routingTablePurgeDelay)
8687
: DEFAULT_ROUTING_TABLE_PURGE_DELAY
8788
)
89+
90+
this._refreshRoutingTable = functional.reuseOngoingRequest(this._refreshRoutingTable, this)
8891
}
8992

9093
_createConnectionErrorHandler () {
@@ -140,7 +143,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
140143
const routingTable = await this._freshRoutingTable({
141144
accessMode,
142145
database: context.database,
143-
bookmark: bookmarks,
146+
bookmarks,
144147
impersonatedUser,
145148
onDatabaseNameResolved: (databaseName) => {
146149
context.database = context.database || databaseName
@@ -259,7 +262,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
259262
return this._connectionPool.acquire(address)
260263
}
261264

262-
_freshRoutingTable ({ accessMode, database, bookmark, impersonatedUser, onDatabaseNameResolved } = {}) {
265+
_freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved } = {}) {
263266
const currentRoutingTable = this._routingTableRegistry.get(
264267
database,
265268
() => new RoutingTable({ database })
@@ -271,44 +274,45 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
271274
this._log.info(
272275
`Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}`
273276
)
274-
return this._refreshRoutingTable(currentRoutingTable, bookmark, impersonatedUser, onDatabaseNameResolved)
277+
return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser)
278+
.then(newRoutingTable => {
279+
onDatabaseNameResolved(newRoutingTable.database)
280+
return newRoutingTable
281+
})
275282
}
276283

277-
_refreshRoutingTable (currentRoutingTable, bookmark, impersonatedUser, onDatabaseNameResolved) {
284+
_refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser) {
278285
const knownRouters = currentRoutingTable.routers
279286

280287
if (this._useSeedRouter) {
281288
return this._fetchRoutingTableFromSeedRouterFallbackToKnownRouters(
282289
knownRouters,
283290
currentRoutingTable,
284-
bookmark,
285-
impersonatedUser,
286-
onDatabaseNameResolved
291+
bookmarks,
292+
impersonatedUser
287293
)
288294
}
289295
return this._fetchRoutingTableFromKnownRoutersFallbackToSeedRouter(
290296
knownRouters,
291297
currentRoutingTable,
292-
bookmark,
293-
impersonatedUser,
294-
onDatabaseNameResolved
298+
bookmarks,
299+
impersonatedUser
295300
)
296301
}
297302

298303
async _fetchRoutingTableFromSeedRouterFallbackToKnownRouters (
299304
knownRouters,
300305
currentRoutingTable,
301-
bookmark,
302-
impersonatedUser,
303-
onDatabaseNameResolved
306+
bookmarks,
307+
impersonatedUser
304308
) {
305309
// we start with seed router, no routers were probed before
306310
const seenRouters = []
307311
let newRoutingTable = await this._fetchRoutingTableUsingSeedRouter(
308312
seenRouters,
309313
this._seedRouter,
310314
currentRoutingTable,
311-
bookmark,
315+
bookmarks,
312316
impersonatedUser
313317
)
314318

@@ -319,29 +323,27 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
319323
newRoutingTable = await this._fetchRoutingTableUsingKnownRouters(
320324
knownRouters,
321325
currentRoutingTable,
322-
bookmark,
326+
bookmarks,
323327
impersonatedUser
324328
)
325329
}
326330

327331
return await this._applyRoutingTableIfPossible(
328332
currentRoutingTable,
329-
newRoutingTable,
330-
onDatabaseNameResolved
333+
newRoutingTable
331334
)
332335
}
333336

334337
async _fetchRoutingTableFromKnownRoutersFallbackToSeedRouter (
335338
knownRouters,
336339
currentRoutingTable,
337-
bookmark,
340+
bookmarks,
338341
impersonatedUser,
339-
onDatabaseNameResolved
340342
) {
341343
let newRoutingTable = await this._fetchRoutingTableUsingKnownRouters(
342344
knownRouters,
343345
currentRoutingTable,
344-
bookmark,
346+
bookmarks,
345347
impersonatedUser
346348
)
347349

@@ -351,28 +353,27 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
351353
knownRouters,
352354
this._seedRouter,
353355
currentRoutingTable,
354-
bookmark,
356+
bookmarks,
355357
impersonatedUser
356358
)
357359
}
358360

359361
return await this._applyRoutingTableIfPossible(
360362
currentRoutingTable,
361-
newRoutingTable,
362-
onDatabaseNameResolved
363+
newRoutingTable
363364
)
364365
}
365366

366367
async _fetchRoutingTableUsingKnownRouters (
367368
knownRouters,
368369
currentRoutingTable,
369-
bookmark,
370+
bookmarks,
370371
impersonatedUser
371372
) {
372373
const newRoutingTable = await this._fetchRoutingTable(
373374
knownRouters,
374375
currentRoutingTable,
375-
bookmark,
376+
bookmarks,
376377
impersonatedUser
377378
)
378379

@@ -397,7 +398,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
397398
seenRouters,
398399
seedRouter,
399400
routingTable,
400-
bookmark,
401+
bookmarks,
401402
impersonatedUser
402403
) {
403404
const resolvedAddresses = await this._resolveSeedRouter(seedRouter)
@@ -407,7 +408,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
407408
address => seenRouters.indexOf(address) < 0
408409
)
409410

410-
return await this._fetchRoutingTable(newAddresses, routingTable, bookmark, impersonatedUser)
411+
return await this._fetchRoutingTable(newAddresses, routingTable, bookmarks, impersonatedUser)
411412
}
412413

413414
async _resolveSeedRouter (seedRouter) {
@@ -419,7 +420,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
419420
return [].concat.apply([], dnsResolvedAddresses)
420421
}
421422

422-
_fetchRoutingTable (routerAddresses, routingTable, bookmark, impersonatedUser) {
423+
_fetchRoutingTable (routerAddresses, routingTable, bookmarks, impersonatedUser) {
423424
return routerAddresses.reduce(
424425
async (refreshedTablePromise, currentRouter, currentIndex) => {
425426
const newRoutingTable = await refreshedTablePromise
@@ -441,7 +442,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
441442
// try next router
442443
const session = await this._createSessionForRediscovery(
443444
currentRouter,
444-
bookmark,
445+
bookmarks,
445446
impersonatedUser
446447
)
447448
if (session) {
@@ -474,7 +475,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
474475
)
475476
}
476477

477-
async _createSessionForRediscovery (routerAddress, bookmark, impersonatedUser) {
478+
async _createSessionForRediscovery (routerAddress, bookmarks, impersonatedUser) {
478479
try {
479480
const connection = await this._connectionPool.acquire(routerAddress)
480481

@@ -490,15 +491,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
490491
if (protocolVersion < 4.0) {
491492
return new Session({
492493
mode: WRITE,
493-
bookmark: Bookmark.empty(),
494+
bookmarks: Bookmark.empty(),
494495
connectionProvider
495496
})
496497
}
497498

498499
return new Session({
499500
mode: READ,
500501
database: SYSTEM_DB_NAME,
501-
bookmark,
502+
bookmarks,
502503
connectionProvider,
503504
impersonatedUser
504505
})
@@ -513,7 +514,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
513514
}
514515
}
515516

516-
async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable, onDatabaseNameResolved) {
517+
async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable) {
517518
if (!newRoutingTable) {
518519
// none of routing servers returned valid routing table, throw exception
519520
throw newError(
@@ -528,21 +529,18 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
528529
this._useSeedRouter = true
529530
}
530531

531-
await this._updateRoutingTable(newRoutingTable, onDatabaseNameResolved)
532+
await this._updateRoutingTable(newRoutingTable)
532533

533534
return newRoutingTable
534535
}
535536

536-
async _updateRoutingTable (newRoutingTable, onDatabaseNameResolved) {
537+
async _updateRoutingTable (newRoutingTable) {
537538
// close old connections to servers not present in the new routing table
538539
await this._connectionPool.keepAll(newRoutingTable.allServers())
539540
this._routingTableRegistry.removeExpired()
540541
this._routingTableRegistry.register(
541542
newRoutingTable
542543
)
543-
544-
onDatabaseNameResolved(newRoutingTable.database)
545-
546544
this._log.info(`Updated routing table ${newRoutingTable}`)
547545
}
548546

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import { json } from 'neo4j-driver-core'
21+
22+
/**
23+
* Identity function.
24+
*
25+
* Identity functions are function which returns the input as output.
26+
*
27+
* @param {any} x
28+
* @returns {any} the x
29+
*/
30+
export function identity (x) {
31+
return x
32+
}
33+
34+
/**
35+
* Makes the function able to share ongoing requests
36+
*
37+
* @param {function(...args): Promise} func The function to be decorated
38+
* @param {any} thisArg The `this` which should be used in the function call
39+
* @return {function(...args): Promise} The decorated function
40+
*/
41+
export function reuseOngoingRequest (func, thisArg = null) {
42+
const ongoingRequests = new Map()
43+
44+
return function (...args) {
45+
const key = json.stringify(args)
46+
if (ongoingRequests.has(key)) {
47+
return ongoingRequests.get(key)
48+
}
49+
50+
const promise = func.apply(thisArg, args)
51+
52+
ongoingRequests.set(key, promise)
53+
54+
return promise.finally(() => {
55+
ongoingRequests.delete(key)
56+
})
57+
}
58+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
export * as functional from './functional'

0 commit comments

Comments
 (0)