Skip to content

Commit a6a2fda

Browse files
bigmontzrobsdedude
andauthored
Reduce number of ROUTE requests when routing table is stale (#1119)
The driver was starting a rediscovery process of each acquireConnection causing more load than needed in the cluster. Keeping track of the ongoing requests and use it when possible reduces the load in the process and speeds up the connection acquisition. Co-authored-by: Robsdedude <dev@rouvenbauer.de>
1 parent 865c085 commit a6a2fda

File tree

5 files changed

+250
-26
lines changed

5 files changed

+250
-26
lines changed

packages/bolt-connection/src/connection-provider/connection-provider-routing.js

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import {
2828
ConnectionErrorHandler,
2929
DelegateConnection
3030
} from '../connection'
31+
import { functional } from '../lang'
3132

3233
const { SERVICE_UNAVAILABLE, SESSION_EXPIRED } = error
3334
const {
@@ -97,6 +98,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
9798
? int(routingTablePurgeDelay)
9899
: DEFAULT_ROUTING_TABLE_PURGE_DELAY
99100
)
101+
102+
this._refreshRoutingTable = functional.reuseOngoingRequest(this._refreshRoutingTable, this)
100103
}
101104

102105
_createConnectionErrorHandler () {
@@ -357,10 +360,14 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
357360
this._log.info(
358361
`Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}`
359362
)
360-
return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, auth)
363+
return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, auth)
364+
.then(newRoutingTable => {
365+
onDatabaseNameResolved(newRoutingTable.database)
366+
return newRoutingTable
367+
})
361368
}
362369

363-
_refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, auth) {
370+
_refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, auth) {
364371
const knownRouters = currentRoutingTable.routers
365372

366373
if (this._useSeedRouter) {
@@ -369,7 +376,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
369376
currentRoutingTable,
370377
bookmarks,
371378
impersonatedUser,
372-
onDatabaseNameResolved,
373379
auth
374380
)
375381
}
@@ -378,7 +384,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
378384
currentRoutingTable,
379385
bookmarks,
380386
impersonatedUser,
381-
onDatabaseNameResolved,
382387
auth
383388
)
384389
}
@@ -388,7 +393,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
388393
currentRoutingTable,
389394
bookmarks,
390395
impersonatedUser,
391-
onDatabaseNameResolved,
392396
auth
393397
) {
394398
// we start with seed router, no routers were probed before
@@ -420,7 +424,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
420424
return await this._applyRoutingTableIfPossible(
421425
currentRoutingTable,
422426
newRoutingTable,
423-
onDatabaseNameResolved,
424427
error
425428
)
426429
}
@@ -430,7 +433,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
430433
currentRoutingTable,
431434
bookmarks,
432435
impersonatedUser,
433-
onDatabaseNameResolved,
434436
auth
435437
) {
436438
let [newRoutingTable, error] = await this._fetchRoutingTableUsingKnownRouters(
@@ -456,7 +458,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
456458
return await this._applyRoutingTableIfPossible(
457459
currentRoutingTable,
458460
newRoutingTable,
459-
onDatabaseNameResolved,
460461
error
461462
)
462463
}
@@ -630,7 +631,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
630631
return [null, error]
631632
}
632633

