From 63ee25d1d3517c6203cd8ec5a6dd2fc215275cb9 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 16 Feb 2022 14:49:08 +0100 Subject: [PATCH 1/4] Introduce `Driver.getServerInfo` method This is part of the drivers unification initiative. The `Driver.verifyConnectivity` method was implemented in different ways accross the drivers with different returns value. For concistency and clarity about what the method does, the new behaviour of the `Driver.verifyConnectivy` is check the connectivity with all the readers available and then returns a resolve empty promise in case of success (the promise will be rejected, otherwise). The Server Info information was moved to the method `Driver.getSeverInfo` which verifies the connectivity and then returnt the `ServerInfo` of one of the read servers. For keeping the backwards compatibility, the `Driver.verifyConnectivity` was depreacted in this version and it should be replace with a version with `Promise` as return value in the next major release. `Driver.getServerInfo` should be used instead if the client code needs server information. --- .../connection-provider-direct.js | 4 ++++ .../connection-provider-pooled.js | 16 +++++++++++++++- .../connection-provider-routing.js | 19 ++++++++++++++++++- packages/bolt-connection/src/pool/pool.js | 2 +- packages/core/src/connection-provider.ts | 15 +++++++++++++++ packages/core/src/driver.ts | 18 ++++++++++++++++-- .../neo4j-driver-lite/test/unit/index.test.ts | 3 ++- .../testkit-backend/src/request-handlers.js | 13 +++++++++++++ 8 files changed, 84 insertions(+), 6 deletions(-) diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-direct.js b/packages/bolt-connection/src/connection-provider/connection-provider-direct.js index 1fa74947a..96a4c7295 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-direct.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-direct.js @@ -103,4 +103,8 @@ export default class DirectConnectionProvider extends PooledConnectionProvider { version => version >= BOLT_PROTOCOL_V4_4 ) } + + async verifyConnectivityAndGetServerInfo () { + return await this._verifyConnectivityAndGetServerVersion({ address: this._address }) + } } diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js index 8870566b0..e40e5861d 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js @@ -19,7 +19,7 @@ import { createChannelConnection, ConnectionErrorHandler } from '../connection' import Pool, { PoolConfig } from '../pool' -import { error, ConnectionProvider } from 'neo4j-driver-core' +import { error, ConnectionProvider, ServerInfo } from 'neo4j-driver-core' const { SERVICE_UNAVAILABLE } = error export default class PooledConnectionProvider extends ConnectionProvider { @@ -109,6 +109,20 @@ export default class PooledConnectionProvider extends ConnectionProvider { return conn.close() } + /** + * Acquire a connection from the pool and return it ServerInfo + * @param {object} param + * @param {string} param.address the server address + * @return {Promise} the server info + */ + async _verifyConnectivityAndGetServerVersion ({ address }) { + const connection = await this._connectionPool.acquire(address) + const serverInfo = new ServerInfo(connection.server, connection.protocol().version) + await connection.resetAndFlush().catch(() => {}) + await connection._release() + return serverInfo + } + async close () { // purge all idle connections in the connection pool await this._connectionPool.close() diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js index 97e25f79a..75b7ee77a 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-routing.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-routing.js @@ -237,7 +237,24 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider return await this._hasProtocolVersion( version => version >= BOLT_PROTOCOL_V4_4 ) - } + } + + async verifyConnectivityAndGetServerInfo ({ database, accessMode }) { + const context = { database: database || DEFAULT_DB_NAME } + + const routingTable = await this._freshRoutingTable({ + accessMode, + database: context.database, + onDatabaseNameResolved: (databaseName) => { + context.database = context.database || databaseName + } + }) + + const servers = accessMode === WRITE ? routingTable.writers : routingTable.readers + + return Promise.all(servers.map(address => this._verifyConnectivityAndGetServerVersion({ address }))) + .then(([serverInfo]) => serverInfo) + } forget (address, database) { this._routingTableRegistry.apply(database, { diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index 14b38400a..b5d01bdb7 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -71,7 +71,7 @@ class Pool { /** * Acquire and idle resource fom the pool or create a new one. * @param {ServerAddress} address the address for which we're acquiring. - * @return {Object} resource that is ready to use. + * @return {Promise} resource that is ready to use. */ acquire (address) { const key = address.asKey() diff --git a/packages/core/src/connection-provider.ts b/packages/core/src/connection-provider.ts index 27c8c4786..5398018da 100644 --- a/packages/core/src/connection-provider.ts +++ b/packages/core/src/connection-provider.ts @@ -19,6 +19,7 @@ import Connection from './connection' import { bookmarks } from './internal' +import { ServerInfo } from './result-summary' /** @@ -83,6 +84,20 @@ class ConnectionProvider { throw Error('Not implemented') } + /** + * This method verifies the connectivity of the database by trying to acquire a connection + * for each server available in the cluster. + * + * @param {object} param - object parameter + * @property {string} param.database - the target database for the to-be-acquired connection + * @property {string} param.accessMode - the access mode for the to-be-acquired connection + * + * @returns {Promise} promise resolved with server info or rejected with error. + */ + verifyConnectivityAndGetServerInfo(param? :{ database?: string, accessMode?: string }): Promise { + throw Error('Not implemented') + } + /** * Closes this connection provider along with its internals (connections, pools, etc.) * diff --git a/packages/core/src/driver.ts b/packages/core/src/driver.ts index fc275630d..feb077ec3 100644 --- a/packages/core/src/driver.ts +++ b/packages/core/src/driver.ts @@ -157,6 +157,9 @@ class Driver { /** * Verifies connectivity of this driver by trying to open a connection with the provided driver options. * + * @deprecated This return of this method will change in 6.0.0 to not return the {@link Promise} and return a + * {@link Promise} instead. If you need to use the server info, use {@link getServerInfo} instead. + * * @public * @param {Object} param - The object parameter * @param {string} param.database - The target database to verify connectivity for. @@ -166,8 +169,19 @@ class Driver { ServerInfo > { const connectionProvider = this._getOrCreateConnectionProvider() - const connectivityVerifier = new ConnectivityVerifier(connectionProvider) - return connectivityVerifier.verify({ database }) + return connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode: READ }) + } + + /** + * Get ServerInfo for the giver database. + * + * @param {Object} param - The object parameter + * @param {string} param.database - The target database to verify connectivity for. + * @returns {Promise} promise resolved with void or rejected with error. + */ + getServerInfo({ database = ''}: { database?: string } = {}): Promise { + const connectionProvider = this._getOrCreateConnectionProvider() + return connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode: READ }) } /** diff --git a/packages/neo4j-driver-lite/test/unit/index.test.ts b/packages/neo4j-driver-lite/test/unit/index.test.ts index 2d6eb2b67..8f25ed7fe 100644 --- a/packages/neo4j-driver-lite/test/unit/index.test.ts +++ b/packages/neo4j-driver-lite/test/unit/index.test.ts @@ -221,7 +221,8 @@ describe('index', () => { close: () => Promise.resolve(), supportsMultiDb: () => Promise.resolve(true), supportsTransactionConfig: () => Promise.resolve(true), - supportsUserImpersonation: () => Promise.resolve(true) + supportsUserImpersonation: () => Promise.resolve(true), + verifyConnectivityAndGetServerInfo: () => Promise.resolve(new ServerInfo({})) } }) expect(session).toBeDefined() diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index a09efd98b..2611312d7 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -381,6 +381,8 @@ export function GetFeatures (_context, _params, wire) { 'Feature:Bolt:4.2', 'Feature:Bolt:4.3', 'Feature:Bolt:4.4', + 'Feature:API:Driver:GetServerInfo', + 'Feature:API:Driver.VerifyConnectivity', 'Feature:API:Result.List', 'Feature:API:Result.Peek', 'Feature:Configuration:ConnectionAcquisitionTimeout', @@ -410,6 +412,17 @@ export function VerifyConnectivity (context, { driverId }, wire) { .catch(error => wire.writeError(error)) } +export function GetServerInfo (context, { driverId }, wire) { + const driver = context.getDriver(driverId) + return driver + .getServerInfo() + .then(info => wire.writeResponse('ServerInfo', { + ...info, + protocolVersion: info.protocolVersion.toFixed(1) + })) + .catch(error => wire.writeError(error)) +} + export function CheckMultiDBSupport (context, { driverId }, wire) { const driver = context.getDriver(driverId) return driver From 9302858be483980cee376af4f75721e06ba23f2a Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 16 Feb 2022 16:02:50 +0100 Subject: [PATCH 2/4] Remove connection-verifier and tests for the driver.ts methods --- packages/core/src/driver.ts | 5 +- .../src/internal/connectivity-verifier.ts | 86 ------------------- packages/core/src/internal/index.ts | 2 - packages/core/test/driver.test.ts | 46 +++++++++- .../internal/connectivity-verifier.test.js | 38 -------- 5 files changed, 46 insertions(+), 131 deletions(-) delete mode 100644 packages/core/src/internal/connectivity-verifier.ts delete mode 100644 packages/neo4j-driver/test/internal/connectivity-verifier.test.js diff --git a/packages/core/src/driver.ts b/packages/core/src/driver.ts index feb077ec3..257181d57 100644 --- a/packages/core/src/driver.ts +++ b/packages/core/src/driver.ts @@ -19,7 +19,6 @@ import ConnectionProvider from './connection-provider' import { Bookmarks } from './internal/bookmarks' -import { ConnectivityVerifier } from './internal/connectivity-verifier' import ConfiguredCustomResolver from './internal/resolver/configured-custom-resolver' import { @@ -157,8 +156,8 @@ class Driver { /** * Verifies connectivity of this driver by trying to open a connection with the provided driver options. * - * @deprecated This return of this method will change in 6.0.0 to not return the {@link Promise} and return a - * {@link Promise} instead. If you need to use the server info, use {@link getServerInfo} instead. + * @deprecated This return of this method will change in 6.0.0 to not async return the {@link ServerInfo} and + * async return {@link void} instead. If you need to use the server info, use {@link getServerInfo} instead. * * @public * @param {Object} param - The object parameter diff --git a/packages/core/src/internal/connectivity-verifier.ts b/packages/core/src/internal/connectivity-verifier.ts deleted file mode 100644 index 565cd42d7..000000000 --- a/packages/core/src/internal/connectivity-verifier.ts +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { ConnectionHolder } from './connection-holder' -import ConnectionProvider from '../connection-provider' -import { ACCESS_MODE_READ } from './constants' -import { newError } from '../error' -import { ServerInfo } from '../result-summary' - -/** - * Verifies connectivity using the given connection provider. - */ -export class ConnectivityVerifier { - private readonly _connectionProvider: ConnectionProvider - - /** - * @constructor - * @param {ConnectionProvider} connectionProvider the provider to obtain connections from. - */ - constructor(connectionProvider: ConnectionProvider) { - this._connectionProvider = connectionProvider - } - - /** - * Try to obtain a working connection from the connection provider. - * @returns {Promise} promise resolved with server info or rejected with error. - */ - verify({ database = '' } = {}): Promise { - return acquireAndReleaseDummyConnection(this._connectionProvider, database) - } -} - -/** - * @private - * @param {ConnectionProvider} connectionProvider the provider to obtain connections from. - * @param {string|undefined} database The database name - * @return {Promise} promise resolved with server info or rejected with error. - */ -function acquireAndReleaseDummyConnection( - connectionProvider: ConnectionProvider, - database?: string -): Promise { - const connectionHolder = new ConnectionHolder({ - mode: ACCESS_MODE_READ, - database, - connectionProvider - }) - connectionHolder.initializeConnection() - - return connectionHolder - .getConnection() - .then(connection => { - // able to establish a connection - if (!connection) { - throw newError('Unexpected error acquiring transaction') - } - return connectionHolder.close().then(() => connection.server) - }) - .catch(error => { - // failed to establish a connection - return connectionHolder - .close() - .catch(ignoredError => { - // ignore connection release error - }) - .then(() => { - return Promise.reject(error) - }) - }) -} diff --git a/packages/core/src/internal/index.ts b/packages/core/src/internal/index.ts index 8959e027e..fc0762681 100644 --- a/packages/core/src/internal/index.ts +++ b/packages/core/src/internal/index.ts @@ -25,7 +25,6 @@ import * as constants from './constants' import * as connectionHolder from './connection-holder' import * as txConfig from './tx-config' import * as transactionExecutor from './transaction-executor' -import * as connectivityVerifier from './connectivity-verifier' import * as logger from './logger' import * as urlUtil from './url-util' import * as serverAddress from './server-address' @@ -41,7 +40,6 @@ export { connectionHolder, txConfig, transactionExecutor, - connectivityVerifier, logger, urlUtil, serverAddress, diff --git a/packages/core/test/driver.test.ts b/packages/core/test/driver.test.ts index 9b379f568..ef956eff4 100644 --- a/packages/core/test/driver.test.ts +++ b/packages/core/test/driver.test.ts @@ -16,8 +16,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { ConnectionProvider, newError, Session } from '../src' -import Driver from '../src/driver' +import { ConnectionProvider, newError, ServerInfo, Session } from '../src' +import Driver, { READ } from '../src/driver' import { Bookmarks } from '../src/internal/bookmarks' import { Logger } from '../src/internal/logger' import { ConfiguredCustomResolver } from '../src/internal/resolver' @@ -194,6 +194,48 @@ describe('Driver', () => { await driver.close() }) + it.each([ + [undefined, 'Promise.resolve(ServerInfo>)', Promise.resolve(new ServerInfo())], + [undefined, 'Promise.reject(Error)', Promise.reject(newError('something went wrong'))], + [{}, 'Promise.resolve(ServerInfo>)', Promise.resolve(new ServerInfo())], + [{}, 'Promise.reject(Error)', Promise.reject(newError('something went wrong'))], + [{ database: undefined }, 'Promise.resolve(ServerInfo>)', Promise.resolve(new ServerInfo())], + [{ database: undefined }, 'Promise.reject(Error)', Promise.reject(newError('something went wrong'))], + [{ database: 'db' }, 'Promise.resolve(ServerInfo>)', Promise.resolve(new ServerInfo())], + [{ database: 'db' }, 'Promise.reject(Error)', Promise.reject(newError('something went wrong'))], + ])('.verifyConnectivity(%o) => %s', (input: { database?: string } | undefined, _, expectedPromise) => { + connectionProvider.verifyConnectivityAndGetServerInfo = jest.fn(() => expectedPromise) + + const promise: Promise = driver!.verifyConnectivity(input) + + expect(promise).toBe(expectedPromise) + expect(connectionProvider.verifyConnectivityAndGetServerInfo) + .toBeCalledWith({ database: input && input.database ? input.database : '', accessMode: READ }) + + promise.catch(_ => 'Do nothing').finally(() => { }) + }) + + it.each([ + [undefined, 'Promise.resolve(ServerInfo>)', Promise.resolve(new ServerInfo())], + [undefined, 'Promise.reject(Error)', Promise.reject(newError('something went wrong'))], + [{}, 'Promise.resolve(ServerInfo>)', Promise.resolve(new ServerInfo())], + [{}, 'Promise.reject(Error)', Promise.reject(newError('something went wrong'))], + [{ database: undefined }, 'Promise.resolve(ServerInfo>)', Promise.resolve(new ServerInfo())], + [{ database: undefined }, 'Promise.reject(Error)', Promise.reject(newError('something went wrong'))], + [{ database: 'db' }, 'Promise.resolve(ServerInfo>)', Promise.resolve(new ServerInfo())], + [{ database: 'db' }, 'Promise.reject(Error)', Promise.reject(newError('something went wrong'))], + ])('.getServerInfo(%o) => %s', (input: { database?: string } | undefined, _, expectedPromise) => { + connectionProvider.verifyConnectivityAndGetServerInfo = jest.fn(() => expectedPromise) + + const promise: Promise = driver!.getServerInfo(input) + + expect(promise).toBe(expectedPromise) + expect(connectionProvider.verifyConnectivityAndGetServerInfo) + .toBeCalledWith({ database: input && input.database ? input.database : '', accessMode: READ }) + + promise.catch(_ => 'Do nothing').finally(() => { }) + }) + function mockCreateConnectonProvider(connectionProvider: ConnectionProvider) { return ( id: number, diff --git a/packages/neo4j-driver/test/internal/connectivity-verifier.test.js b/packages/neo4j-driver/test/internal/connectivity-verifier.test.js deleted file mode 100644 index f531daf7a..000000000 --- a/packages/neo4j-driver/test/internal/connectivity-verifier.test.js +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import SingleConnectionProvider from '../../../bolt-connection/lib/connection-provider/connection-provider-single' -import FakeConnection from './fake-connection' -import { internal } from 'neo4j-driver-core' - -const { - connectivityVerifier: { ConnectivityVerifier } -} = internal - -describe('#unit ConnectivityVerifier', () => { - it('should call success callback when able to acquire and release a connection', done => { - const connectionPromise = Promise.resolve(new FakeConnection()) - const connectionProvider = new SingleConnectionProvider(connectionPromise) - const verifier = new ConnectivityVerifier(connectionProvider) - - verifier.verify().then(() => { - done() - }) - }) -}) From 1c3667f532ab9f12d86537d317ec66e741b55080 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 16 Feb 2022 17:06:21 +0100 Subject: [PATCH 3/4] Add tests for `DirectConnectionProvider.verifyConnectivityAndGetServerInfo` --- .../connection-provider-pooled.js | 7 +- .../connection-provider-direct.test.js | 198 +++++++++++++++++- 2 files changed, 197 insertions(+), 8 deletions(-) diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js index e40e5861d..6071f89c1 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js @@ -118,8 +118,11 @@ export default class PooledConnectionProvider extends ConnectionProvider { async _verifyConnectivityAndGetServerVersion ({ address }) { const connection = await this._connectionPool.acquire(address) const serverInfo = new ServerInfo(connection.server, connection.protocol().version) - await connection.resetAndFlush().catch(() => {}) - await connection._release() + try { + await connection.resetAndFlush() + } finally { + await connection._release() + } return serverInfo } diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js index f2b33546a..ef0ba5c4a 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js @@ -20,7 +20,7 @@ import DirectConnectionProvider from '../../src/connection-provider/connection-provider-direct' import { Pool } from '../../src/pool' import { Connection, DelegateConnection } from '../../src/connection' -import { internal, newError } from 'neo4j-driver-core' +import { internal, newError, ServerInfo } from 'neo4j-driver-core' const { serverAddress: { ServerAddress }, @@ -139,6 +139,180 @@ it('should not change error when TokenExpired happens', async () => { expect(error).toBe(expectedError) }) +describe('.verifyConnectivityAndGetServerInfo()', () => { + describe('when connection is available in the pool', () => { + it('should return the server info', async () => { + const { connectionProvider, server, protocolVersion } = setup() + + const serverInfo = await connectionProvider.verifyConnectivityAndGetServerInfo() + + expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion)) + }) + + it('should reset and flush the connection', async () => { + const { connectionProvider, resetAndFlush } = setup() + + await connectionProvider.verifyConnectivityAndGetServerInfo() + + expect(resetAndFlush).toBeCalledTimes(1) + }) + + it('should release the connection', async () => { + const { connectionProvider, seenConnections } = setup() + + await connectionProvider.verifyConnectivityAndGetServerInfo() + + expect(seenConnections[0]._release).toHaveBeenCalledTimes(1) + }) + + it('should resetAndFlush and then release the connection', async () => { + const { connectionProvider, seenConnections, resetAndFlush } = setup() + + await connectionProvider.verifyConnectivityAndGetServerInfo() + + expect(seenConnections[0]._release.mock.invocationCallOrder[0]) + .toBeGreaterThan(resetAndFlush.mock.invocationCallOrder[0]) + }) + + describe('when reset and flush fails', () => { + it('should fails with the reset and flush error', async () => { + const error = newError('Error') + const { connectionProvider, resetAndFlush } = setup() + + resetAndFlush.mockRejectedValue(error) + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo() + expect().toBe('Not reached') + } catch (e) { + expect(e).toBe(error) + } + }) + + it('should release the connection', async () => { + const error = newError('Error') + const { connectionProvider, resetAndFlush, seenConnections } = setup() + + resetAndFlush.mockRejectedValue(error) + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo() + } catch (e) { + } finally { + expect(seenConnections[0]._release).toHaveBeenCalledTimes(1) + } + }) + + describe('and release fails', () => { + it('should fails with the release error', async () => { + const error = newError('Error') + const releaseError = newError('release errror') + + const { connectionProvider, resetAndFlush } = setup( + { + releaseMock: () => Promise.reject(releaseError) + }) + + resetAndFlush.mockRejectedValue(error) + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo() + expect().toBe('Not reached') + } catch (e) { + expect(e).toBe(releaseError) + } + }) + }) + }) + + describe('when release fails', () => { + it('should fails with the release error', async () => { + const error = newError('Error') + + const { connectionProvider } = setup( + { + releaseMock: () => Promise.reject(error) + }) + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo() + expect().toBe('Not reached') + } catch (e) { + expect(e).toBe(error) + } + }) + }) + + function setup({ releaseMock } = {}) { + const protocolVersion = 4.4 + const resetAndFlush = jest.fn(() => Promise.resolve()) + const server = { address: 'localhost:123', version: 'neo4j/1234' } + const seenConnections = [] + const create = (address, release) => { + const connection = new FakeConnection(address, release, server) + connection.protocol = () => { + return { version: protocolVersion } + } + connection.resetAndFlush = resetAndFlush + if (releaseMock) { + connection._release = releaseMock + } + seenConnections.push(connection) + return connection + } + const address = ServerAddress.fromUrl('localhost:123') + const pool = newPool({ create }) + const connectionProvider = newDirectConnectionProvider(address, pool) + return { + connectionProvider, + server, + protocolVersion, + resetAndFlush, + seenConnections + } + } + }) + + describe('when connection is not available in the pool', () => { + it('should reject with acquisition timeout error', async () => { + const address = ServerAddress.fromUrl('localhost:123') + const pool = newPool({ + config: { + acquisitionTimeout: 0, + } + }) + + const connectionProvider = newDirectConnectionProvider(address, pool) + + try { + connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo() + expect().toBe('not reached') + } catch (e) { + expect(e).toBeDefined() + } + }) + }) + + describe('when connection it could not create the connection', () => { + it('should reject with connection creation error', async () => { + const error = new Error('Connection creation error') + const address = ServerAddress.fromUrl('localhost:123') + const pool = newPool({ + create: () => { throw error } + }) + + const connectionProvider = newDirectConnectionProvider(address, pool) + + try { + connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo() + expect().toBe('not reached') + } catch (e) { + expect(e).toBe(error) + } + }) + }) +}) + function newDirectConnectionProvider (address, pool) { const connectionProvider = new DirectConnectionProvider({ id: 0, @@ -150,22 +324,34 @@ function newDirectConnectionProvider (address, pool) { return connectionProvider } -function newPool () { +function newPool({ create, config } = {}) { + const _create = (address, release) => { + if (create) { + return create(address, release) + } + return new FakeConnection(address, release) + } return new Pool({ + config, create: (address, release) => - Promise.resolve(new FakeConnection(address, release)) + Promise.resolve(_create(address, release)), }) } class FakeConnection extends Connection { - constructor (address, release) { + constructor(address, release, server) { super(null) this._address = address - this.release = release + this._release = jest.fn(() => release(address, this)) + this._server = server } - get address () { + get address() { return this._address } + + get server() { + return this._server + } } From 0123eebda1cb57eba2896570a983e31a6349b6ed Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 16 Feb 2022 19:11:17 +0100 Subject: [PATCH 4/4] Add tests for `RoutingConnectionProvider.verifyConnectivityAndGetServerInfo` --- .../connection-provider-routing.test.js | 256 +++++++++++++++++- 1 file changed, 251 insertions(+), 5 deletions(-) diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index c969bee71..6089ffbad 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -23,7 +23,8 @@ import { error, Integer, int, - internal + internal, + ServerInfo } from 'neo4j-driver-core' import { RoutingTable } from '../../src/rediscovery/' import { Pool } from '../../src/pool' @@ -2474,6 +2475,234 @@ describe('#unit RoutingConnectionProvider', () => { }) }) + + describe.each([ + [undefined, READ], + [undefined, WRITE], + ['', READ], + ['', WRITE], + ['databaseA', READ], + ['databaseA', WRITE], + ])('.verifyConnectivityAndGetServeInfo({ database: %s, accessMode: %s })', (database, accessMode) => { + describe('when connection is available in the pool', () => { + it('should return the server info', async () => { + const { connectionProvider, server, protocolVersion } = setup() + + const serverInfo = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + + expect(serverInfo).toEqual(new ServerInfo(server, protocolVersion)) + }) + + it('should acquire, resetAndFlush and release connections for sever with the selected access mode', async () => { + const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup() + const acquireSpy = jest.spyOn(pool, 'acquire') + + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + + const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers + for (const address of targetServers) { + expect(acquireSpy).toHaveBeenCalledWith(address) + + const connections = seenConnectionsPerAddress.get(address) + + expect(connections.length).toBe(1) + expect(connections[0].resetAndFlush).toHaveBeenCalled() + expect(connections[0]._release).toHaveBeenCalled() + expect(connections[0]._release.mock.invocationCallOrder[0]) + .toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0]) + } + }) + + it('should not acquire, resetAndFlush and release connections for sever with the other access mode', async () => { + const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup() + const acquireSpy = jest.spyOn(pool, 'acquire') + + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + + const targetServers = accessMode === WRITE ? routingTable.readers : routingTable.writers + for (const address of targetServers) { + expect(acquireSpy).not.toHaveBeenCalledWith(address) + expect(seenConnectionsPerAddress.get(address)).toBeUndefined() + } + }) + + describe('when the reset and flush fails for at least one the address', () => { + it('should fails with the reset and flush error', async () => { + const error = newError('Error') + let i = 0 + const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve()) + const { connectionProvider } = setup({ resetAndFlush }) + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + expect().toBe('Not reached') + } catch (e) { + expect(e).toBe(error) + } + }) + + it('should release the connection', async () => { + const error = newError('Error') + let i = 0 + const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve()) + const { connectionProvider, seenConnectionsPerAddress, routingTable } = setup({ resetAndFlush }) + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + } catch (e) { + } finally { + const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers + for (const address of targetServers) { + const connections = seenConnectionsPerAddress.get(address) + + expect(connections.length).toBe(1) + expect(connections[0].resetAndFlush).toHaveBeenCalled() + expect(connections[0]._release).toHaveBeenCalled() + } + } + }) + + describe('and the release fails', () => { + it('should fails with the release error', async () => { + const error = newError('Error') + const releaseError = newError('Release error') + let i = 0 + const resetAndFlush = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve()) + const releaseMock = jest.fn(() => Promise.reject(releaseError)) + const { connectionProvider } = setup({ resetAndFlush, releaseMock }) + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + expect().toBe('Not reached') + } catch (e) { + expect(e).toBe(releaseError) + } + }) + }) + + }) + + describe('when the release for at least one the address', () => { + it('should fails with the reset and flush error', async () => { + const error = newError('Error') + let i = 0 + const releaseMock = jest.fn(() => i++ % 2 == 0 ? Promise.reject(error) : Promise.resolve()) + const { connectionProvider } = setup({ releaseMock }) + + try { + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + expect().toBe('Not reached') + } catch (e) { + expect(e).toBe(error) + } + }) + }) + + function setup({ resetAndFlush, releaseMock } = {}) { + const routingTable = newRoutingTable( + database || null, + [server1, server2], + [server3, server4], + [server5, server6] + ) + const protocolVersion = 4.4 + const server = { address: 'localhost:123', version: 'neo4j/1234' } + + const seenConnectionsPerAddress = new Map() + + const pool = newPool({ + create: (address, release) => { + if (!seenConnectionsPerAddress.has(address)) { + seenConnectionsPerAddress.set(address, []) + } + const connection = new FakeConnection(address, release, 'version', protocolVersion, server) + if (resetAndFlush) { + connection.resetAndFlush = resetAndFlush + } + if (releaseMock) { + connection._release = releaseMock + } + seenConnectionsPerAddress.get(address).push(connection) + return connection + } + }) + const connectionProvider = newRoutingConnectionProvider( + [ + routingTable + ], + pool + ) + return { connectionProvider, routingTable, seenConnectionsPerAddress, server, protocolVersion, pool } + } + }) + + describe('when at least the one of the servers is not available', () => { + it('should reject with acquistion timeout error', async () => { + const routingTable = newRoutingTable( + database || null, + [server1, server2], + [server3, server4], + [server5, server6] + ) + + const pool = newPool({ + config: { + acquisitionTimeout: 0, + } + }) + + const connectionProvider = newRoutingConnectionProvider( + [ + routingTable + ], + pool + ) + + try { + connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + expect().toBe('not reached') + } catch (e) { + expect(e).toBeDefined() + } + }) + }) + + describe('when at least the one of the connections could not be created', () => { + it('should reject with acquistion timeout error', async () => { + let i = 0 + const error = new Error('Connection creation error') + const routingTable = newRoutingTable( + database || null, + [server1, server2], + [server3, server4], + [server5, server6] + ) + + const pool = newPool({ + create: (address, release) => { + if (i++ % 2 === 0) { + return new FakeConnection(address, release, 'version', 4.4, {}) + } + throw error + } + }) + + const connectionProvider = newRoutingConnectionProvider( + [ + routingTable + ], + pool + ) + + try { + connectionProvider = await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + expect().toBe('not reached') + } catch (e) { + expect(e).toBe(error) + } + }) + }) + }) }) function newRoutingConnectionProvider ( @@ -2567,10 +2796,20 @@ function setupRoutingConnectionProviderToRememberRouters ( connectionProvider._fetchRoutingTable = rememberingFetch } -function newPool () { +function newPool ({ create, config } = {}) { + const _create = (address, release) => { + if (create) { + try { + return Promise.resolve(create(address, release)) + } catch (e) { + return Promise.reject(e) + } + } + return Promise.resolve(new FakeConnection(address, release, 'version', 4.0)) + } return new Pool({ - create: (address, release) => - Promise.resolve(new FakeConnection(address, release, 'version', 4.0)) + config, + create: (address, release) => _create(address, release), }) } @@ -2605,13 +2844,16 @@ function expectPoolToNotContain (pool, addresses) { } class FakeConnection extends Connection { - constructor (address, release, version, protocolVersion) { + constructor (address, release, version, protocolVersion, server) { super(null) this._address = address this._version = version || VERSION_IN_DEV.toString() this._protocolVersion = protocolVersion this.release = release + this._release = jest.fn(() => release(address, this)) + this.resetAndFlush = jest.fn(() => Promise.resolve()) + this._server = server } get address () { @@ -2622,6 +2864,10 @@ class FakeConnection extends Connection { return this._version } + get server () { + return this._server + } + protocol () { return { version: this._protocolVersion