diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js index b973269fc..ed653ebaa 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js @@ -28,6 +28,7 @@ import { ConnectionErrorHandler, DelegateConnection } from '../connection' +import { functional } from '../lang' const { SERVICE_UNAVAILABLE, SESSION_EXPIRED } = error const { @@ -97,6 +98,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider ? int(routingTablePurgeDelay) : DEFAULT_ROUTING_TABLE_PURGE_DELAY ) + + this._refreshRoutingTable = functional.reuseOngoingRequest(this._refreshRoutingTable, this) } _createConnectionErrorHandler () { @@ -357,10 +360,14 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._log.info( `Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}` ) - return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, auth) + return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, auth) + .then(newRoutingTable => { + onDatabaseNameResolved(newRoutingTable.database) + return newRoutingTable + }) } - _refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, auth) { + _refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, auth) { const knownRouters = currentRoutingTable.routers if (this._useSeedRouter) { @@ -369,7 +376,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider currentRoutingTable, bookmarks, impersonatedUser, - onDatabaseNameResolved, auth ) } @@ -378,7 +384,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider currentRoutingTable, bookmarks, impersonatedUser, - onDatabaseNameResolved, auth ) } @@ -388,7 +393,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider currentRoutingTable, bookmarks, impersonatedUser, - onDatabaseNameResolved, auth ) { // we start with seed router, no routers were probed before @@ -420,7 +424,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return await this._applyRoutingTableIfPossible( currentRoutingTable, newRoutingTable, - onDatabaseNameResolved, error ) } @@ -430,7 +433,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider currentRoutingTable, bookmarks, impersonatedUser, - onDatabaseNameResolved, auth ) { let [newRoutingTable, error] = await this._fetchRoutingTableUsingKnownRouters( @@ -456,7 +458,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return await this._applyRoutingTableIfPossible( currentRoutingTable, newRoutingTable, - onDatabaseNameResolved, error ) } @@ -630,7 +631,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return [null, error] } - async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable, onDatabaseNameResolved, error) { + async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable, error) { if (!newRoutingTable) { // none of routing servers returned valid routing table, throw exception throw newError( @@ -646,12 +647,12 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._useSeedRouter = true } - await this._updateRoutingTable(newRoutingTable, onDatabaseNameResolved) + await this._updateRoutingTable(newRoutingTable) return newRoutingTable } - async _updateRoutingTable (newRoutingTable, onDatabaseNameResolved) { + async _updateRoutingTable (newRoutingTable) { // close old connections to servers not present in the new routing table await this._connectionPool.keepAll(newRoutingTable.allServers()) this._routingTableRegistry.removeExpired() @@ -659,8 +660,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider newRoutingTable ) - onDatabaseNameResolved(newRoutingTable.database) - this._log.info(`Updated routing table ${newRoutingTable}`) } diff --git a/packages/bolt-connection/src/lang/functional.js b/packages/bolt-connection/src/lang/functional.js index 24aa87184..b5194299a 100644 --- a/packages/bolt-connection/src/lang/functional.js +++ b/packages/bolt-connection/src/lang/functional.js @@ -17,6 +17,8 @@ * limitations under the License. */ +import { json } from 'neo4j-driver-core' + /** * Identity function. * @@ -28,3 +30,29 @@ export function identity (x) { return x } + +/** + * Makes the function able to share ongoing requests + * + * @param {function(...args): Promise} func The function to be decorated + * @param {any} thisArg The `this` which should be used in the function call + * @return {function(...args): Promise} The decorated function + */ +export function reuseOngoingRequest (func, thisArg = null) { + const ongoingRequests = new Map() + + return function (...args) { + const key = json.stringify(args) + if (ongoingRequests.has(key)) { + return ongoingRequests.get(key) + } + + const promise = func.apply(thisArg, args) + + ongoingRequests.set(key, promise) + + return promise.finally(() => { + ongoingRequests.delete(key) + }) + } +} diff --git a/packages/bolt-connection/test/lang/functional.test.js b/packages/bolt-connection/test/lang/functional.test.js new file mode 100644 index 000000000..a1fc76532 --- /dev/null +++ b/packages/bolt-connection/test/lang/functional.test.js @@ -0,0 +1,170 @@ +import { reuseOngoingRequest } from '../../src/lang/functional.js' + +describe('functional', () => { + describe('reuseOnGoingRequest', () => { + it('should call supplied function with the params', async () => { + const expectedParams = ['a', 1, { a: 'a' }] + const func = jest.fn(() => Promise.resolve()) + + const decoratedFunction = reuseOngoingRequest(func) + await decoratedFunction(...expectedParams) + + expect(func).toHaveBeenCalledWith(...expectedParams) + }) + + it('should call supplied function with this', async () => { + const expectedParams = ['a', 1, { a: 'a' }] + const thisArg = { t: 'his' } + const func = jest.fn(async function () { + return this + }) + + const decoratedFunction = reuseOngoingRequest(func, thisArg) + const receivedThis = await decoratedFunction(...expectedParams) + + expect(receivedThis).toBe(thisArg) + }) + + it('should return values returned by the supplied function', async () => { + const expectedResult = { a: 'abc' } + const func = jest.fn(() => Promise.resolve(expectedResult)) + + const decoratedFunction = reuseOngoingRequest(func) + const result = await decoratedFunction() + + expect(result).toBe(expectedResult) + }) + + it('should throw value thrown by supplied function', async () => { + const error = new Error('Oops, I did it again!') + const func = jest.fn(() => Promise.reject(error)) + + const decoratedFunction = reuseOngoingRequest(func) + const promise = decoratedFunction() + expect(promise).rejects.toThrow(error) + }) + + it('should share ongoing request with same params', async () => { + const expectedParams = ['a', 1, [3]] + const expectedResult = { a: 'abc' } + const { promises, func } = mockPromiseFunction() + + const decoratedFunction = reuseOngoingRequest(func) + + const resultPromises = [ + decoratedFunction(...expectedParams), + decoratedFunction(...expectedParams), + decoratedFunction(...expectedParams) + ] + + expect(func).toBeCalledTimes(1) + expect(promises.length).toBe(1) + + promises[0].resolve(expectedResult) // closing ongoing request + + const results = await Promise.all(resultPromises) + + expect(results).toEqual([expectedResult, expectedResult, expectedResult]) + }) + + it('should not share ongoing request with different params', async () => { + const expectedParams1 = ['a', 1, [3]] + const expectedResult1 = { a: 'abc' } + const expectedParams2 = [4, 'a', []] + const expectedResult2 = { k: 'bbk' } + const { promises, func } = mockPromiseFunction() + + const decoratedFunction = reuseOngoingRequest(func) + + const resultPromises = [ + decoratedFunction(...expectedParams1), + decoratedFunction(...expectedParams2) + ] + + expect(func).toBeCalledTimes(2) + expect(func).toBeCalledWith(...expectedParams1) + expect(func).toBeCalledWith(...expectedParams2) + + expect(promises.length).toBe(2) + + promises[0].resolve(expectedResult1) // closing ongoing request 1 + promises[1].resolve(expectedResult2) // closing ongoing request 2 + + const results = await Promise.all(resultPromises) + + expect(results).toEqual([expectedResult1, expectedResult2]) + }) + + it('should not share resolved requests with same params', async () => { + const expectedParams = ['a', 1, [3]] + const expectedResult1 = { a: 'abc' } + const expectedResult2 = { k: 'bbk' } + const { promises, func } = mockPromiseFunction() + + const decoratedFunction = reuseOngoingRequest(func) + + const resultPromises = [ + decoratedFunction(...expectedParams) + ] + + expect(func).toBeCalledTimes(1) + expect(promises.length).toBe(1) + + promises[0].resolve(expectedResult1) // closing ongoing request + + const results = await Promise.all(resultPromises) + + resultPromises.push(decoratedFunction(...expectedParams)) + + expect(func).toBeCalledTimes(2) + expect(promises.length).toBe(2) + + promises[1].resolve(expectedResult2) // closing ongoing request + + results.push(await resultPromises[1]) + + expect(results).toEqual([expectedResult1, expectedResult2]) + }) + + it('should not share rejected requests with same params', async () => { + const expectedParams = ['a', 1, [3]] + const expectedResult1 = new Error('Ops, I did it again!') + const expectedResult2 = { k: 'bbk' } + const { promises, func } = mockPromiseFunction() + + const decoratedFunction = reuseOngoingRequest(func) + + const resultPromises = [ + decoratedFunction(...expectedParams) + ] + + expect(func).toBeCalledTimes(1) + expect(promises.length).toBe(1) + + promises[0].reject(expectedResult1) // closing ongoing request + + const results = await Promise.all( + resultPromises.map(promise => promise.catch(error => error)) + ) + + resultPromises.push(decoratedFunction(...expectedParams)) + + expect(func).toBeCalledTimes(2) + expect(promises.length).toBe(2) + + promises[1].resolve(expectedResult2) // closing ongoing request + + results.push(await resultPromises[1]) + + expect(results).toEqual([expectedResult1, expectedResult2]) + }) + + function mockPromiseFunction () { + const promises = [] + const func = jest.fn(() => new Promise((resolve, reject) => { + promises.push({ resolve, reject }) + })) + return { promises, func } + } + }) +}) diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-routing.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-routing.js index 804d64b49..8bd671dc5 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-routing.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-routing.js @@ -28,6 +28,7 @@ import { ConnectionErrorHandler, DelegateConnection } from '../connection/index.js' +import { functional } from '../lang/index.js' const { SERVICE_UNAVAILABLE, SESSION_EXPIRED } = error const { @@ -97,6 +98,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider ? int(routingTablePurgeDelay) : DEFAULT_ROUTING_TABLE_PURGE_DELAY ) + + this._refreshRoutingTable = functional.reuseOngoingRequest(this._refreshRoutingTable, this) } _createConnectionErrorHandler () { @@ -357,10 +360,14 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._log.info( `Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}` ) - return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, auth) + return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, auth) + .then(newRoutingTable => { + onDatabaseNameResolved(newRoutingTable.database) + return newRoutingTable + }) } - _refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, auth) { + _refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, auth) { const knownRouters = currentRoutingTable.routers if (this._useSeedRouter) { @@ -369,7 +376,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider currentRoutingTable, bookmarks, impersonatedUser, - onDatabaseNameResolved, auth ) } @@ -378,7 +384,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider currentRoutingTable, bookmarks, impersonatedUser, - onDatabaseNameResolved, auth ) } @@ -388,7 +393,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider currentRoutingTable, bookmarks, impersonatedUser, - onDatabaseNameResolved, auth ) { // we start with seed router, no routers were probed before @@ -420,7 +424,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return await this._applyRoutingTableIfPossible( currentRoutingTable, newRoutingTable, - onDatabaseNameResolved, error ) } @@ -430,7 +433,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider currentRoutingTable, bookmarks, impersonatedUser, - onDatabaseNameResolved, auth ) { let [newRoutingTable, error] = await this._fetchRoutingTableUsingKnownRouters( @@ -456,7 +458,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return await this._applyRoutingTableIfPossible( currentRoutingTable, newRoutingTable, - onDatabaseNameResolved, error ) } @@ -630,7 +631,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return [null, error] } - async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable, onDatabaseNameResolved, error) { + async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable, error) { if (!newRoutingTable) { // none of routing servers returned valid routing table, throw exception throw newError( @@ -646,12 +647,12 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._useSeedRouter = true } - await this._updateRoutingTable(newRoutingTable, onDatabaseNameResolved) + await this._updateRoutingTable(newRoutingTable) return newRoutingTable } - async _updateRoutingTable (newRoutingTable, onDatabaseNameResolved) { + async _updateRoutingTable (newRoutingTable) { // close old connections to servers not present in the new routing table await this._connectionPool.keepAll(newRoutingTable.allServers()) this._routingTableRegistry.removeExpired() @@ -659,8 +660,6 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider newRoutingTable ) - onDatabaseNameResolved(newRoutingTable.database) - this._log.info(`Updated routing table ${newRoutingTable}`) } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/lang/functional.js b/packages/neo4j-driver-deno/lib/bolt-connection/lang/functional.js index 24aa87184..ccccbf621 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/lang/functional.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/lang/functional.js @@ -17,6 +17,8 @@ * limitations under the License. */ +import { json } from '../../core/index.ts' + /** * Identity function. * @@ -28,3 +30,29 @@ export function identity (x) { return x } + +/** + * Makes the function able to share ongoing requests + * + * @param {function(...args): Promise} func The function to be decorated + * @param {any} thisArg The `this` which should be used in the function call + * @return {function(...args): Promise} The decorated function + */ +export function reuseOngoingRequest (func, thisArg = null) { + const ongoingRequests = new Map() + + return function (...args) { + const key = json.stringify(args) + if (ongoingRequests.has(key)) { + return ongoingRequests.get(key) + } + + const promise = func.apply(thisArg, args) + + ongoingRequests.set(key, promise) + + return promise.finally(() => { + ongoingRequests.delete(key) + }) + } +}