633-
async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable, onDatabaseNameResolved, error) {
634+
async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable, error) {
634635
if (!newRoutingTable) {
635636
// none of routing servers returned valid routing table, throw exception
636637
throw newError(
@@ -646,21 +647,19 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
646647
this._useSeedRouter = true
647648
}
648649

649-
await this._updateRoutingTable(newRoutingTable, onDatabaseNameResolved)
650+
await this._updateRoutingTable(newRoutingTable)
650651

651652
return newRoutingTable
652653
}
653654

654-
async _updateRoutingTable (newRoutingTable, onDatabaseNameResolved) {
655+
async _updateRoutingTable (newRoutingTable) {
655656
// close old connections to servers not present in the new routing table
656657
await this._connectionPool.keepAll(newRoutingTable.allServers())
657658
this._routingTableRegistry.removeExpired()
658659
this._routingTableRegistry.register(
659660
newRoutingTable
660661
)
661662

662-
onDatabaseNameResolved(newRoutingTable.database)
663-
664663
this._log.info(`Updated routing table ${newRoutingTable}`)
665664
}
666665

packages/bolt-connection/src/lang/functional.js

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
* limitations under the License.
1818
*/
1919

20+
import { json } from 'neo4j-driver-core'
21+
2022
/**
2123
* Identity function.
2224
*
@@ -28,3 +30,29 @@
2830
export function identity (x) {
2931
return x
3032
}
33+
34+
/**
35+
* Makes the function able to share ongoing requests
36+
*
37+
* @param {function(...args): Promise} func The function to be decorated
38+
* @param {any} thisArg The `this` which should be used in the function call
39+
* @return {function(...args): Promise} The decorated function
40+
*/
41+
export function reuseOngoingRequest (func, thisArg = null) {
42+
const ongoingRequests = new Map()
43+
44+
return function (...args) {
45+
const key = json.stringify(args)
46+
if (ongoingRequests.has(key)) {
47+
return ongoingRequests.get(key)
48+
}
49+
50+
const promise = func.apply(thisArg, args)
51+
52+
ongoingRequests.set(key, promise)
53+
54+
return promise.finally(() => {
55+
ongoingRequests.delete(key)
56+
})
57+
}
58+
}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import { reuseOngoingRequest } from '../../src/lang/functional.js'
2+
3+
describe('functional', () => {
4+
describe('reuseOnGoingRequest', () => {
5+
it('should call supplied function with the params', async () => {
6+
const expectedParams = ['a', 1, { a: 'a' }]
7+
const func = jest.fn(() => Promise.resolve())
8+
9+
const decoratedFunction = reuseOngoingRequest(func)
10+
await decoratedFunction(...expectedParams)
11+
12+
expect(func).toHaveBeenCalledWith(...expectedParams)
13+
})
14+
15+
it('should call supplied function with this', async () => {
16+
const expectedParams = ['a', 1, { a: 'a' }]
17+
const thisArg = { t: 'his' }
18+
const func = jest.fn(async function () {
19+
return this
20+
})
21+
22+
const decoratedFunction = reuseOngoingRequest(func, thisArg)
23+
const receivedThis = await decoratedFunction(...expectedParams)
24+
25+
expect(receivedThis).toBe(thisArg)
26+
})
27+
28+
it('should return values returned by the supplied function', async () => {
29+
const expectedResult = { a: 'abc' }
30+
const func = jest.fn(() => Promise.resolve(expectedResult))
31+
32+
const decoratedFunction = reuseOngoingRequest(func)
33+
const result = await decoratedFunction()
34+
35+
expect(result).toBe(expectedResult)
36+
})
37+
38+
it('should throw value thrown by supplied function', async () => {
39+
const error = new Error('Oops, I did it again!')
40+
const func = jest.fn(() => Promise.reject(error))
41+
42+
const decoratedFunction = reuseOngoingRequest(func)
43+
const promise = decoratedFunction()
44+
expect(promise).rejects.toThrow(error)
45+
})
46+
47+
it('should share ongoing request with same params', async () => {
48+
const expectedParams = ['a', 1, [3]]
49+
const expectedResult = { a: 'abc' }
50+
const { promises, func } = mockPromiseFunction()
51+
52+
const decoratedFunction = reuseOngoingRequest(func)
53+
54+
const resultPromises = [
55+
decoratedFunction(...expectedParams),
56+
decoratedFunction(...expectedParams),
57+
decoratedFunction(...expectedParams)
58+
]
59+
60+
expect(func).toBeCalledTimes(1)
61+
expect(promises.length).toBe(1)
62+
63+
promises[0].resolve(expectedResult) // closing ongoing request
64+
65+
const results = await Promise.all(resultPromises)
66+
67+
expect(results).toEqual([expectedResult, expectedResult, expectedResult])
68+
})
69+
70+
it('should not share ongoing request with different params', async () => {
71+
const expectedParams1 = ['a', 1, [3]]
72+
const expectedResult1 = { a: 'abc' }
73+
const expectedParams2 = [4, 'a', []]
74+
const expectedResult2 = { k: 'bbk' }
75+
const { promises, func } = mockPromiseFunction()
76+
77+
const decoratedFunction = reuseOngoingRequest(func)
78+
79+
const resultPromises = [
80+
decoratedFunction(...expectedParams1),
81+
decoratedFunction(...expectedParams2)
82+
]
83+
84+
expect(func).toBeCalledTimes(2)
85+
expect(func).toBeCalledWith(...expectedParams1)
86+
expect(func).toBeCalledWith(...expectedParams2)
87+
88+
expect(promises.length).toBe(2)
89+
90+
promises[0].resolve(expectedResult1) // closing ongoing request 1
91+
promises[1].resolve(expectedResult2) // closing ongoing request 2
92+
93+
const results = await Promise.all(resultPromises)
94+
95+
expect(results).toEqual([expectedResult1, expectedResult2])
96+
})
97+
98+
it('should not share resolved requests with same params', async () => {
99+
const expectedParams = ['a', 1, [3]]
100+
const expectedResult1 = { a: 'abc' }
101+
const expectedResult2 = { k: 'bbk' }
102+
const { promises, func } = mockPromiseFunction()
103+
104+
const decoratedFunction = reuseOngoingRequest(func)
105+
106+
const resultPromises = [
107+
decoratedFunction(...expectedParams)
108+
]
109+
110+
expect(func).toBeCalledTimes(1)
111+
expect(promises.length).toBe(1)
112+
113+
promises[0].resolve(expectedResult1) // closing ongoing request
114+
115+
const results = await Promise.all(resultPromises)
116+
117+
resultPromises.push(decoratedFunction(...expectedParams))
118+
119+
expect(func).toBeCalledTimes(2)
120+
expect(promises.length).toBe(2)
121+
122+
promises[1].resolve(expectedResult2) // closing ongoing request
123+
124+
results.push(await resultPromises[1])
125+
126+
expect(results).toEqual([expectedResult1, expectedResult2])
127+
})
128+
129+
it('should not share rejected requests with same params', async () => {
130+
const expectedParams = ['a', 1, [3]]
131+
const expectedResult1 = new Error('Ops, I did it again!')
132+
const expectedResult2 = { k: 'bbk' }
133+
const { promises, func } = mockPromiseFunction()
134+
135+
const decoratedFunction = reuseOngoingRequest(func)
136+
137+
const resultPromises = [
138+
decoratedFunction(...expectedParams)
139+
]
140+
141+
expect(func).toBeCalledTimes(1)
142+
expect(promises.length).toBe(1)
143+
144+
promises[0].reject(expectedResult1) // closing ongoing request
145+
146+
const results = await Promise.all(
147+
resultPromises.map(promise => promise.catch(error => error))
148+
)
149+
150+
resultPromises.push(decoratedFunction(...expectedParams))
151+
152+
expect(func).toBeCalledTimes(2)
153+
expect(promises.length).toBe(2)
154+
155+
promises[1].resolve(expectedResult2) // closing ongoing request
156+
157+
results.push(await resultPromises[1])
158+
159+
expect(results).toEqual([expectedResult1, expectedResult2])
160+
})
161+
162+
function mockPromiseFunction () {
163+
const promises = []
164+
const func = jest.fn(() => new Promise((resolve, reject) => {
165+
promises.push({ resolve, reject })
166+
}))
167+
return { promises, func }
168+
}
169+
})
170+
})

0 commit comments

Comments
 (0)