diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js index 0dbe88b26..df81ee870 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js @@ -28,6 +28,7 @@ import { ConnectionErrorHandler, DelegateConnection } from '../connection' +import { functional } from '../lang' const { SERVICE_UNAVAILABLE, SESSION_EXPIRED } = error const { @@ -85,6 +86,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider ? int(routingTablePurgeDelay) : DEFAULT_ROUTING_TABLE_PURGE_DELAY ) + + this._refreshRoutingTable = functional.reuseOngoingRequest(this._refreshRoutingTable, this) } _createConnectionErrorHandler () { @@ -140,7 +143,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider const routingTable = await this._freshRoutingTable({ accessMode, database: context.database, - bookmark: bookmarks, + bookmarks, impersonatedUser, onDatabaseNameResolved: (databaseName) => { context.database = context.database || databaseName @@ -259,7 +262,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return this._connectionPool.acquire(address) } - _freshRoutingTable ({ accessMode, database, bookmark, impersonatedUser, onDatabaseNameResolved } = {}) { + _freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved } = {}) { const currentRoutingTable = this._routingTableRegistry.get( database, () => new RoutingTable({ database }) @@ -271,36 +274,37 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._log.info( `Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}` ) - return this._refreshRoutingTable(currentRoutingTable, bookmark, impersonatedUser, onDatabaseNameResolved) + return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser) + .then(newRoutingTable => { + onDatabaseNameResolved(newRoutingTable.database) + return newRoutingTable + }) } - _refreshRoutingTable (currentRoutingTable, bookmark, impersonatedUser, onDatabaseNameResolved) { + _refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser) { const knownRouters = currentRoutingTable.routers if (this._useSeedRouter) { return this._fetchRoutingTableFromSeedRouterFallbackToKnownRouters( knownRouters, currentRoutingTable, - bookmark, - impersonatedUser, - onDatabaseNameResolved + bookmarks, + impersonatedUser ) } return this._fetchRoutingTableFromKnownRoutersFallbackToSeedRouter( knownRouters, currentRoutingTable, - bookmark, - impersonatedUser, - onDatabaseNameResolved + bookmarks, + impersonatedUser ) } async _fetchRoutingTableFromSeedRouterFallbackToKnownRouters ( knownRouters, currentRoutingTable, - bookmark, - impersonatedUser, - onDatabaseNameResolved + bookmarks, + impersonatedUser ) { // we start with seed router, no routers were probed before const seenRouters = [] @@ -308,7 +312,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider seenRouters, this._seedRouter, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser ) @@ -319,29 +323,27 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider newRoutingTable = await this._fetchRoutingTableUsingKnownRouters( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser ) } return await this._applyRoutingTableIfPossible( currentRoutingTable, - newRoutingTable, - onDatabaseNameResolved + newRoutingTable ) } async _fetchRoutingTableFromKnownRoutersFallbackToSeedRouter ( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser, - onDatabaseNameResolved ) { let newRoutingTable = await this._fetchRoutingTableUsingKnownRouters( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser ) @@ -351,28 +353,27 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider knownRouters, this._seedRouter, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser ) } return await this._applyRoutingTableIfPossible( currentRoutingTable, - newRoutingTable, - onDatabaseNameResolved + newRoutingTable ) } async _fetchRoutingTableUsingKnownRouters ( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser ) { const newRoutingTable = await this._fetchRoutingTable( knownRouters, currentRoutingTable, - bookmark, + bookmarks, impersonatedUser ) @@ -397,7 +398,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider seenRouters, seedRouter, routingTable, - bookmark, + bookmarks, impersonatedUser ) { const resolvedAddresses = await this._resolveSeedRouter(seedRouter) @@ -407,7 +408,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider address => seenRouters.indexOf(address) < 0 ) - return await this._fetchRoutingTable(newAddresses, routingTable, bookmark, impersonatedUser) + return await this._fetchRoutingTable(newAddresses, routingTable, bookmarks, impersonatedUser) } async _resolveSeedRouter (seedRouter) { @@ -419,7 +420,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return [].concat.apply([], dnsResolvedAddresses) } - _fetchRoutingTable (routerAddresses, routingTable, bookmark, impersonatedUser) { + _fetchRoutingTable (routerAddresses, routingTable, bookmarks, impersonatedUser) { return routerAddresses.reduce( async (refreshedTablePromise, currentRouter, currentIndex) => { const newRoutingTable = await refreshedTablePromise @@ -441,7 +442,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider // try next router const session = await this._createSessionForRediscovery( currentRouter, - bookmark, + bookmarks, impersonatedUser ) if (session) { @@ -474,7 +475,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider ) } - async _createSessionForRediscovery (routerAddress, bookmark, impersonatedUser) { + async _createSessionForRediscovery (routerAddress, bookmarks, impersonatedUser) { try { const connection = await this._connectionPool.acquire(routerAddress) @@ -498,7 +499,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return new Session({ mode: READ, database: SYSTEM_DB_NAME, - bookmark, + bookmark: bookmarks, connectionProvider, impersonatedUser }) @@ -513,7 +514,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider } } - async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable, onDatabaseNameResolved) { + async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable) { if (!newRoutingTable) { // none of routing servers returned valid routing table, throw exception throw newError( @@ -528,21 +529,18 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._useSeedRouter = true } - await this._updateRoutingTable(newRoutingTable, onDatabaseNameResolved) + await this._updateRoutingTable(newRoutingTable) return newRoutingTable } - async _updateRoutingTable (newRoutingTable, onDatabaseNameResolved) { + async _updateRoutingTable (newRoutingTable) { // close old connections to servers not present in the new routing table await this._connectionPool.keepAll(newRoutingTable.allServers()) this._routingTableRegistry.removeExpired() this._routingTableRegistry.register( newRoutingTable ) - - onDatabaseNameResolved(newRoutingTable.database) - this._log.info(`Updated routing table ${newRoutingTable}`) } diff --git a/packages/bolt-connection/src/lang/functional.js b/packages/bolt-connection/src/lang/functional.js new file mode 100644 index 000000000..b5194299a --- /dev/null +++ b/packages/bolt-connection/src/lang/functional.js @@ -0,0 +1,58 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { json } from 'neo4j-driver-core' + +/** + * Identity function. + * + * Identity functions are function which returns the input as output. + * + * @param {any} x + * @returns {any} the x + */ +export function identity (x) { + return x +} + +/** + * Makes the function able to share ongoing requests + * + * @param {function(...args): Promise} func The function to be decorated + * @param {any} thisArg The `this` which should be used in the function call + * @return {function(...args): Promise} The decorated function + */ +export function reuseOngoingRequest (func, thisArg = null) { + const ongoingRequests = new Map() + + return function (...args) { + const key = json.stringify(args) + if (ongoingRequests.has(key)) { + return ongoingRequests.get(key) + } + + const promise = func.apply(thisArg, args) + + ongoingRequests.set(key, promise) + + return promise.finally(() => { + ongoingRequests.delete(key) + }) + } +} diff --git a/packages/bolt-connection/src/lang/index.js b/packages/bolt-connection/src/lang/index.js new file mode 100644 index 000000000..5fdce22c3 --- /dev/null +++ b/packages/bolt-connection/src/lang/index.js @@ -0,0 +1,19 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +export * as functional from './functional' diff --git a/packages/bolt-connection/test/lang/functional.test.js b/packages/bolt-connection/test/lang/functional.test.js new file mode 100644 index 000000000..a1fc76532 --- /dev/null +++ b/packages/bolt-connection/test/lang/functional.test.js @@ -0,0 +1,170 @@ +import { reuseOngoingRequest } from '../../src/lang/functional.js' + +describe('functional', () => { + describe('reuseOnGoingRequest', () => { + it('should call supplied function with the params', async () => { + const expectedParams = ['a', 1, { a: 'a' }] + const func = jest.fn(() => Promise.resolve()) + + const decoratedFunction = reuseOngoingRequest(func) + await decoratedFunction(...expectedParams) + + expect(func).toHaveBeenCalledWith(...expectedParams) + }) + + it('should call supplied function with this', async () => { + const expectedParams = ['a', 1, { a: 'a' }] + const thisArg = { t: 'his' } + const func = jest.fn(async function () { + return this + }) + + const decoratedFunction = reuseOngoingRequest(func, thisArg) + const receivedThis = await decoratedFunction(...expectedParams) + + expect(receivedThis).toBe(thisArg) + }) + + it('should return values returned by the supplied function', async () => { + const expectedResult = { a: 'abc' } + const func = jest.fn(() => Promise.resolve(expectedResult)) + + const decoratedFunction = reuseOngoingRequest(func) + const result = await decoratedFunction() + + expect(result).toBe(expectedResult) + }) + + it('should throw value thrown by supplied function', async () => { + const error = new Error('Oops, I did it again!') + const func = jest.fn(() => Promise.reject(error)) + + const decoratedFunction = reuseOngoingRequest(func) + const promise = decoratedFunction() + expect(promise).rejects.toThrow(error) + }) + + it('should share ongoing request with same params', async () => { + const expectedParams = ['a', 1, [3]] + const expectedResult = { a: 'abc' } + const { promises, func } = mockPromiseFunction() + + const decoratedFunction = reuseOngoingRequest(func) + + const resultPromises = [ + decoratedFunction(...expectedParams), + decoratedFunction(...expectedParams), + decoratedFunction(...expectedParams) + ] + + expect(func).toBeCalledTimes(1) + expect(promises.length).toBe(1) + + promises[0].resolve(expectedResult) // closing ongoing request + + const results = await Promise.all(resultPromises) + + expect(results).toEqual([expectedResult, expectedResult, expectedResult]) + }) + + it('should not share ongoing request with different params', async () => { + const expectedParams1 = ['a', 1, [3]] + const expectedResult1 = { a: 'abc' } + const expectedParams2 = [4, 'a', []] + const expectedResult2 = { k: 'bbk' } + const { promises, func } = mockPromiseFunction() + + const decoratedFunction = reuseOngoingRequest(func) + + const resultPromises = [ + decoratedFunction(...expectedParams1), + decoratedFunction(...expectedParams2) + ] + + expect(func).toBeCalledTimes(2) + expect(func).toBeCalledWith(...expectedParams1) + expect(func).toBeCalledWith(...expectedParams2) + + expect(promises.length).toBe(2) + + promises[0].resolve(expectedResult1) // closing ongoing request 1 + promises[1].resolve(expectedResult2) // closing ongoing request 2 + + const results = await Promise.all(resultPromises) + + expect(results).toEqual([expectedResult1, expectedResult2]) + }) + + it('should not share resolved requests with same params', async () => { + const expectedParams = ['a', 1, [3]] + const expectedResult1 = { a: 'abc' } + const expectedResult2 = { k: 'bbk' } + const { promises, func } = mockPromiseFunction() + + const decoratedFunction = reuseOngoingRequest(func) + + const resultPromises = [ + decoratedFunction(...expectedParams) + ] + + expect(func).toBeCalledTimes(1) + expect(promises.length).toBe(1) + + promises[0].resolve(expectedResult1) // closing ongoing request + + const results = await Promise.all(resultPromises) + + resultPromises.push(decoratedFunction(...expectedParams)) + + expect(func).toBeCalledTimes(2) + expect(promises.length).toBe(2) + + promises[1].resolve(expectedResult2) // closing ongoing request + + results.push(await resultPromises[1]) + + expect(results).toEqual([expectedResult1, expectedResult2]) + }) + + it('should not share rejected requests with same params', async () => { + const expectedParams = ['a', 1, [3]] + const expectedResult1 = new Error('Ops, I did it again!') + const expectedResult2 = { k: 'bbk' } + const { promises, func } = mockPromiseFunction() + + const decoratedFunction = reuseOngoingRequest(func) + + const resultPromises = [ + decoratedFunction(...expectedParams) + ] + + expect(func).toBeCalledTimes(1) + expect(promises.length).toBe(1) + + promises[0].reject(expectedResult1) // closing ongoing request + + const results = await Promise.all( + resultPromises.map(promise => promise.catch(error => error)) + ) + + resultPromises.push(decoratedFunction(...expectedParams)) + + expect(func).toBeCalledTimes(2) + expect(promises.length).toBe(2) + + promises[1].resolve(expectedResult2) // closing ongoing request + + results.push(await resultPromises[1]) + + expect(results).toEqual([expectedResult1, expectedResult2]) + }) + + function mockPromiseFunction () { + const promises = [] + const func = jest.fn(() => new Promise((resolve, reject) => { + promises.push({ resolve, reject }) + })) + return { promises, func } + } + }) +})