From fcf08e76c8ef922e069f21069a2eb8b851ead7a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Thu, 3 Aug 2023 13:29:40 +0200 Subject: [PATCH] Reduce number of `ROUTE` requests when routing table is stale (#1119) The driver was starting a rediscovery process of each acquireConnection causing more load than needed in the cluster. Keeping track of the ongoing requests and use it when possible reduces the load in the process and speeds up the connection acquisition. Co-authored-by: Robsdedude --- .../connection-provider-routing.js | 72 ++++---- .../bolt-connection/src/lang/functional.js | 58 ++++++ packages/bolt-connection/src/lang/index.js | 19 ++ .../test/lang/functional.test.js | 170 ++++++++++++++++++ 4 files changed, 282 insertions(+), 37 deletions(-) create mode 100644 packages/bolt-connection/src/lang/functional.js create mode 100644 packages/bolt-connection/src/lang/index.js create mode 100644 packages/bolt-connection/test/lang/functional.test.js diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js index 0dbe88b26..df81ee870 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js @@ -28,6 +28,7 @@ import { ConnectionErrorHandler, DelegateConnection } from '../connection' +import { functional } from '../lang' const { SERVICE_UNAVAILABLE, SESSION_EXPIRED } = error const { @@ -85,6 +86,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider ? int(routingTablePurgeDelay) : DEFAULT_ROUTING_TABLE_PURGE_DELAY ) + + this._refreshRoutingTable = functional.reuseOngoingRequest(this._refreshRoutingTable, this) } _createConnectionErrorHandler () { @@ -140,7 +143,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider const routingTable = await this._freshRoutingTable({ accessMode, database: context.database, - bookmark: bookmarks, + bookmarks, impersonatedUser, onDatabaseNameResolved: (databaseName) => { context.database = context.database || databaseName @@ -259,7 +262,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return this._connectionPool.acquire(address) } - _freshRoutingTable ({ accessMode, database, bookmark, impersonatedUser, onDatabaseNameResolved } = {}) { + _freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved } = {}) { const currentRoutingTable = this._routingTableRegistry.get( database, () => new RoutingTable({ database }) @@ -271,36 +274,37 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._log.info( `Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}` ) - return this._refreshRoutingTable(currentRoutingTable, bookmark, impersonatedUser, onDatabaseNameResolved) + return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser) + .then(newRoutingTable => { + onDatabaseNameResolved(newRoutingTable.database) + return newRoutingTable + }) } - _refreshRoutingTable (currentRoutingTable, bookmark, impersonatedUser, onDatabaseNameResolved) { + _refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser) { const knownRouters = currentRoutingTable.routers if (this._useSeedRouter) { return this._fetchRoutingTableFromSeedRouterFallbackToKnownRouters( knownRouters, currentRoutingTable, - bookmark, - impersonatedUser, - onDatabaseNameResolved + bookmarks, + impersonatedUser ) } return this._fetchRoutingTableFromKnownRoutersFallbackToSeedRouter( knownRouters, currentRoutingTable, - bookmark, - impersonatedUser, - onDatabaseNameResolved + bookmarks, + impersonatedUser ) } async _fetchRoutingTableFromSeedRouterFallbackToKnownRouters ( knownRouters, currentRoutingTable, - bookmark, - impersonatedUser, - onDatabaseNameResolved + bookmarks, + impersonatedUser ) { // we start with seed router, no routers were probed before const seenRouters = [] @@ -308,7 +312,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider seenRouters, this._seedRouter, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser ) @@ -319,29 +323,27 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider newRoutingTable = await this._fetchRoutingTableUsingKnownRouters( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser ) } return await this._applyRoutingTableIfPossible( currentRoutingTable, - newRoutingTable, - onDatabaseNameResolved + newRoutingTable ) } async _fetchRoutingTableFromKnownRoutersFallbackToSeedRouter ( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser, - onDatabaseNameResolved ) { let newRoutingTable = await this._fetchRoutingTableUsingKnownRouters( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser ) @@ -351,28 +353,27 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider knownRouters, this._seedRouter, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser ) } return await this._applyRoutingTableIfPossible( currentRoutingTable, - newRoutingTable, - onDatabaseNameResolved + newRoutingTable ) } async _fetchRoutingTableUsingKnownRouters ( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser ) { const newRoutingTable = await this._fetchRoutingTable( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser ) @@ -397,7 +398,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider seenRouters, seedRouter, routingTable, - bookmark, + bookmarks, impersonatedUser ) { const resolvedAddresses = await this._resolveSeedRouter(seedRouter) @@ -407,7 +408,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider address => seenRouters.indexOf(address) < 0 ) - return await this._fetchRoutingTable(newAddresses, routingTable, bookmark, impersonatedUser) + return await this._fetchRoutingTable(newAddresses, routingTable, bookmarks, impersonatedUser) } async _resolveSeedRouter (seedRouter) { @@ -419,7 +420,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return [].concat.apply([], dnsResolvedAddresses) } - _fetchRoutingTable (routerAddresses, routingTable, bookmark, impersonatedUser) { + _fetchRoutingTable (routerAddresses, routingTable, bookmarks, impersonatedUser) { return routerAddresses.reduce( async (refreshedTablePromise, currentRouter, currentIndex) => { const newRoutingTable = await refreshedTablePromise @@ -441,7 +442,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider // try next router const session = await this._createSessionForRediscovery( currentRouter, - bookmark, + bookmarks, impersonatedUser ) if (session) { @@ -474,7 +475,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider ) } - async _createSessionForRediscovery (routerAddress, bookmark, impersonatedUser) { + async _createSessionForRediscovery (routerAddress, bookmarks, impersonatedUser) { try { const connection = await this._connectionPool.acquire(routerAddress) @@ -498,7 +499,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return new Session({ mode: READ, database: SYSTEM_DB_NAME, - bookmark, + bookmark: bookmarks, connectionProvider, impersonatedUser }) @@ -513,7 +514,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider } } - async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable, onDatabaseNameResolved) { + async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable) { if (!newRoutingTable) { // none of routing servers returned valid routing table, throw exception throw newError( @@ -528,21 +529,18 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._useSeedRouter = true } - await this._updateRoutingTable(newRoutingTable, onDatabaseNameResolved) + await this._updateRoutingTable(newRoutingTable) return newRoutingTable } - async _updateRoutingTable (newRoutingTable, onDatabaseNameResolved) { + async _updateRoutingTable (newRoutingTable) { // close old connections to servers not present in the new routing table await this._connectionPool.keepAll(newRoutingTable.allServers()) this._routingTableRegistry.removeExpired() this._routingTableRegistry.register( newRoutingTable ) - - onDatabaseNameResolved(newRoutingTable.database) - this._log.info(`Updated routing table ${newRoutingTable}`) } diff --git a/packages/bolt-connection/src/lang/functional.js b/packages/bolt-connection/src/lang/functional.js new file mode 100644 index 000000000..b5194299a --- /dev/null +++ b/packages/bolt-connection/src/lang/functional.js @@ -0,0 +1,58 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { json } from 'neo4j-driver-core' + +/** + * Identity function. + * + * Identity functions are function which returns the input as output. + * + * @param {any} x + * @returns {any} the x + */ +export function identity (x) { + return x +} + +/** + * Makes the function able to share ongoing requests + * + * @param {function(...args): Promise} func The function to be decorated + * @param {any} thisArg The `this` which should be used in the function call + * @return {function(...args): Promise} The decorated function + */ +export function reuseOngoingRequest (func, thisArg = null) { + const ongoingRequests = new Map() + + return function (...args) { + const key = json.stringify(args) + if (ongoingRequests.has(key)) { + return ongoingRequests.get(key) + } + + const promise = func.apply(thisArg, args) + + ongoingRequests.set(key, promise) + + return promise.finally(() => { + ongoingRequests.delete(key) + }) + } +} diff --git a/packages/bolt-connection/src/lang/index.js b/packages/bolt-connection/src/lang/index.js new file mode 100644 index 000000000..5fdce22c3 --- /dev/null +++ b/packages/bolt-connection/src/lang/index.js @@ -0,0 +1,19 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +export * as functional from './functional' diff --git a/packages/bolt-connection/test/lang/functional.test.js b/packages/bolt-connection/test/lang/functional.test.js new file mode 100644 index 000000000..a1fc76532 --- /dev/null +++ b/packages/bolt-connection/test/lang/functional.test.js @@ -0,0 +1,170 @@ +import { reuseOngoingRequest } from '../../src/lang/functional.js' + +describe('functional', () => { + describe('reuseOnGoingRequest', () => { + it('should call supplied function with the params', async () => { + const expectedParams = ['a', 1, { a: 'a' }] + const func = jest.fn(() => Promise.resolve()) + + const decoratedFunction = reuseOngoingRequest(func) + await decoratedFunction(...expectedParams) + + expect(func).toHaveBeenCalledWith(...expectedParams) + }) + + it('should call supplied function with this', async () => { + const expectedParams = ['a', 1, { a: 'a' }] + const thisArg = { t: 'his' } + const func = jest.fn(async function () { + return this + }) + + const decoratedFunction = reuseOngoingRequest(func, thisArg) + const receivedThis = await decoratedFunction(...expectedParams) + + expect(receivedThis).toBe(thisArg) + }) + + it('should return values returned by the supplied function', async () => { + const expectedResult = { a: 'abc' } + const func = jest.fn(() => Promise.resolve(expectedResult)) + + const decoratedFunction = reuseOngoingRequest(func) + const result = await decoratedFunction() + + expect(result).toBe(expectedResult) + }) + + it('should throw value thrown by supplied function', async () => { + const error = new Error('Oops, I did it again!') + const func = jest.fn(() => Promise.reject(error)) + + const decoratedFunction = reuseOngoingRequest(func) + const promise = decoratedFunction() + expect(promise).rejects.toThrow(error) + }) + + it('should share ongoing request with same params', async () => { + const expectedParams = ['a', 1, [3]] + const expectedResult = { a: 'abc' } + const { promises, func } = mockPromiseFunction() + + const decoratedFunction = reuseOngoingRequest(func) + + const resultPromises = [ + decoratedFunction(...expectedParams), + decoratedFunction(...expectedParams), + decoratedFunction(...expectedParams) + ] + + expect(func).toBeCalledTimes(1) + expect(promises.length).toBe(1) + + promises[0].resolve(expectedResult) // closing ongoing request + + const results = await Promise.all(resultPromises) + + expect(results).toEqual([expectedResult, expectedResult, expectedResult]) + }) + + it('should not share ongoing request with different params', async () => { + const expectedParams1 = ['a', 1, [3]] + const expectedResult1 = { a: 'abc' } + const expectedParams2 = [4, 'a', []] + const expectedResult2 = { k: 'bbk' } + const { promises, func } = mockPromiseFunction() + + const decoratedFunction = reuseOngoingRequest(func) + + const resultPromises = [ + decoratedFunction(...expectedParams1), + decoratedFunction(...expectedParams2) + ] + + expect(func).toBeCalledTimes(2) + expect(func).toBeCalledWith(...expectedParams1) + expect(func).toBeCalledWith(...expectedParams2) + + expect(promises.length).toBe(2) + + promises[0].resolve(expectedResult1) // closing ongoing request 1 + promises[1].resolve(expectedResult2) // closing ongoing request 2 + + const results = await Promise.all(resultPromises) + + expect(results).toEqual([expectedResult1, expectedResult2]) + }) + + it('should not share resolved requests with same params', async () => { + const expectedParams = ['a', 1, [3]] + const expectedResult1 = { a: 'abc' } + const expectedResult2 = { k: 'bbk' } + const { promises, func } = mockPromiseFunction() + + const decoratedFunction = reuseOngoingRequest(func) + + const resultPromises = [ + decoratedFunction(...expectedParams) + ] + + expect(func).toBeCalledTimes(1) + expect(promises.length).toBe(1) + + promises[0].resolve(expectedResult1) // closing ongoing request + + const results = await Promise.all(resultPromises) + + resultPromises.push(decoratedFunction(...expectedParams)) + + expect(func).toBeCalledTimes(2) + expect(promises.length).toBe(2) + + promises[1].resolve(expectedResult2) // closing ongoing request + + results.push(await resultPromises[1]) + + expect(results).toEqual([expectedResult1, expectedResult2]) + }) + + it('should not share rejected requests with same params', async () => { + const expectedParams = ['a', 1, [3]] + const expectedResult1 = new Error('Ops, I did it again!') + const expectedResult2 = { k: 'bbk' } + const { promises, func } = mockPromiseFunction() + + const decoratedFunction = reuseOngoingRequest(func) + + const resultPromises = [ + decoratedFunction(...expectedParams) + ] + + expect(func).toBeCalledTimes(1) + expect(promises.length).toBe(1) + + promises[0].reject(expectedResult1) // closing ongoing request + + const results = await Promise.all( + resultPromises.map(promise => promise.catch(error => error)) + ) + + resultPromises.push(decoratedFunction(...expectedParams)) + + expect(func).toBeCalledTimes(2) + expect(promises.length).toBe(2) + + promises[1].resolve(expectedResult2) // closing ongoing request + + results.push(await resultPromises[1]) + + expect(results).toEqual([expectedResult1, expectedResult2]) + }) + + function mockPromiseFunction () { + const promises = [] + const func = jest.fn(() => new Promise((resolve, reject) => { + promises.push({ resolve, reject }) + })) + return { promises, func } + } + }) +})