From ccb49bb552237e48d9e7c4a3188c8d3c257e773a Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 7 Sep 2023 14:22:25 +0200 Subject: [PATCH 1/9] Hide protocol object behind the Connection The connection in the driver is exposing the protocol object, which is not great. Improve this part of the code can be done by make the connection object have methods to do high level requests to the server. List of calls the core driver does using the protocol, by passing the connection: * `version` * `run` * `beginTransaction` * `commitTransaction` * `rollbackTransaction` Methods is present in the `Connection` interface (used by core in **bold**): * `id` * `database` * `server` * `authToken` * `address` * `version` * `supportsReAuth` * **`isOpen`** * **`protocol`** * `connect` * `write` * **`resetAndFlush`** * **`hasOngoingObservableRequests`** * **`_release`** So, `isOpen`, `resetAndFlush` and `hasOngoingObservableRequests` are the methods which will stay in the connection along with the new methods. The method `release` will move the a `Releasable` interface, which will be composed with `Connection` when returning the connection from the provider. The `Releasable` interface is also defined to enable the `ConnectionProvider` returns a connection which can be released back to the pool. Internally, `bolt-connection` can keep exposing the internal of the connection outside the connection in a first moment. The full encapsulation of the `protocol` should be done in the next phase of refactoring. --- packages/core/src/connection-provider.ts | 17 +- packages/core/src/connection.ts | 154 +++++++---------- .../core/src/internal/connection-holder.ts | 20 +-- packages/core/src/internal/constants.ts | 2 + packages/core/src/result.ts | 2 +- packages/core/src/session.ts | 2 +- packages/core/src/transaction.ts | 14 +- packages/core/test/result.test.ts | 30 ++-- packages/core/test/session.test.ts | 36 ++-- packages/core/test/transaction.test.ts | 30 ++-- packages/core/test/utils/connection.fake.ts | 62 ++++--- .../lib/core/connection-provider.ts | 17 +- .../neo4j-driver-deno/lib/core/connection.ts | 159 ++++++++---------- .../lib/core/internal/connection-holder.ts | 20 +-- .../lib/core/internal/constants.ts | 2 + packages/neo4j-driver-deno/lib/core/result.ts | 2 +- .../neo4j-driver-deno/lib/core/session.ts | 2 +- .../neo4j-driver-deno/lib/core/transaction.ts | 14 +- 18 files changed, 273 insertions(+), 312 deletions(-) diff --git a/packages/core/src/connection-provider.ts b/packages/core/src/connection-provider.ts index bd0c96f03..a9378bf15 100644 --- a/packages/core/src/connection-provider.ts +++ b/packages/core/src/connection-provider.ts @@ -23,6 +23,18 @@ import { bookmarks } from './internal' import { ServerInfo } from './result-summary' import { AuthToken } from './types' +/** + * Interface define a releasable resource shape + * + * @private + * @interface + */ +class Releasable { + release (): Promise { + throw new Error('Not implemented') + } +} + /** * Interface define a common way to acquire a connection * @@ -53,7 +65,7 @@ class ConnectionProvider { impersonatedUser?: string onDatabaseNameResolved?: (databaseName?: string) => void auth?: AuthToken - }): Promise { + }): Promise { throw Error('Not implemented') } @@ -150,3 +162,6 @@ class ConnectionProvider { } export default ConnectionProvider +export { + Releasable +} diff --git a/packages/core/src/connection.ts b/packages/core/src/connection.ts index 5782b7c93..367d14945 100644 --- a/packages/core/src/connection.ts +++ b/packages/core/src/connection.ts @@ -18,121 +18,91 @@ */ /* eslint-disable @typescript-eslint/promise-function-async */ -import { ServerAddress } from './internal/server-address' - -/** - * Interface which defines the raw connection with the database - * @private - */ -class Connection { - get id (): string { - return '' - } +import { Bookmarks } from './internal/bookmarks' +import { AccessMode } from './internal/constants' +import { ResultStreamObserver } from './internal/observers' +import { TxConfig } from './internal/tx-config' +import NotificationFilter from './notification-filter' + +interface HasBeforeErrorAndAfterComplete { + beforeError?: (error: Error) => void + afterComplete?: (metadata: unknown) => void +} - get databaseId (): string { - return '' - } +interface BeginTransactionConfig extends HasBeforeErrorAndAfterComplete { + bookmarks: Bookmarks + txConfig: TxConfig + mode?: AccessMode + database?: string + impersonatedUser?: string + notificationFilter?: NotificationFilter +} - get server (): any { - return {} - } +interface CommitTransactionConfig extends HasBeforeErrorAndAfterComplete { - /** - * @property {object} authToken The auth registered in the connection - */ - get authToken (): any { - return {} - } +} - /** - * @property {ServerAddress} the server address this connection is opened against - */ - get address (): ServerAddress | undefined { - return undefined - } +interface RollbackConnectionConfig extends HasBeforeErrorAndAfterComplete { - /** - * @property {ServerVersion} the version of the server this connection is connected to - */ - get version (): any { - return undefined - } +} - /** - * @property {boolean} supportsReAuth Indicates the connection supports re-auth - */ - get supportsReAuth (): boolean { - return false - } +interface RunQueryConfig extends BeginTransactionConfig { + fetchSize: number + highRecordWatermark: number + lowRecordWatermark: number + reactive: boolean +} - /** - * @returns {boolean} whether this connection is in a working condition - */ - isOpen (): boolean { - return false +/** + * Interface which defines a connection for the core driver object. + * + * + * This connection exposes only methods used by the code module. + * Methods with connection implementation details can be defined and used + * by the implementation layer. + * + * @private + * @interface + */ +class Connection { + beginTransaction (config: BeginTransactionConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * @todo be removed and internalize the methods - * @returns {any} the underlying bolt protocol assigned to this connection - */ - protocol (): any { - throw Error('Not implemented') + run (query: string, parameters?: Record, config?: RunQueryConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * Connect to the target address, negotiate Bolt protocol and send initialization message. - * @param {string} userAgent the user agent for this driver. - * @param {string} boltAgent the bolt agent for this driver. - * @param {Object} authToken the object containing auth information. - * @param {Object} waitReAuth whether to connect method should wait until re-Authorised - * @return {Promise} promise resolved with the current connection if connection is successful. Rejected promise otherwise. - */ - connect (userAgent: string, boltAgent: string, authToken: any, waitReAuth: false): Promise { - throw Error('Not implemented') + commitTransaction (config: CommitTransactionConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * Write a message to the network channel. - * @param {RequestMessage} message the message to write. - * @param {ResultStreamObserver} observer the response observer. - * @param {boolean} flush `true` if flush should happen after the message is written to the buffer. - */ - write (message: any, observer: any, flush: boolean): void { - throw Error('Not implemented') + rollbackTransaction (config: RollbackConnectionConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * Send a RESET-message to the database. Message is immediately flushed to the network. - * @return {Promise} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives. - */ resetAndFlush (): Promise { - throw Error('Not implemented') + throw new Error('Not implemented') } - /** - * Checks if there is an ongoing request being handled - * @return {boolean} `true` if there is an ongoing request being handled - */ - hasOngoingObservableRequests (): boolean { - throw Error('Not implemented') + isOpen (): boolean { + throw new Error('Not implemented') } - /** - * Call close on the channel. - * @returns {Promise} - A promise that will be resolved when the connection is closed. - * - */ - close (): Promise { - throw Error('Not implemented') + getProtocolVersion (): number { + throw new Error('Not implemented') } - /** - * Called to release the connection - */ - _release (): Promise { - return Promise.resolve() + hasOngoingObservableRequests (): boolean { + throw new Error('Not implemented') } } export default Connection + +export type { + BeginTransactionConfig, + CommitTransactionConfig, + RollbackConnectionConfig, + RunQueryConfig +} diff --git a/packages/core/src/internal/connection-holder.ts b/packages/core/src/internal/connection-holder.ts index 8ad38774d..cb854c679 100644 --- a/packages/core/src/internal/connection-holder.ts +++ b/packages/core/src/internal/connection-holder.ts @@ -21,9 +21,9 @@ import { newError } from '../error' import { assertString } from './util' import Connection from '../connection' -import { ACCESS_MODE_WRITE } from './constants' +import { ACCESS_MODE_WRITE, AccessMode } from './constants' import { Bookmarks } from './bookmarks' -import ConnectionProvider from '../connection-provider' +import ConnectionProvider, { Releasable } from '../connection-provider' import { AuthToken } from '../types' /** @@ -77,12 +77,12 @@ interface ConnectionHolderInterface { * @private */ class ConnectionHolder implements ConnectionHolderInterface { - private readonly _mode: string + private readonly _mode: AccessMode private _database?: string private readonly _bookmarks: Bookmarks private readonly _connectionProvider?: ConnectionProvider private _referenceCount: number - private _connectionPromise: Promise + private _connectionPromise: Promise private readonly _impersonatedUser?: string private readonly _getConnectionAcquistionBookmarks: () => Promise private readonly _onDatabaseNameResolved?: (databaseName?: string) => void @@ -111,7 +111,7 @@ class ConnectionHolder implements ConnectionHolderInterface { getConnectionAcquistionBookmarks, auth }: { - mode?: string + mode?: AccessMode database?: string bookmarks?: Bookmarks connectionProvider?: ConnectionProvider @@ -133,7 +133,7 @@ class ConnectionHolder implements ConnectionHolderInterface { this._getConnectionAcquistionBookmarks = getConnectionAcquistionBookmarks ?? (() => Promise.resolve(Bookmarks.empty())) } - mode (): string | undefined { + mode (): AccessMode | undefined { return this._mode } @@ -168,7 +168,7 @@ class ConnectionHolder implements ConnectionHolderInterface { return true } - private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise { + private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise { return await connectionProvider.acquireConnection({ accessMode: this._mode, database: this._database, @@ -218,15 +218,15 @@ class ConnectionHolder implements ConnectionHolderInterface { */ private _releaseConnection (hasTx?: boolean): Promise { this._connectionPromise = this._connectionPromise - .then((connection?: Connection | null) => { + .then((connection?: Connection & Releasable | null) => { if (connection != null) { if (connection.isOpen() && (connection.hasOngoingObservableRequests() || hasTx === true)) { return connection .resetAndFlush() .catch(ignoreError) - .then(() => connection._release().then(() => null)) + .then(() => connection.release().then(() => null)) } - return connection._release().then(() => null) + return connection.release().then(() => null) } else { return Promise.resolve(null) } diff --git a/packages/core/src/internal/constants.ts b/packages/core/src/internal/constants.ts index 2c56e9f1b..f4cf14d9d 100644 --- a/packages/core/src/internal/constants.ts +++ b/packages/core/src/internal/constants.ts @@ -38,6 +38,8 @@ const BOLT_PROTOCOL_V5_1: number = 5.1 const BOLT_PROTOCOL_V5_2: number = 5.2 const BOLT_PROTOCOL_V5_3: number = 5.3 +export type AccessMode = typeof ACCESS_MODE_READ | typeof ACCESS_MODE_WRITE + export { FETCH_ALL, ACCESS_MODE_READ, diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index f025d9fa0..1ef7771d2 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -513,7 +513,7 @@ class Result implements Promise - connection?.protocol()?.version + connection?.getProtocolVersion() ), // onRejected: _ => undefined diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index bb3b5572a..4e28dc36c 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -187,7 +187,7 @@ class Session { const result = this._run(validatedQuery, params, async connection => { const bookmarks = await this._bookmarks() this._assertSessionIsOpen() - return (connection as Connection).protocol().run(validatedQuery, params, { + return (connection as Connection).run(validatedQuery, params, { bookmarks, txConfig: autoCommitTxConfig, mode: this._mode, diff --git a/packages/core/src/transaction.ts b/packages/core/src/transaction.ts index c5c979c57..1f51c142e 100644 --- a/packages/core/src/transaction.ts +++ b/packages/core/src/transaction.ts @@ -139,7 +139,7 @@ class Transaction { this._onConnection() if (connection != null) { this._bookmarks = await getBookmarks() - return connection.protocol().beginTransaction({ + return connection.beginTransaction({ bookmarks: this._bookmarks, txConfig, mode: this._connectionHolder.mode(), @@ -150,13 +150,13 @@ class Transaction { if (events != null) { events.onError(error) } - return this._onError(error) + this._onError(error).catch(() => {}) }, afterComplete: (metadata: any) => { if (events != null) { events.onComplete(metadata) } - return this._onComplete(metadata) + this._onComplete(metadata) } }) } else { @@ -364,7 +364,7 @@ const _states = { } }, run: ( - query: Query, + query: string, parameters: any, { connectionHolder, @@ -388,7 +388,7 @@ const _states = { .then(conn => { onConnection() if (conn != null) { - return conn.protocol().run(query, parameters, { + return conn.run(query, parameters, { bookmarks: Bookmarks.empty(), txConfig: TxConfig.empty(), beforeError: onError, @@ -643,12 +643,12 @@ function finishTransaction ( return Promise.all(pendingResults.map(result => result.summary())).then(results => { if (connection != null) { if (commit) { - return connection.protocol().commitTransaction({ + return connection.commitTransaction({ beforeError: onError, afterComplete: onComplete }) } else { - return connection.protocol().rollbackTransaction({ + return connection.rollbackTransaction({ beforeError: onError, afterComplete: onComplete }) diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index c781dbaaf..22b7cb479 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -466,14 +466,15 @@ describe('Result', () => { it.each([123, undefined])( 'should enrich summary with the protocol version onCompleted', async version => { - const connectionMock = { - protocol: () => { - return { version } - } - } + const connectionMock = new FakeConnection() + // converting to accept number as undefined + // this test is considering the situation where protocol version + // is undefined, which it should not happen in normal driver + // operation. + connectionMock.protocolVersion = version as unknown as number connectionHolderMock.getConnection = async (): Promise => { - return asConnection(connectionMock) + return connectionMock } const metadata = { resultConsumedAfter: 20, @@ -679,14 +680,15 @@ describe('Result', () => { it.each([123, undefined])( 'should enrich summary with the protocol version on completed', async version => { - const connectionMock = { - protocol: () => { - return { version } - } - } + const connectionMock = new FakeConnection() + // converting to accept number as undefined + // this test is considering the situation where protocol version + // is undefined, which it should not happen in normal driver + // operation. + connectionMock.protocolVersion = version as unknown as number connectionHolderMock.getConnection = async (): Promise => { - return await Promise.resolve(asConnection(connectionMock)) + return await Promise.resolve(connectionMock) } const metadata = { resultConsumedAfter: 20, @@ -1719,7 +1721,3 @@ function simulateStream ( } */ } - -function asConnection (value: any): Connection { - return value -} diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts index a23863b59..0a9675f11 100644 --- a/packages/core/test/session.test.ts +++ b/packages/core/test/session.test.ts @@ -17,6 +17,8 @@ * limitations under the License. */ import { ConnectionProvider, Session, Connection, TransactionPromise, Transaction, BookmarkManager, bookmarkManager, NotificationFilter, int } from '../src' +import { BeginTransactionConfig, CommitTransactionConfig } from '../src/connection' +import { Releasable } from '../src/connection-provider' import { bookmarks } from '../src/internal' import { ACCESS_MODE_READ, FETCH_ALL } from '../src/internal/constants' import { Logger } from '../src/internal/logger' @@ -1173,36 +1175,28 @@ describe('session', () => { }) function mockBeginWithSuccess (connection: FakeConnection): FakeConnection { - const protocol = connection.protocol() - connection.protocol = () => { - return { - ...protocol, - beginTransaction: (params: { afterComplete: () => {} }, ...args: any[]) => { - protocol.beginTransaction([params, ...args]) - params.afterComplete() - } - } + const originalBegin = connection.beginTransaction.bind(connection) + connection.beginTransaction = (config: BeginTransactionConfig) => { + const stream = originalBegin(config) + config.afterComplete?.call(null, {}) + return stream } return connection } function mockCommitWithSuccess (connection: FakeConnection, metadata: any): FakeConnection { - const protocol = connection.protocol() - connection.protocol = () => { - return { - ...protocol, - commitTransaction: (params: { afterComplete: (metadata: any) => {} }, ...args: any[]) => { - const observer = protocol.commitTransaction(...[params, ...args]) - params.afterComplete(metadata) - return observer - } - } + const originalCommit = connection.commitTransaction.bind(connection) + + connection.commitTransaction = (config: CommitTransactionConfig) => { + const stream = originalCommit(config) + config.afterComplete?.call(null, metadata) + return stream } return connection } function newSessionWithConnection ( - connection: Connection, + connection: Connection & Releasable, beginTx: boolean = true, fetchSize: number = 1000, lastBookmarks: bookmarks.Bookmarks = bookmarks.Bookmarks.empty(), @@ -1224,7 +1218,7 @@ function setupSession ({ notificationFilter, auth }: { - connection: Connection + connection: Connection & Releasable beginTx?: boolean fetchSize?: number lastBookmarks?: bookmarks.Bookmarks diff --git a/packages/core/test/transaction.test.ts b/packages/core/test/transaction.test.ts index 0c8a23bda..92a59f24f 100644 --- a/packages/core/test/transaction.test.ts +++ b/packages/core/test/transaction.test.ts @@ -18,6 +18,7 @@ */ import { ConnectionProvider, newError, NotificationFilter, Transaction, TransactionPromise } from '../src' +import { BeginTransactionConfig } from '../src/connection' import { Bookmarks } from '../src/internal/bookmarks' import { ConnectionHolder } from '../src/internal/connection-holder' import { TxConfig } from '../src/internal/tx-config' @@ -137,15 +138,12 @@ testTx('TransactionPromise', newTransactionPromise, () => { function setupTx (): [TransactionPromise] { const connection = newFakeConnection() - const protocol = connection.protocol() - - connection.protocol = () => { - return { - ...protocol, - beginTransaction: (params: { afterComplete: (meta: any) => void }) => { - ctx(() => params.afterComplete({})) - } - } + const originalBegin = connection.beginTransaction.bind(connection) + + connection.beginTransaction = (config: BeginTransactionConfig) => { + const stream = originalBegin(config) + ctx(() => config.afterComplete?.call(null, {})) + return stream } const tx = newTransactionPromise({ @@ -251,16 +249,14 @@ testTx('TransactionPromise', newTransactionPromise, () => { function setupTx (): [TransactionPromise, Error] { const connection = newFakeConnection() - const protocol = connection.protocol() const expectedError = newError('begin error') - connection.protocol = () => { - return { - ...protocol, - beginTransaction: (params: { beforeError: (error: Error) => void }) => { - ctx(() => params.beforeError(expectedError)) - } - } + const originalBegin = connection.beginTransaction.bind(connection) + + connection.beginTransaction = (config: BeginTransactionConfig) => { + const stream = originalBegin(config) + ctx(() => config.beforeError?.call(null, expectedError)) + return stream } const tx = newTransactionPromise({ diff --git a/packages/core/test/utils/connection.fake.ts b/packages/core/test/utils/connection.fake.ts index 9e7d6bee9..c52035f29 100644 --- a/packages/core/test/utils/connection.fake.ts +++ b/packages/core/test/utils/connection.fake.ts @@ -18,6 +18,7 @@ */ import { Connection, ResultObserver, ResultSummary } from '../../src' +import { BeginTransactionConfig, CommitTransactionConfig, RollbackConnectionConfig, RunQueryConfig } from '../../src/connection' import { ResultStreamObserver } from '../../src/internal/observers' /** @@ -39,7 +40,7 @@ export default class FakeConnection extends Connection { public seenParameters: any[] public seenProtocolOptions: any[] private readonly _server: any - public protocolVersion: number | undefined + public protocolVersion: number public protocolErrorsHandled: number public seenProtocolErrors: string[] public seenRequestRoutingInformation: any[] @@ -62,7 +63,7 @@ export default class FakeConnection extends Connection { this.seenParameters = [] this.seenProtocolOptions = [] this._server = {} - this.protocolVersion = undefined + this.protocolVersion = 1 this.protocolErrorsHandled = 0 this.seenProtocolErrors = [] this.seenRequestRoutingInformation = [] @@ -96,44 +97,39 @@ export default class FakeConnection extends Connection { this._server.version = value } - protocol (): any { - // return fake protocol object that simply records seen queries and parameters - return { - run: (query: string, parameters: any | undefined, protocolOptions: any | undefined): ResultStreamObserver => { - this.seenQueries.push(query) - this.seenParameters.push(parameters) - this.seenProtocolOptions.push(protocolOptions) - return mockResultStreamObserver(query, parameters) - }, - commitTransaction: () => { - return mockResultStreamObserver('COMMIT', {}) - }, - beginTransaction: async (...args: any) => { - this.seenBeginTransaction.push(...args) - return await Promise.resolve() - }, - rollbackTransaction: () => { - this.rollbackInvoked++ - if (this._rollbackError !== null) { - return mockResultStreamObserverWithError('ROLLBACK', {}, this._rollbackError) - } - return mockResultStreamObserver('ROLLBACK', {}) - }, - requestRoutingInformation: (params: any | undefined) => { - this.seenRequestRoutingInformation.push(params) - if (this._requestRoutingInformationMock != null) { - this._requestRoutingInformationMock(params) - } - }, - version: this.protocolVersion + beginTransaction (config: BeginTransactionConfig): ResultStreamObserver { + this.seenBeginTransaction.push([config]) + return mockResultStreamObserver('BEGIN', {}) + } + + run (query: string, parameters?: Record | undefined, config?: RunQueryConfig | undefined): ResultStreamObserver { + this.seenQueries.push(query) + this.seenParameters.push(parameters) + this.seenProtocolOptions.push(config) + return mockResultStreamObserver(query, parameters) + } + + commitTransaction (config: CommitTransactionConfig): ResultStreamObserver { + return mockResultStreamObserver('COMMIT', {}) + } + + rollbackTransaction (config: RollbackConnectionConfig): ResultStreamObserver { + this.rollbackInvoked++ + if (this._rollbackError !== null) { + return mockResultStreamObserverWithError('ROLLBACK', {}, this._rollbackError) } + return mockResultStreamObserver('ROLLBACK', {}) + } + + getProtocolVersion (): number { + return this.protocolVersion } async resetAndFlush (): Promise { this.resetInvoked++ } - async _release (): Promise { + async release (): Promise { this.releaseInvoked++ } diff --git a/packages/neo4j-driver-deno/lib/core/connection-provider.ts b/packages/neo4j-driver-deno/lib/core/connection-provider.ts index 5de1b320d..71268dd2a 100644 --- a/packages/neo4j-driver-deno/lib/core/connection-provider.ts +++ b/packages/neo4j-driver-deno/lib/core/connection-provider.ts @@ -23,6 +23,18 @@ import { bookmarks } from './internal/index.ts' import { ServerInfo } from './result-summary.ts' import { AuthToken } from './types.ts' +/** + * Interface define a releasable resource shape + * + * @private + * @interface + */ +class Releasable { + release(): Promise { + throw new Error('Not implemented') + } +} + /** * Interface define a common way to acquire a connection * @@ -53,7 +65,7 @@ class ConnectionProvider { impersonatedUser?: string onDatabaseNameResolved?: (databaseName?: string) => void auth?: AuthToken - }): Promise { + }): Promise { throw Error('Not implemented') } @@ -150,3 +162,6 @@ class ConnectionProvider { } export default ConnectionProvider +export { + Releasable +} diff --git a/packages/neo4j-driver-deno/lib/core/connection.ts b/packages/neo4j-driver-deno/lib/core/connection.ts index 9ac5950ce..94a67ebfc 100644 --- a/packages/neo4j-driver-deno/lib/core/connection.ts +++ b/packages/neo4j-driver-deno/lib/core/connection.ts @@ -18,121 +18,94 @@ */ /* eslint-disable @typescript-eslint/promise-function-async */ -import { ServerAddress } from './internal/server-address.ts' +import { Bookmarks } from './internal/bookmarks.ts' +import { AccessMode } from './internal/constants.ts' +import { ResultStreamObserver } from './internal/observers.ts' +import { TxConfig } from './internal/tx-config.ts' +import NotificationFilter from './notification-filter.ts' -/** - * Interface which defines the raw connection with the database - * @private - */ -class Connection { - get id (): string { - return '' - } - get databaseId (): string { - return '' - } +interface HasBeforeErrorAndAfterComplete { + beforeError?: (error: Error) => void + afterComplete?: (metadata: unknown) => void +} - get server (): any { - return {} - } +interface BeginTransactionConfig extends HasBeforeErrorAndAfterComplete { + bookmarks: Bookmarks, + txConfig: TxConfig, + mode?: AccessMode + database?: string + impersonatedUser?: string + notificationFilter?: NotificationFilter +} - /** - * @property {object} authToken The auth registered in the connection - */ - get authToken (): any { - return {} - } +interface CommitTransactionConfig extends HasBeforeErrorAndAfterComplete { - /** - * @property {ServerAddress} the server address this connection is opened against - */ - get address (): ServerAddress | undefined { - return undefined - } +} - /** - * @property {ServerVersion} the version of the server this connection is connected to - */ - get version (): any { - return undefined - } +interface RollbackConnectionConfig extends HasBeforeErrorAndAfterComplete { - /** - * @property {boolean} supportsReAuth Indicates the connection supports re-auth - */ - get supportsReAuth (): boolean { - return false - } +} - /** - * @returns {boolean} whether this connection is in a working condition - */ - isOpen (): boolean { - return false - } +interface RunQueryConfig extends BeginTransactionConfig { + fetchSize: number + highRecordWatermark: number + lowRecordWatermark: number + reactive: boolean +} - /** - * @todo be removed and internalize the methods - * @returns {any} the underlying bolt protocol assigned to this connection - */ - protocol (): any { - throw Error('Not implemented') + +/** + * Interface which defines a connection for the core driver object. + * + * + * This connection exposes only methods used by the code module. + * Methods with connection implementation details can be defined and used + * by the implementation layer. + * + * @private + * @interface + */ +class Connection { + beginTransaction(config: BeginTransactionConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * Connect to the target address, negotiate Bolt protocol and send initialization message. - * @param {string} userAgent the user agent for this driver. - * @param {string} boltAgent the bolt agent for this driver. - * @param {Object} authToken the object containing auth information. - * @param {Object} waitReAuth whether to connect method should wait until re-Authorised - * @return {Promise} promise resolved with the current connection if connection is successful. Rejected promise otherwise. - */ - connect (userAgent: string, boltAgent: string, authToken: any, waitReAuth: false): Promise { - throw Error('Not implemented') + run(query: string, parameters?: Record, config?: RunQueryConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * Write a message to the network channel. - * @param {RequestMessage} message the message to write. - * @param {ResultStreamObserver} observer the response observer. - * @param {boolean} flush `true` if flush should happen after the message is written to the buffer. - */ - write (message: any, observer: any, flush: boolean): void { - throw Error('Not implemented') + commitTransaction(config: CommitTransactionConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * Send a RESET-message to the database. Message is immediately flushed to the network. - * @return {Promise} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives. - */ - resetAndFlush (): Promise { - throw Error('Not implemented') + rollbackTransaction(config: RollbackConnectionConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * Checks if there is an ongoing request being handled - * @return {boolean} `true` if there is an ongoing request being handled - */ - hasOngoingObservableRequests (): boolean { - throw Error('Not implemented') + resetAndFlush(): Promise { + throw new Error('Not implemented') } - /** - * Call close on the channel. - * @returns {Promise} - A promise that will be resolved when the connection is closed. - * - */ - close (): Promise { - throw Error('Not implemented') + isOpen(): boolean { + throw new Error('Not implemented') } - /** - * Called to release the connection - */ - _release (): Promise { - return Promise.resolve() + getProtocolVersion(): number { + throw new Error('Not implemented') + } + + hasOngoingObservableRequests(): boolean { + throw new Error('Not implemented') } + } export default Connection + +export type { + BeginTransactionConfig, + CommitTransactionConfig, + RollbackConnectionConfig, + RunQueryConfig +} diff --git a/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts b/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts index 4d43ed1cc..5f13ba57c 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts @@ -21,9 +21,9 @@ import { newError } from '../error.ts' import { assertString } from './util.ts' import Connection from '../connection.ts' -import { ACCESS_MODE_WRITE } from './constants.ts' +import { ACCESS_MODE_WRITE, AccessMode } from './constants.ts' import { Bookmarks } from './bookmarks.ts' -import ConnectionProvider from '../connection-provider.ts' +import ConnectionProvider, { Releasable } from '../connection-provider.ts' import { AuthToken } from '../types.ts' /** @@ -77,12 +77,12 @@ interface ConnectionHolderInterface { * @private */ class ConnectionHolder implements ConnectionHolderInterface { - private readonly _mode: string + private readonly _mode: AccessMode private _database?: string private readonly _bookmarks: Bookmarks private readonly _connectionProvider?: ConnectionProvider private _referenceCount: number - private _connectionPromise: Promise + private _connectionPromise: Promise private readonly _impersonatedUser?: string private readonly _getConnectionAcquistionBookmarks: () => Promise private readonly _onDatabaseNameResolved?: (databaseName?: string) => void @@ -111,7 +111,7 @@ class ConnectionHolder implements ConnectionHolderInterface { getConnectionAcquistionBookmarks, auth }: { - mode?: string + mode?: AccessMode database?: string bookmarks?: Bookmarks connectionProvider?: ConnectionProvider @@ -133,7 +133,7 @@ class ConnectionHolder implements ConnectionHolderInterface { this._getConnectionAcquistionBookmarks = getConnectionAcquistionBookmarks ?? (() => Promise.resolve(Bookmarks.empty())) } - mode (): string | undefined { + mode (): AccessMode | undefined { return this._mode } @@ -168,7 +168,7 @@ class ConnectionHolder implements ConnectionHolderInterface { return true } - private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise { + private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise { return await connectionProvider.acquireConnection({ accessMode: this._mode, database: this._database, @@ -218,15 +218,15 @@ class ConnectionHolder implements ConnectionHolderInterface { */ private _releaseConnection (hasTx?: boolean): Promise { this._connectionPromise = this._connectionPromise - .then((connection?: Connection | null) => { + .then((connection?: Connection & Releasable | null) => { if (connection != null) { if (connection.isOpen() && (connection.hasOngoingObservableRequests() || hasTx === true)) { return connection .resetAndFlush() .catch(ignoreError) - .then(() => connection._release().then(() => null)) + .then(() => connection.release().then(() => null)) } - return connection._release().then(() => null) + return connection.release().then(() => null) } else { return Promise.resolve(null) } diff --git a/packages/neo4j-driver-deno/lib/core/internal/constants.ts b/packages/neo4j-driver-deno/lib/core/internal/constants.ts index 2c56e9f1b..f4cf14d9d 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/constants.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/constants.ts @@ -38,6 +38,8 @@ const BOLT_PROTOCOL_V5_1: number = 5.1 const BOLT_PROTOCOL_V5_2: number = 5.2 const BOLT_PROTOCOL_V5_3: number = 5.3 +export type AccessMode = typeof ACCESS_MODE_READ | typeof ACCESS_MODE_WRITE + export { FETCH_ALL, ACCESS_MODE_READ, diff --git a/packages/neo4j-driver-deno/lib/core/result.ts b/packages/neo4j-driver-deno/lib/core/result.ts index 71dd5e92f..f000b6ac1 100644 --- a/packages/neo4j-driver-deno/lib/core/result.ts +++ b/packages/neo4j-driver-deno/lib/core/result.ts @@ -513,7 +513,7 @@ class Result implements Promise - connection?.protocol()?.version + connection?.getProtocolVersion() ), // onRejected: _ => undefined diff --git a/packages/neo4j-driver-deno/lib/core/session.ts b/packages/neo4j-driver-deno/lib/core/session.ts index 1dc41befa..906c6b374 100644 --- a/packages/neo4j-driver-deno/lib/core/session.ts +++ b/packages/neo4j-driver-deno/lib/core/session.ts @@ -187,7 +187,7 @@ class Session { const result = this._run(validatedQuery, params, async connection => { const bookmarks = await this._bookmarks() this._assertSessionIsOpen() - return (connection as Connection).protocol().run(validatedQuery, params, { + return (connection as Connection).run(validatedQuery, params, { bookmarks, txConfig: autoCommitTxConfig, mode: this._mode, diff --git a/packages/neo4j-driver-deno/lib/core/transaction.ts b/packages/neo4j-driver-deno/lib/core/transaction.ts index dc8a0cf1d..703577a97 100644 --- a/packages/neo4j-driver-deno/lib/core/transaction.ts +++ b/packages/neo4j-driver-deno/lib/core/transaction.ts @@ -139,7 +139,7 @@ class Transaction { this._onConnection() if (connection != null) { this._bookmarks = await getBookmarks() - return connection.protocol().beginTransaction({ + return connection.beginTransaction({ bookmarks: this._bookmarks, txConfig, mode: this._connectionHolder.mode(), @@ -150,13 +150,13 @@ class Transaction { if (events != null) { events.onError(error) } - return this._onError(error) + this._onError(error).catch(() => {}) }, afterComplete: (metadata: any) => { if (events != null) { events.onComplete(metadata) } - return this._onComplete(metadata) + this._onComplete(metadata) } }) } else { @@ -364,7 +364,7 @@ const _states = { } }, run: ( - query: Query, + query: string, parameters: any, { connectionHolder, @@ -388,7 +388,7 @@ const _states = { .then(conn => { onConnection() if (conn != null) { - return conn.protocol().run(query, parameters, { + return conn.run(query, parameters, { bookmarks: Bookmarks.empty(), txConfig: TxConfig.empty(), beforeError: onError, @@ -643,12 +643,12 @@ function finishTransaction ( return Promise.all(pendingResults.map(result => result.summary())).then(results => { if (connection != null) { if (commit) { - return connection.protocol().commitTransaction({ + return connection.commitTransaction({ beforeError: onError, afterComplete: onComplete }) } else { - return connection.protocol().rollbackTransaction({ + return connection.rollbackTransaction({ beforeError: onError, afterComplete: onComplete }) From 15303988687abfdd439632dcfd1734ecb3558eef Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 7 Sep 2023 14:43:33 +0200 Subject: [PATCH 2/9] Adjust some unit tests --- .../test/internal/connection-holder.test.js | 10 +++++----- packages/neo4j-driver/test/internal/fake-connection.js | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/neo4j-driver/test/internal/connection-holder.test.js b/packages/neo4j-driver/test/internal/connection-holder.test.js index 5ba4a4dd4..58110fa81 100644 --- a/packages/neo4j-driver/test/internal/connection-holder.test.js +++ b/packages/neo4j-driver/test/internal/connection-holder.test.js @@ -316,7 +316,7 @@ describe('#unit ConnectionHolder', () => { expect(connection.resetInvoked).toBe(1) }) - it('should call connection._release()', () => { + it('should call connection.release()', () => { expect(connection.releaseInvoked).toBe(1) }) }) @@ -343,7 +343,7 @@ describe('#unit ConnectionHolder', () => { expect(connection.resetInvoked).toBe(0) }) - it('should call connection._release()', () => { + it('should call connection.release()', () => { expect(connection.releaseInvoked).toBe(1) }) }) @@ -369,7 +369,7 @@ describe('#unit ConnectionHolder', () => { expect(connection.resetInvoked).toBe(0) }) - it('should call connection._release()', () => { + it('should call connection.release()', () => { expect(connection.releaseInvoked).toBe(1) }) }) @@ -398,7 +398,7 @@ describe('#unit ConnectionHolder', () => { expect(connection.resetInvoked).toBe(1) }) - it('should call connection._release()', () => { + it('should call connection.release()', () => { expect(connection.releaseInvoked).toBe(1) }) }) @@ -424,7 +424,7 @@ describe('#unit ConnectionHolder', () => { expect(connection.resetInvoked).toBe(0) }) - it('should call connection._release()', () => { + it('should call connection.release()', () => { expect(connection.releaseInvoked).toBe(1) }) }) diff --git a/packages/neo4j-driver/test/internal/fake-connection.js b/packages/neo4j-driver/test/internal/fake-connection.js index 75b9497a8..2d390d5a7 100644 --- a/packages/neo4j-driver/test/internal/fake-connection.js +++ b/packages/neo4j-driver/test/internal/fake-connection.js @@ -110,7 +110,7 @@ export default class FakeConnection extends Connection { return Promise.resolve() } - _release () { + release () { this.releaseInvoked++ return Promise.resolve() } From 47d3d0b7e11fa3fa52892fe339400ff292de433b Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 7 Sep 2023 16:47:07 +0200 Subject: [PATCH 3/9] Implementing changes in the bolt-connection --- .../connection-provider-pooled.js | 8 +- .../src/connection/connection-channel.js | 20 ++ .../src/connection/connection-delegate.js | 24 ++- .../src/connection/connection.js | 4 +- .../connection-provider-direct.test.js | 14 +- .../connection-provider-routing.test.js | 28 +-- .../connection/connection-channel.test.js | 181 +++++++++++++++++- .../connection}/connection-delegate.test.js | 86 ++++++--- packages/bolt-connection/test/test-utils.js | 7 + .../connection-provider-pooled.js | 8 +- .../connection/connection-channel.js | 20 ++ .../connection/connection-delegate.js | 24 ++- .../bolt-connection/connection/connection.js | 4 +- .../lib/core/connection-provider.ts | 4 +- .../neo4j-driver-deno/lib/core/connection.ts | 37 ++-- 15 files changed, 386 insertions(+), 83 deletions(-) rename packages/{neo4j-driver/test/internal => bolt-connection/test/connection}/connection-delegate.test.js (61%) diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js index e07a4e1bf..dd95da067 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js @@ -82,7 +82,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { */ _createConnection ({ auth }, address, release) { return this._createChannelConnection(address).then(connection => { - connection._release = () => { + connection.release = () => { return release(address, connection) } this._openConnections[connection.id] = connection @@ -160,7 +160,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { await connection.resetAndFlush() } } finally { - await connection._release() + await connection.release() } return serverInfo } @@ -191,7 +191,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { } throw error } finally { - await Promise.all(connectionsToRelease.map(conn => conn._release())) + await Promise.all(connectionsToRelease.map(conn => conn.release())) } } @@ -201,7 +201,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { connection._sticky = connectionWithSameCredentials && !connection.supportsReAuth if (shouldCreateStickyConnection || connection._sticky) { - await connection._release() + await connection.release() throw newError('Driver is connected to a database that does not support user switch.') } } diff --git a/packages/bolt-connection/src/connection/connection-channel.js b/packages/bolt-connection/src/connection/connection-channel.js index b2a0e9209..749cf23e6 100644 --- a/packages/bolt-connection/src/connection/connection-channel.js +++ b/packages/bolt-connection/src/connection/connection-channel.js @@ -156,6 +156,26 @@ export default class ChannelConnection extends Connection { } } + beginTransaction (config) { + return this._protocol.beginTransaction(config) + } + + run (query, parameters, config) { + return this._protocol.run(query, parameters, config) + } + + commitTransaction (config) { + return this._protocol.commitTransaction(config) + } + + rollbackTransaction (config) { + return this._protocol.rollbackTransaction(config) + } + + getProtocolVersion () { + return this._protocol.version + } + get authToken () { return this._authToken } diff --git a/packages/bolt-connection/src/connection/connection-delegate.js b/packages/bolt-connection/src/connection/connection-delegate.js index 31e2ade56..b122cddfb 100644 --- a/packages/bolt-connection/src/connection/connection-delegate.js +++ b/packages/bolt-connection/src/connection/connection-delegate.js @@ -35,6 +35,26 @@ export default class DelegateConnection extends Connection { this._delegate = delegate } + beginTransaction (config) { + return this._delegate.beginTransaction(config) + } + + run (query, param, config) { + return this._delegate.run(query, param, config) + } + + commitTransaction (config) { + return this._delegate.commitTransaction(config) + } + + rollbackTransaction (config) { + return this._delegate.rollbackTransaction(config) + } + + getProtocolVersion () { + return this._delegate.getProtocolVersion() + } + get id () { return this._delegate.id } @@ -103,11 +123,11 @@ export default class DelegateConnection extends Connection { return this._delegate.close() } - _release () { + release () { if (this._originalErrorHandler) { this._delegate._errorHandler = this._originalErrorHandler } - return this._delegate._release() + return this._delegate.release() } } diff --git a/packages/bolt-connection/src/connection/connection.js b/packages/bolt-connection/src/connection/connection.js index f016d2719..95798bee4 100644 --- a/packages/bolt-connection/src/connection/connection.js +++ b/packages/bolt-connection/src/connection/connection.js @@ -18,12 +18,14 @@ */ // eslint-disable-next-line no-unused-vars import { ResultStreamObserver, BoltProtocol } from '../bolt' +import { Connection as CoreConnection } from 'neo4j-driver-core' -export default class Connection { +export default class Connection extends CoreConnection { /** * @param {ConnectionErrorHandler} errorHandler the error handler */ constructor (errorHandler) { + super() this._errorHandler = errorHandler } diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js index 0cfa6e06b..1ef8950ff 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js @@ -289,7 +289,7 @@ describe('constructor', () => { const connection = await create({}, server0, release) - const released = connection._release() + const released = connection.release() expect(released).toBe(releaseResult) expect(release).toHaveBeenCalledWith(server0, connection) @@ -546,7 +546,7 @@ describe('user-switching', () => { expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.')) expect(poolAcquire).toHaveBeenCalledWith({ auth: acquireAuth }, address) - expect(connection._release).toHaveBeenCalled() + expect(connection.release).toHaveBeenCalled() expect(connection._sticky).toEqual(isStickyConn) }) }) @@ -599,7 +599,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => { await connectionProvider.verifyConnectivityAndGetServerInfo() - expect(seenConnections[0]._release).toHaveBeenCalledTimes(1) + expect(seenConnections[0].release).toHaveBeenCalledTimes(1) }) it('should resetAndFlush and then release the connection', async () => { @@ -607,7 +607,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => { await connectionProvider.verifyConnectivityAndGetServerInfo() - expect(seenConnections[0]._release.mock.invocationCallOrder[0]) + expect(seenConnections[0].release.mock.invocationCallOrder[0]) .toBeGreaterThan(resetAndFlush.mock.invocationCallOrder[0]) }) @@ -636,7 +636,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => { await connectionProvider.verifyConnectivityAndGetServerInfo() } catch (e) { } finally { - expect(seenConnections[0]._release).toHaveBeenCalledTimes(1) + expect(seenConnections[0].release).toHaveBeenCalledTimes(1) } }) @@ -692,7 +692,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => { } connection.resetAndFlush = resetAndFlush if (releaseMock) { - connection._release = releaseMock + connection.release = releaseMock } seenConnections.push(connection) return connection @@ -782,7 +782,7 @@ class FakeConnection extends Connection { super(null) this._address = address - this._release = jest.fn(() => release(address, this)) + this.release = jest.fn(() => release(address, this)) this._server = server this._authToken = auth this._closed = false diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index d057f9d6e..823fe040b 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -2834,8 +2834,8 @@ describe.each([ expect(connections.length).toBe(1) expect(connections[0].resetAndFlush).toHaveBeenCalled() - expect(connections[0]._release).toHaveBeenCalled() - expect(connections[0]._release.mock.invocationCallOrder[0]) + expect(connections[0].release).toHaveBeenCalled() + expect(connections[0].release.mock.invocationCallOrder[0]) .toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0]) }) @@ -2856,7 +2856,7 @@ describe.each([ // extra checks expect(connections.length).toBe(1) - expect(connections[0]._release).toHaveBeenCalled() + expect(connections[0].release).toHaveBeenCalled() }) it('should not acquire, resetAndFlush and release connections for sever with the other access mode', async () => { @@ -2900,7 +2900,7 @@ describe.each([ expect(connections.length).toBe(1) expect(connections[0].resetAndFlush).toHaveBeenCalled() - expect(connections[0]._release).toHaveBeenCalled() + expect(connections[0].release).toHaveBeenCalled() } } }) @@ -2956,8 +2956,8 @@ describe.each([ expect(connections.length).toBe(1) expect(connections[0].resetAndFlush).toHaveBeenCalled() - expect(connections[0]._release).toHaveBeenCalled() - expect(connections[0]._release.mock.invocationCallOrder[0]) + expect(connections[0].release).toHaveBeenCalled() + expect(connections[0].release.mock.invocationCallOrder[0]) .toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0]) } } @@ -2979,7 +2979,7 @@ describe.each([ expect(connections.length).toBe(1) expect(connections[0].resetAndFlush).toHaveBeenCalled() - expect(connections[0]._release).toHaveBeenCalled() + expect(connections[0].release).toHaveBeenCalled() } } }) @@ -3054,7 +3054,7 @@ describe.each([ connection.resetAndFlush = resetAndFlush } if (releaseMock) { - connection._release = releaseMock + connection.release = releaseMock } seenConnectionsPerAddress.get(address).push(connection) return connection @@ -3193,7 +3193,7 @@ describe.each([ const connection = await create({}, server0, release) - const released = connection._release() + const released = connection.release() expect(released).toBe(releaseResult) expect(release).toHaveBeenCalledWith(server0, connection) @@ -3460,7 +3460,7 @@ describe.each([ expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.')) expect(poolAcquire).toHaveBeenCalledWith({ auth }, server3) - expect(connection._release).toHaveBeenCalled() + expect(connection.release).toHaveBeenCalled() expect(connection._sticky).toEqual(isStickyConn) }) @@ -3502,7 +3502,7 @@ describe.each([ expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.')) expect(poolAcquire).toHaveBeenCalledWith({ auth }, server1) - expect(connection._release).toHaveBeenCalled() + expect(connection.release).toHaveBeenCalled() expect(connection._sticky).toEqual(isStickyConn) }) @@ -3546,7 +3546,7 @@ describe.each([ expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.')) expect(poolAcquire).toHaveBeenCalledWith({ auth }, server0) - expect(connection._release).toHaveBeenCalled() + expect(connection.release).toHaveBeenCalled() expect(connection._sticky).toEqual(isStickyConn) }) @@ -3575,7 +3575,7 @@ describe.each([ expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.')) expect(poolAcquire).toHaveBeenCalledWith({ auth }, server0) - expect(connection._release).toHaveBeenCalled() + expect(connection.release).toHaveBeenCalled() expect(connection._sticky).toEqual(isStickyConn) }) }) @@ -3903,7 +3903,7 @@ class FakeConnection extends Connection { this._version = version this._protocolVersion = protocolVersion this.release = release - this._release = jest.fn(() => release(address, this)) + this.release = jest.fn(() => release(address, this)) this.resetAndFlush = jest.fn(() => Promise.resolve()) this._server = server this._authToken = authToken diff --git a/packages/bolt-connection/test/connection/connection-channel.test.js b/packages/bolt-connection/test/connection/connection-channel.test.js index fb89e882e..841488d3c 100644 --- a/packages/bolt-connection/test/connection/connection-channel.test.js +++ b/packages/bolt-connection/test/connection/connection-channel.test.js @@ -20,10 +20,13 @@ import ChannelConnection from '../../src/connection/connection-channel' import { int, internal, newError } from 'neo4j-driver-core' import { notificationFilterBehaviour } from '../bolt/behaviour' +import { ResultStreamObserver } from '../../src/bolt' const { serverAddress: { ServerAddress }, - logger: { Logger } + logger: { Logger }, + bookmarks: { Bookmarks }, + txConfig: { TxConfig } } = internal describe('ChannelConnection', () => { @@ -715,6 +718,182 @@ describe('ChannelConnection', () => { }) }) + describe('.beginTransaction()', () => { + it('should call redirect the request to the protocol', () => { + const observer = new ResultStreamObserver() + const protocol = { + beginTransaction: jest.fn(() => observer) + } + const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol }) + + const config = { + bookmarks: Bookmarks.empty(), + txConfig: TxConfig.empty(), + database: 'neo4j', + mode: 'READ', + impersonatedUser: 'other cat', + notificationFilter: { + minimumSeverityLevel: 'WARNING' + }, + beforeError: () => console.log('my error'), + afterComplete: (metadata) => console.log('metadata', metadata) + } + + const result = connection.beginTransaction(config) + + expect(result).toBe(observer) + expect(protocol.beginTransaction).toBeCalledWith(config) + }) + }) + + describe('.run()', () => { + it('should call redirect the request to the protocol', () => { + const observer = new ResultStreamObserver() + const protocol = { + run: jest.fn(() => observer) + } + const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol }) + + const query = 'RETURN $x' + const params = { x: 1 } + const config = { + bookmarks: Bookmarks.empty(), + txConfig: TxConfig.empty(), + database: 'neo4j', + mode: 'READ', + impersonatedUser: 'other cat', + notificationFilter: { + minimumSeverityLevel: 'WARNING' + }, + fetchSize: 1000, + highRecordWatermark: 1234, + lowRecordWatermark: 12, + reactive: false, + beforeError: () => console.log('my error'), + afterComplete: (metadata) => console.log('metadata', metadata) + } + + const result = connection.run(query, params, config) + + expect(result).toBe(observer) + expect(protocol.run).toBeCalledWith(query, params, config) + }) + }) + + describe('.commitTransaction()', () => { + it('should call redirect the request to the protocol', () => { + const observer = new ResultStreamObserver() + const protocol = { + commitTransaction: jest.fn(() => observer) + } + const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol }) + + const config = { + beforeError: () => console.log('my error'), + afterComplete: (metadata) => console.log('metadata', metadata) + } + + const result = connection.commitTransaction(config) + + expect(result).toBe(observer) + expect(protocol.commitTransaction).toBeCalledWith(config) + }) + }) + + describe('.rollbackTransaction()', () => { + it('should call redirect the request to the protocol', () => { + const observer = new ResultStreamObserver() + const protocol = { + rollbackTransaction: jest.fn(() => observer) + } + const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol }) + + const config = { + beforeError: () => console.log('my error'), + afterComplete: (metadata) => console.log('metadata', metadata) + } + + const result = connection.rollbackTransaction(config) + + expect(result).toBe(observer) + expect(protocol.rollbackTransaction).toBeCalledWith(config) + }) + }) + + describe('.isOpen()', () => { + it('should return true when is not broken and channel is open', () => { + const connection = spyOnConnectionChannel({ protocolSupplier: () => ({}), channel: { _open: true } }) + + expect(connection.isOpen()).toBe(true) + }) + + it('should return true when is not broken and channel not is open', () => { + const connection = spyOnConnectionChannel({ protocolSupplier: () => ({}), channel: { _open: false } }) + + expect(connection.isOpen()).toBe(false) + }) + + it('should return true when is broken and channel not is open', () => { + const connection = spyOnConnectionChannel({ + protocolSupplier: () => ({ + notifyFatalError: () => {} + }), + channel: { _open: false } + }) + + connection._handleFatalError(new Error('the error which makes the connection be broken.')) + + expect(connection.isOpen()).toBe(false) + }) + + it('should return true when is broken and channel is open', () => { + const connection = spyOnConnectionChannel({ + protocolSupplier: () => ({ + notifyFatalError: () => {} + }), + channel: { _open: true } + }) + + connection._handleFatalError(new Error('the error which makes the connection be broken.')) + + expect(connection.isOpen()).toBe(false) + }) + }) + + describe('.getProtocolVersion()', () => { + it('should call redirect request to the protocol', () => { + const version = 5.3 + const getVersion = jest.fn(() => version) + const protocol = { + get version () { + return getVersion() + } + } + + const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol }) + + const result = connection.getProtocolVersion() + + expect(result).toBe(version) + expect(getVersion).toBeCalledWith() + }) + }) + + describe('.hasOngoingObservableRequests()', () => { + it('should call redirect request to the protocol', () => { + const protocol = { + hasOngoingObservableRequests: jest.fn(() => true) + } + + const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol }) + + const result = connection.hasOngoingObservableRequests() + + expect(result).toBe(true) + expect(protocol.hasOngoingObservableRequests).toBeCalledWith() + }) + }) + function spyOnConnectionChannel ({ channel, errorHandler, diff --git a/packages/neo4j-driver/test/internal/connection-delegate.test.js b/packages/bolt-connection/test/connection/connection-delegate.test.js similarity index 61% rename from packages/neo4j-driver/test/internal/connection-delegate.test.js rename to packages/bolt-connection/test/connection/connection-delegate.test.js index d88508442..bc6334716 100644 --- a/packages/neo4j-driver/test/internal/connection-delegate.test.js +++ b/packages/bolt-connection/test/connection/connection-delegate.test.js @@ -16,20 +16,33 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import DelegateConnection from '../../../bolt-connection/lib/connection/connection-delegate' -import Connection from '../../../bolt-connection/lib/connection/connection' -import { BoltProtocol } from '../../../bolt-connection/lib/bolt' -import ConnectionErrorHandler from '../../../bolt-connection/lib/connection/connection-error-handler' -import { internal } from 'neo4j-driver-core' + +import Connection from '../../src/connection/connection' +import DelegateConnection from '../../src/connection/connection-delegate' +import { Connection as CoreConnection, internal } from 'neo4j-driver-core' +import ConnectionErrorHandler from '../../src/connection/connection-error-handler' +import { BoltProtocol } from '../../src/bolt' +import utils from '../test-utils' const { serverAddress: { ServerAddress: BoltAddress } } = internal -describe('#unit DelegateConnection', () => { +describe('DelegateConnection', () => { + beforeEach(() => { + expect.extend(utils.matchers) + }) + + const NON_DELEGATE_METHODS = [ + 'constructor', + // the delegate replaces the error handler of the original connection + // and not delegate the requests to the previous connection until released + 'handleAndTransformError' + ] + it('should delegate get id', () => { const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'id', 'get').and.returnValue(5) + const spy = jest.spyOn(delegate, 'id', 'get').mockReturnValue(5) const connection = new DelegateConnection(delegate, null) expect(connection.id).toBe(5) @@ -38,7 +51,7 @@ describe('#unit DelegateConnection', () => { it('should delegate get databaseId', () => { const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'databaseId', 'get').and.returnValue( + const spy = jest.spyOn(delegate, 'databaseId', 'get').mockReturnValue( '123-456' ) const connection = new DelegateConnection(delegate, null) @@ -49,7 +62,7 @@ describe('#unit DelegateConnection', () => { it('should delegate set databaseId', () => { const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'databaseId', 'set') + const spy = jest.spyOn(delegate, 'databaseId', 'set').mockImplementation(() => {}) const connection = new DelegateConnection(delegate, null) connection.databaseId = '345-678' @@ -63,7 +76,7 @@ describe('#unit DelegateConnection', () => { version: 'Neo4j/3.5.6' } const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'server', 'get').and.returnValue(server) + const spy = jest.spyOn(delegate, 'server', 'get').mockReturnValue(server) const connection = new DelegateConnection(delegate, null) expect(connection.server).toBe(server) @@ -73,7 +86,7 @@ describe('#unit DelegateConnection', () => { it('should delegate get address', () => { const address = BoltAddress.fromUrl('bolt://127.0.0.1:8080') const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'address', 'get').and.returnValue( + const spy = jest.spyOn(delegate, 'address', 'get').mockReturnValue( address ) const connection = new DelegateConnection(delegate, null) @@ -85,7 +98,7 @@ describe('#unit DelegateConnection', () => { it('should delegate get version', () => { const version = 'Neo4j/3.5.6' const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'version', 'get').and.returnValue( + const spy = jest.spyOn(delegate, 'version', 'get').mockReturnValue( version ) const connection = new DelegateConnection(delegate, null) @@ -96,7 +109,7 @@ describe('#unit DelegateConnection', () => { it('should delegate set version', () => { const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'version', 'set') + const spy = jest.spyOn(delegate, 'version', 'set').mockImplementation(() => {}) const connection = new DelegateConnection(delegate, null) connection.version = 'Neo4j/3.4.9' @@ -106,7 +119,7 @@ describe('#unit DelegateConnection', () => { it('should delegate isOpen', () => { const delegate = new Connection(null) - const spy = spyOn(delegate, 'isOpen').and.returnValue(true) + const spy = jest.spyOn(delegate, 'isOpen').mockReturnValue(true) const connection = new DelegateConnection(delegate, null) expect(connection.isOpen()).toBeTruthy() @@ -116,7 +129,7 @@ describe('#unit DelegateConnection', () => { it('should delegate protocol', () => { const protocol = new BoltProtocol() const delegate = new Connection(null) - const spy = spyOn(delegate, 'protocol').and.returnValue(protocol) + const spy = jest.spyOn(delegate, 'protocol').mockReturnValue(protocol) const connection = new DelegateConnection(delegate, null) expect(connection.protocol()).toBe(protocol) @@ -125,7 +138,7 @@ describe('#unit DelegateConnection', () => { it('should delegate connect', () => { const delegate = new Connection(null) - const spy = spyOn(delegate, 'connect') + const spy = jest.spyOn(delegate, 'connect').mockImplementation(() => {}) const connection = new DelegateConnection(delegate, null) connection.connect('neo4j/js-driver', 'mydriver/0.0.0 some system info', {}) @@ -135,7 +148,7 @@ describe('#unit DelegateConnection', () => { it('should delegate write', () => { const delegate = new Connection(null) - const spy = spyOn(delegate, 'write') + const spy = jest.spyOn(delegate, 'write').mockImplementation(() => {}) const connection = new DelegateConnection(delegate, null) connection.write({}, null, true) @@ -145,7 +158,7 @@ describe('#unit DelegateConnection', () => { it('should delegate resetAndFlush', () => { const delegate = new Connection(null) - const spy = spyOn(delegate, 'resetAndFlush') + const spy = jest.spyOn(delegate, 'resetAndFlush').mockImplementation(() => {}) const connection = new DelegateConnection(delegate, null) connection.resetAndFlush() @@ -155,7 +168,7 @@ describe('#unit DelegateConnection', () => { it('should delegate close', async () => { const delegate = new Connection(null) - const spy = spyOn(delegate, 'close').and.returnValue(Promise.resolve()) + const spy = jest.spyOn(delegate, 'close').mockReturnValue(Promise.resolve()) const connection = new DelegateConnection(delegate, null) await connection.close() @@ -163,13 +176,13 @@ describe('#unit DelegateConnection', () => { expect(spy).toHaveBeenCalledTimes(1) }) - it('should delegate _release', () => { + it('should delegate release', () => { const delegate = new Connection(null) - delegate._release = () => {} - const spy = spyOn(delegate, '_release') + delegate.release = () => {} + const spy = jest.spyOn(delegate, 'release').mockImplementation(() => {}) const connection = new DelegateConnection(delegate, null) - connection._release() + connection.release() expect(spy).toHaveBeenCalledTimes(1) }) @@ -177,7 +190,7 @@ describe('#unit DelegateConnection', () => { it('should override errorHandler on create and restore on release', () => { const errorHandlerOriginal = new ConnectionErrorHandler('code1') const delegate = new Connection(errorHandlerOriginal) - delegate._release = () => {} + delegate.release = () => {} expect(delegate._errorHandler).toBe(errorHandlerOriginal) @@ -186,8 +199,31 @@ describe('#unit DelegateConnection', () => { expect(delegate._errorHandler).toBe(errorHandlerNew) - connection._release() + connection.release() expect(delegate._errorHandler).toBe(errorHandlerOriginal) }) + + it.each(getDelegatedMethods())('should delegate %s calls with exact args number and return value', (delegatedMethod) => { + const result = 'the result' + const method = CoreConnection.prototype[delegatedMethod] || Connection.prototype[delegatedMethod] + const argsNumber = method.length // function.length returns the number of arguments expected by the function + const args = [...Array(argsNumber).keys()] + + const connection = { + [delegatedMethod]: jest.fn(() => result) + } + + const delegatedConnection = new DelegateConnection(connection) + + expect(delegatedConnection[delegatedMethod](...args)).toBe(result) + expect(connection[delegatedMethod]).toHaveBeenCalledTimes(1) + expect(connection[delegatedMethod]).toBeCalledWith(...args) + expect(connection[delegatedMethod]).toBeCalledWithThis(connection) + }) + + function getDelegatedMethods () { + const allMethods = new Set([...Object.keys(Connection.prototype), ...Object.keys(CoreConnection.prototype)]) + return [...allMethods].filter(method => !NON_DELEGATE_METHODS.includes(method)) + } }) diff --git a/packages/bolt-connection/test/test-utils.js b/packages/bolt-connection/test/test-utils.js index 9eb7e31fd..b4ac053f8 100644 --- a/packages/bolt-connection/test/test-utils.js +++ b/packages/bolt-connection/test/test-utils.js @@ -53,6 +53,7 @@ const matchers = { } else { result.message = `Expected '${actual}' to be an element of '[${expected}]', but it wasn't` } + return result }, toBeMessage: function (actual, expected) { if (expected === undefined) { @@ -84,6 +85,12 @@ const matchers = { result.message = () => `Expected message '[${failures}]', but it didn't` } return result + }, + toBeCalledWithThis: function (theMockedFunction, thisArg) { + return { + pass: theMockedFunction.mock.contexts.filter(ctx => ctx === thisArg).length > 0, + message: () => `Expected to be called with this equals to '${json.stringify(thisArg)}', but it wasn't.` + } } } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js index 631889fcf..3dfa34f79 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js @@ -82,7 +82,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { */ _createConnection ({ auth }, address, release) { return this._createChannelConnection(address).then(connection => { - connection._release = () => { + connection.release = () => { return release(address, connection) } this._openConnections[connection.id] = connection @@ -160,7 +160,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { await connection.resetAndFlush() } } finally { - await connection._release() + await connection.release() } return serverInfo } @@ -191,7 +191,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { } throw error } finally { - await Promise.all(connectionsToRelease.map(conn => conn._release())) + await Promise.all(connectionsToRelease.map(conn => conn.release())) } } @@ -201,7 +201,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { connection._sticky = connectionWithSameCredentials && !connection.supportsReAuth if (shouldCreateStickyConnection || connection._sticky) { - await connection._release() + await connection.release() throw newError('Driver is connected to a database that does not support user switch.') } } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js index 4a43a612d..48ba86127 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js @@ -156,6 +156,26 @@ export default class ChannelConnection extends Connection { } } + beginTransaction (config) { + return this._protocol.beginTransaction(config) + } + + run (query, parameters, config) { + return this._protocol.run(query, parameters, config) + } + + commitTransaction (config) { + return this._protocol.commitTransaction(config) + } + + rollbackTransaction (config) { + return this._protocol.rollbackTransaction(config) + } + + getProtocolVersion () { + return this._protocol.version + } + get authToken () { return this._authToken } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-delegate.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-delegate.js index a35307e69..896a83d42 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-delegate.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-delegate.js @@ -35,6 +35,26 @@ export default class DelegateConnection extends Connection { this._delegate = delegate } + beginTransaction (config) { + return this._delegate.beginTransaction(config) + } + + run (query, param, config) { + return this._delegate.run(query, param, config) + } + + commitTransaction (config) { + return this._delegate.commitTransaction(config) + } + + rollbackTransaction (config) { + return this._delegate.rollbackTransaction(config) + } + + getProtocolVersion () { + return this._delegate.getProtocolVersion() + } + get id () { return this._delegate.id } @@ -103,11 +123,11 @@ export default class DelegateConnection extends Connection { return this._delegate.close() } - _release () { + release () { if (this._originalErrorHandler) { this._delegate._errorHandler = this._originalErrorHandler } - return this._delegate._release() + return this._delegate.release() } } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection.js index d80357d09..d31ccb834 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection.js @@ -18,12 +18,14 @@ */ // eslint-disable-next-line no-unused-vars import { ResultStreamObserver, BoltProtocol } from '../bolt/index.js' +import { Connection as CoreConnection } from '../../core/index.ts' -export default class Connection { +export default class Connection extends CoreConnection { /** * @param {ConnectionErrorHandler} errorHandler the error handler */ constructor (errorHandler) { + super() this._errorHandler = errorHandler } diff --git a/packages/neo4j-driver-deno/lib/core/connection-provider.ts b/packages/neo4j-driver-deno/lib/core/connection-provider.ts index 71268dd2a..8ec5d162d 100644 --- a/packages/neo4j-driver-deno/lib/core/connection-provider.ts +++ b/packages/neo4j-driver-deno/lib/core/connection-provider.ts @@ -25,12 +25,12 @@ import { AuthToken } from './types.ts' /** * Interface define a releasable resource shape - * + * * @private * @interface */ class Releasable { - release(): Promise { + release (): Promise { throw new Error('Not implemented') } } diff --git a/packages/neo4j-driver-deno/lib/core/connection.ts b/packages/neo4j-driver-deno/lib/core/connection.ts index 94a67ebfc..cf9cb9eca 100644 --- a/packages/neo4j-driver-deno/lib/core/connection.ts +++ b/packages/neo4j-driver-deno/lib/core/connection.ts @@ -24,18 +24,17 @@ import { ResultStreamObserver } from './internal/observers.ts' import { TxConfig } from './internal/tx-config.ts' import NotificationFilter from './notification-filter.ts' - interface HasBeforeErrorAndAfterComplete { - beforeError?: (error: Error) => void + beforeError?: (error: Error) => void afterComplete?: (metadata: unknown) => void } interface BeginTransactionConfig extends HasBeforeErrorAndAfterComplete { - bookmarks: Bookmarks, - txConfig: TxConfig, + bookmarks: Bookmarks + txConfig: TxConfig mode?: AccessMode database?: string - impersonatedUser?: string + impersonatedUser?: string notificationFilter?: NotificationFilter } @@ -54,51 +53,49 @@ interface RunQueryConfig extends BeginTransactionConfig { reactive: boolean } - /** * Interface which defines a connection for the core driver object. - * - * - * This connection exposes only methods used by the code module. + * + * + * This connection exposes only methods used by the code module. * Methods with connection implementation details can be defined and used * by the implementation layer. - * + * * @private * @interface */ class Connection { - beginTransaction(config: BeginTransactionConfig): ResultStreamObserver { + beginTransaction (config: BeginTransactionConfig): ResultStreamObserver { throw new Error('Not implemented') } - run(query: string, parameters?: Record, config?: RunQueryConfig): ResultStreamObserver { + run (query: string, parameters?: Record, config?: RunQueryConfig): ResultStreamObserver { throw new Error('Not implemented') } - commitTransaction(config: CommitTransactionConfig): ResultStreamObserver { + commitTransaction (config: CommitTransactionConfig): ResultStreamObserver { throw new Error('Not implemented') } - rollbackTransaction(config: RollbackConnectionConfig): ResultStreamObserver { + rollbackTransaction (config: RollbackConnectionConfig): ResultStreamObserver { throw new Error('Not implemented') } - resetAndFlush(): Promise { + resetAndFlush (): Promise { throw new Error('Not implemented') } - isOpen(): boolean { + isOpen (): boolean { throw new Error('Not implemented') } - getProtocolVersion(): number { + getProtocolVersion (): number { throw new Error('Not implemented') } - - hasOngoingObservableRequests(): boolean { + + hasOngoingObservableRequests (): boolean { throw new Error('Not implemented') } - } export default Connection From ae5043a8460b561872fc86d51b66c481da1a380d Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 7 Sep 2023 17:03:00 +0200 Subject: [PATCH 4/9] Small fix in the fakeconn --- packages/bolt-connection/test/fake-connection.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/bolt-connection/test/fake-connection.js b/packages/bolt-connection/test/fake-connection.js index af5cfc870..a9061bb76 100644 --- a/packages/bolt-connection/test/fake-connection.js +++ b/packages/bolt-connection/test/fake-connection.js @@ -95,7 +95,7 @@ export default class FakeConnection extends Connection { return Promise.resolve() } - _release () { + release () { this.releaseInvoked++ return Promise.resolve() } From efb1077ea3d736e0cc692aa8041356d7a29da01e Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 7 Sep 2023 17:37:27 +0200 Subject: [PATCH 5/9] remove duplicated methods --- .../src/connection/connection.js | 19 ------------------- .../bolt-connection/connection/connection.js | 19 ------------------- 2 files changed, 38 deletions(-) diff --git a/packages/bolt-connection/src/connection/connection.js b/packages/bolt-connection/src/connection/connection.js index 95798bee4..61c7131c6 100644 --- a/packages/bolt-connection/src/connection/connection.js +++ b/packages/bolt-connection/src/connection/connection.js @@ -53,13 +53,6 @@ export default class Connection extends CoreConnection { throw new Error('not implemented') } - /** - * @returns {boolean} whether this connection is in a working condition - */ - isOpen () { - throw new Error('not implemented') - } - /** * @returns {BoltProtocol} the underlying bolt protocol assigned to this connection */ @@ -111,18 +104,6 @@ export default class Connection extends CoreConnection { throw new Error('not implemented') } - /** - * Send a RESET-message to the database. Message is immediately flushed to the network. - * @return {Promise} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives. - */ - resetAndFlush () { - throw new Error('not implemented') - } - - hasOngoingObservableRequests () { - throw new Error('not implemented') - } - /** * Call close on the channel. * @returns {Promise} - A promise that will be resolved when the connection is closed. diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection.js index d31ccb834..838b1e381 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection.js @@ -53,13 +53,6 @@ export default class Connection extends CoreConnection { throw new Error('not implemented') } - /** - * @returns {boolean} whether this connection is in a working condition - */ - isOpen () { - throw new Error('not implemented') - } - /** * @returns {BoltProtocol} the underlying bolt protocol assigned to this connection */ @@ -111,18 +104,6 @@ export default class Connection extends CoreConnection { throw new Error('not implemented') } - /** - * Send a RESET-message to the database. Message is immediately flushed to the network. - * @return {Promise} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives. - */ - resetAndFlush () { - throw new Error('not implemented') - } - - hasOngoingObservableRequests () { - throw new Error('not implemented') - } - /** * Call close on the channel. * @returns {Promise} - A promise that will be resolved when the connection is closed. From 5eb272e9fea0e24649d6516bae16941273477e36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Fri, 8 Sep 2023 16:14:52 +0200 Subject: [PATCH 6/9] Apply suggestions from code review Co-authored-by: Robsdedude --- packages/core/test/result.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index 22b7cb479..ceb377ca4 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -467,9 +467,9 @@ describe('Result', () => { 'should enrich summary with the protocol version onCompleted', async version => { const connectionMock = new FakeConnection() - // converting to accept number as undefined + // converting to accept undefined as number // this test is considering the situation where protocol version - // is undefined, which it should not happen in normal driver + // is undefined, which should not happen during normal driver // operation. connectionMock.protocolVersion = version as unknown as number @@ -681,9 +681,9 @@ describe('Result', () => { 'should enrich summary with the protocol version on completed', async version => { const connectionMock = new FakeConnection() - // converting to accept number as undefined + // converting to accept undefined as number // this test is considering the situation where protocol version - // is undefined, which it should not happen in normal driver + // is undefined, which should not happen during normal driver // operation. connectionMock.protocolVersion = version as unknown as number From 5c684741a66a4ca9ba18ab21f517decda4b2654b Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 12 Sep 2023 14:17:21 +0200 Subject: [PATCH 7/9] Fix pool._release error handling and log release error in connection holder --- packages/bolt-connection/src/pool/pool.js | 81 +++++----- .../bolt-connection/test/pool/pool.test.js | 142 +++++++++++++++++- .../core/src/internal/connection-holder.ts | 41 ++++- packages/core/src/session.ts | 8 +- packages/core/test/result.test.ts | 7 +- packages/core/test/transaction.test.ts | 5 +- .../lib/bolt-connection/pool/pool.js | 81 +++++----- .../lib/core/internal/connection-holder.ts | 41 ++++- .../neo4j-driver-deno/lib/core/session.ts | 8 +- .../connection-holder-readonly.test.js | 2 +- .../test/internal/connection-holder.test.js | 79 +++++++++- 11 files changed, 387 insertions(+), 108 deletions(-) diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index 187393f5f..105d4d137 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -273,53 +273,56 @@ class Pool { async _release (address, resource, pool) { const key = address.asKey() - if (pool.isActive()) { - // there exist idle connections for the given key - if (!await this._validateOnRelease(resource)) { + try { + if (pool.isActive()) { + // there exist idle connections for the given key + if (!await this._validateOnRelease(resource)) { + if (this._log.isDebugEnabled()) { + this._log.debug( + `${resource} destroyed and can't be released to the pool ${key} because it is not functional` + ) + } + pool.removeInUse(resource) + await this._destroy(resource) + } else { + if (this._installIdleObserver) { + this._installIdleObserver(resource, { + onError: error => { + this._log.debug( + `Idle connection ${resource} destroyed because of error: ${error}` + ) + const pool = this._pools[key] + if (pool) { + this._pools[key] = pool.filter(r => r !== resource) + pool.removeInUse(resource) + } + // let's not care about background clean-ups due to errors but just trigger the destroy + // process for the resource, we especially catch any errors and ignore them to avoid + // unhandled promise rejection warnings + this._destroy(resource).catch(() => {}) + } + }) + } + pool.push(resource) + if (this._log.isDebugEnabled()) { + this._log.debug(`${resource} released to the pool ${key}`) + } + } + } else { + // key has been purged, don't put it back, just destroy the resource if (this._log.isDebugEnabled()) { this._log.debug( - `${resource} destroyed and can't be released to the pool ${key} because it is not functional` + `${resource} destroyed and can't be released to the pool ${key} because pool has been purged` ) } pool.removeInUse(resource) await this._destroy(resource) - } else { - if (this._installIdleObserver) { - this._installIdleObserver(resource, { - onError: error => { - this._log.debug( - `Idle connection ${resource} destroyed because of error: ${error}` - ) - const pool = this._pools[key] - if (pool) { - this._pools[key] = pool.filter(r => r !== resource) - pool.removeInUse(resource) - } - // let's not care about background clean-ups due to errors but just trigger the destroy - // process for the resource, we especially catch any errors and ignore them to avoid - // unhandled promise rejection warnings - this._destroy(resource).catch(() => {}) - } - }) - } - pool.push(resource) - if (this._log.isDebugEnabled()) { - this._log.debug(`${resource} released to the pool ${key}`) - } } - } else { - // key has been purged, don't put it back, just destroy the resource - if (this._log.isDebugEnabled()) { - this._log.debug( - `${resource} destroyed and can't be released to the pool ${key} because pool has been purged` - ) - } - pool.removeInUse(resource) - await this._destroy(resource) - } - resourceReleased(key, this._activeResourceCounts) + } finally { + resourceReleased(key, this._activeResourceCounts) - this._processPendingAcquireRequests(address) + this._processPendingAcquireRequests(address) + } } async _purgeKey (key) { diff --git a/packages/bolt-connection/test/pool/pool.test.js b/packages/bolt-connection/test/pool/pool.test.js index 668f2d5e8..32c7b9c72 100644 --- a/packages/bolt-connection/test/pool/pool.test.js +++ b/packages/bolt-connection/test/pool/pool.test.js @@ -125,6 +125,146 @@ describe('#unit Pool', () => { expect(destroyed[1].id).toBe(r1.id) }) + it('should release resources and process acquisitions when destroy connection', async () => { + // Given a pool that allocates + let counter = 0 + const destroyed = [] + const address = ServerAddress.fromUrl('bolt://localhost:7687') + const pool = new Pool({ + create: (_acquisitionContext, server, release) => + Promise.resolve(new Resource(server, counter++, release)), + destroy: res => { + destroyed.push(res) + return Promise.resolve() + }, + validateOnRelease: res => false, + config: new PoolConfig(2, 10000) + }) + + // When + const r0 = await pool.acquire({}, address) + const r1 = await pool.acquire({}, address) + const promiseOfR2Status = { state: 'pending' } + const promiseOfR2 = pool.acquire({}, address) + .then(r2 => { + promiseOfR2Status.state = 'resolved' + return r2 + }).catch((e) => { + promiseOfR2Status.state = 'rejected' + throw e + }) + + expect(promiseOfR2Status.state).toEqual('pending') + + await r0.close() + await r1.close() + + // Then + const r2 = await promiseOfR2 + + await r2.close() + + expect(destroyed.length).toBe(3) + expect(destroyed[0].id).toBe(r0.id) + expect(destroyed[1].id).toBe(r1.id) + expect(destroyed[2].id).toBe(r2.id) + }) + + it('should release resources and process acquisitions when destroy connection fails', async () => { + // Given a pool that allocates + let counter = 0 + const theMadeUpError = new Error('I made this error for testing') + const destroyed = [] + const address = ServerAddress.fromUrl('bolt://localhost:7687') + const pool = new Pool({ + create: (_acquisitionContext, server, release) => + Promise.resolve(new Resource(server, counter++, release)), + destroy: res => { + destroyed.push(res) + return Promise.reject(theMadeUpError) + }, + validateOnRelease: res => false, + config: new PoolConfig(2, 3000) + }) + + // When + const r0 = await pool.acquire({}, address) + const r1 = await pool.acquire({}, address) + const promiseOfR2Status = { state: 'pending' } + const promiseOfR2 = pool.acquire({}, address) + .then(r2 => { + promiseOfR2Status.state = 'resolved' + return r2 + }).catch((e) => { + promiseOfR2Status.state = 'rejected' + throw e + }) + + expect(promiseOfR2Status.state).toEqual('pending') + + await expect(r0.close()).rejects.toThrow(theMadeUpError) + await expect(r1.close()).rejects.toThrow(theMadeUpError) + + // Then + const r2 = await promiseOfR2 + + await expect(r2.close()).rejects.toThrow(theMadeUpError) + + expect(destroyed.length).toBe(3) + expect(destroyed[0].id).toBe(r0.id) + expect(destroyed[1].id).toBe(r1.id) + expect(destroyed[2].id).toBe(r2.id) + }) + + it('should release resources and process acquisitions when destroy connection fails in closed pools', async () => { + // Given a pool that allocates + let counter = 0 + const theMadeUpError = new Error('I made this error for testing') + const destroyed = [] + const address = ServerAddress.fromUrl('bolt://localhost:7687') + const pool = new Pool({ + create: (_acquisitionContext, server, release) => + Promise.resolve(new Resource(server, counter++, release)), + destroy: res => { + destroyed.push(res) + return Promise.reject(theMadeUpError) + }, + validateOnRelease: res => true, + config: new PoolConfig(2, 3000) + }) + + // When + const r0 = await pool.acquire({}, address) + const r1 = await pool.acquire({}, address) + + const promiseOfR2Status = { state: 'pending' } + const promiseOfR2 = pool.acquire({}, address) + .then(r2 => { + promiseOfR2Status.state = 'resolved' + return r2 + }).catch((e) => { + promiseOfR2Status.state = 'rejected' + throw e + }) + + await pool.purge(address) + + expect(promiseOfR2Status.state).toEqual('pending') + + await expect(r0.close()).rejects.toThrow(theMadeUpError) + await expect(r1.close()).rejects.toThrow(theMadeUpError) + + // Then + const r2 = await promiseOfR2 + + // Don't fail since the pool will be open again + await r2.close() + + expect(destroyed.length).toBe(2) + expect(destroyed[0].id).toBe(r0.id) + expect(destroyed[1].id).toBe(r1.id) + }) + it('frees if validateOnRelease returns Promise.resolve(false)', async () => { // Given a pool that allocates let counter = 0 @@ -1309,6 +1449,6 @@ class Resource { } close () { - this.release(this.key, this) + return this.release(this.key, this) } } diff --git a/packages/core/src/internal/connection-holder.ts b/packages/core/src/internal/connection-holder.ts index cb854c679..c21299c9a 100644 --- a/packages/core/src/internal/connection-holder.ts +++ b/packages/core/src/internal/connection-holder.ts @@ -25,6 +25,7 @@ import { ACCESS_MODE_WRITE, AccessMode } from './constants' import { Bookmarks } from './bookmarks' import ConnectionProvider, { Releasable } from '../connection-provider' import { AuthToken } from '../types' +import { Logger } from './logger' /** * @private @@ -87,6 +88,7 @@ class ConnectionHolder implements ConnectionHolderInterface { private readonly _getConnectionAcquistionBookmarks: () => Promise private readonly _onDatabaseNameResolved?: (databaseName?: string) => void private readonly _auth?: AuthToken + private readonly _log: Logger private _closed: boolean /** @@ -102,14 +104,15 @@ class ConnectionHolder implements ConnectionHolderInterface { * @property {AuthToken} params.auth - the target auth for the to-be-acquired connection */ constructor ({ - mode = ACCESS_MODE_WRITE, + mode, database = '', bookmarks, connectionProvider, impersonatedUser, onDatabaseNameResolved, getConnectionAcquistionBookmarks, - auth + auth, + log }: { mode?: AccessMode database?: string @@ -119,8 +122,9 @@ class ConnectionHolder implements ConnectionHolderInterface { onDatabaseNameResolved?: (databaseName?: string) => void getConnectionAcquistionBookmarks?: () => Promise auth?: AuthToken - } = {}) { - this._mode = mode + log: Logger + }) { + this._mode = mode ?? ACCESS_MODE_WRITE this._closed = false this._database = database != null ? assertString(database, 'database') : '' this._bookmarks = bookmarks ?? Bookmarks.empty() @@ -130,6 +134,8 @@ class ConnectionHolder implements ConnectionHolderInterface { this._connectionPromise = Promise.resolve(null) this._onDatabaseNameResolved = onDatabaseNameResolved this._auth = auth + this._log = log + this._logError = this._logError.bind(this) this._getConnectionAcquistionBookmarks = getConnectionAcquistionBookmarks ?? (() => Promise.resolve(Bookmarks.empty())) } @@ -209,6 +215,10 @@ class ConnectionHolder implements ConnectionHolderInterface { return this._releaseConnection(hasTx) } + log (): Logger { + return this._log + } + /** * Return the current pooled connection instance to the connection pool. * We don't pool Session instances, to avoid users using the Session after they've called close. @@ -231,10 +241,19 @@ class ConnectionHolder implements ConnectionHolderInterface { return Promise.resolve(null) } }) - .catch(ignoreError) + .catch(this._logError) return this._connectionPromise } + + _logError (error: Error): null { + if (this._log.isWarnEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + this._log.warn(`ConnectionHolder got an error while releasing the connection. Error ${error}. Stacktrace: ${error.stack}`) + } + + return null + } } /** @@ -245,7 +264,7 @@ export default class ReadOnlyConnectionHolder extends ConnectionHolder { private readonly _connectionHolder: ConnectionHolder /** - * Contructor + * Constructor * @param {ConnectionHolder} connectionHolder the connection holder which will treat the requests */ constructor (connectionHolder: ConnectionHolder) { @@ -255,7 +274,8 @@ export default class ReadOnlyConnectionHolder extends ConnectionHolder { bookmarks: connectionHolder.bookmarks(), // @ts-expect-error getConnectionAcquistionBookmarks: connectionHolder._getConnectionAcquistionBookmarks, - connectionProvider: connectionHolder.connectionProvider() + connectionProvider: connectionHolder.connectionProvider(), + log: connectionHolder.log() }) this._connectionHolder = connectionHolder } @@ -298,6 +318,13 @@ export default class ReadOnlyConnectionHolder extends ConnectionHolder { } class EmptyConnectionHolder extends ConnectionHolder { + constructor () { + super({ + // Empty logger + log: Logger.create({}) + }) + } + mode (): undefined { return undefined } diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 4e28dc36c..46de97335 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -75,7 +75,7 @@ class Session { private readonly _results: Result[] private readonly _bookmarkManager?: BookmarkManager private readonly _notificationFilter?: NotificationFilter - private readonly _log?: Logger + private readonly _log: Logger /** * @constructor * @protected @@ -132,7 +132,8 @@ class Session { connectionProvider, impersonatedUser, onDatabaseNameResolved: this._onDatabaseNameResolved, - getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks + getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks, + log }) this._writeConnectionHolder = new ConnectionHolder({ mode: ACCESS_MODE_WRITE, @@ -142,7 +143,8 @@ class Session { connectionProvider, impersonatedUser, onDatabaseNameResolved: this._onDatabaseNameResolved, - getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks + getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks, + log }) this._open = true this._hasTx = false diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index ceb377ca4..083b03641 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -27,6 +27,7 @@ import { import ResultStreamObserverMock from './utils/result-stream-observer.mock' import Result from '../src/result' import FakeConnection from './utils/connection.fake' +import { Logger } from '../src/internal/logger' interface AB { a: number @@ -154,7 +155,7 @@ describe('Result', () => { ])('when query=%s and parameters=%s', (query, params, expected) => { let connectionHolderMock: connectionHolder.ConnectionHolder beforeEach(() => { - connectionHolderMock = new connectionHolder.ConnectionHolder({}) + connectionHolderMock = new connectionHolder.ConnectionHolder({ log: Logger.create({}) }) result = new Result( Promise.resolve(streamObserverMock), query, @@ -406,7 +407,7 @@ describe('Result', () => { let connectionHolderMock: connectionHolder.ConnectionHolder beforeEach(() => { - connectionHolderMock = new connectionHolder.ConnectionHolder({}) + connectionHolderMock = new connectionHolder.ConnectionHolder({ log: Logger.create({}) }) result = new Result( Promise.resolve(streamObserverMock), 'query', @@ -632,7 +633,7 @@ describe('Result', () => { let connectionHolderMock: connectionHolder.ConnectionHolder beforeEach(() => { - connectionHolderMock = new connectionHolder.ConnectionHolder({}) + connectionHolderMock = new connectionHolder.ConnectionHolder({ log: Logger.create({}) }) result = new Result( Promise.resolve(streamObserverMock), 'query', diff --git a/packages/core/test/transaction.test.ts b/packages/core/test/transaction.test.ts index 92a59f24f..2cbeb0115 100644 --- a/packages/core/test/transaction.test.ts +++ b/packages/core/test/transaction.test.ts @@ -21,6 +21,7 @@ import { ConnectionProvider, newError, NotificationFilter, Transaction, Transact import { BeginTransactionConfig } from '../src/connection' import { Bookmarks } from '../src/internal/bookmarks' import { ConnectionHolder } from '../src/internal/connection-holder' +import { Logger } from '../src/internal/logger' import { TxConfig } from '../src/internal/tx-config' import FakeConnection from './utils/connection.fake' import { validNotificationFilters } from './utils/notification-filters.fixtures' @@ -511,7 +512,7 @@ function newTransactionPromise ({ } connectionProvider.close = async () => await Promise.resolve() - const connectionHolder = new ConnectionHolder({ connectionProvider }) + const connectionHolder = new ConnectionHolder({ connectionProvider, log: Logger.create({}) }) connectionHolder.initializeConnection() const transaction = new TransactionPromise({ @@ -547,7 +548,7 @@ function newRegularTransaction ({ connectionProvider.acquireConnection = async () => await Promise.resolve(connection) connectionProvider.close = async () => await Promise.resolve() - const connectionHolder = new ConnectionHolder({ connectionProvider }) + const connectionHolder = new ConnectionHolder({ connectionProvider, log: Logger.create({}) }) connectionHolder.initializeConnection() const transaction = new Transaction({ diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js b/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js index c1789564c..2a8a0905e 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js @@ -273,53 +273,56 @@ class Pool { async _release (address, resource, pool) { const key = address.asKey() - if (pool.isActive()) { - // there exist idle connections for the given key - if (!await this._validateOnRelease(resource)) { + try { + if (pool.isActive()) { + // there exist idle connections for the given key + if (!await this._validateOnRelease(resource)) { + if (this._log.isDebugEnabled()) { + this._log.debug( + `${resource} destroyed and can't be released to the pool ${key} because it is not functional` + ) + } + pool.removeInUse(resource) + await this._destroy(resource) + } else { + if (this._installIdleObserver) { + this._installIdleObserver(resource, { + onError: error => { + this._log.debug( + `Idle connection ${resource} destroyed because of error: ${error}` + ) + const pool = this._pools[key] + if (pool) { + this._pools[key] = pool.filter(r => r !== resource) + pool.removeInUse(resource) + } + // let's not care about background clean-ups due to errors but just trigger the destroy + // process for the resource, we especially catch any errors and ignore them to avoid + // unhandled promise rejection warnings + this._destroy(resource).catch(() => {}) + } + }) + } + pool.push(resource) + if (this._log.isDebugEnabled()) { + this._log.debug(`${resource} released to the pool ${key}`) + } + } + } else { + // key has been purged, don't put it back, just destroy the resource if (this._log.isDebugEnabled()) { this._log.debug( - `${resource} destroyed and can't be released to the pool ${key} because it is not functional` + `${resource} destroyed and can't be released to the pool ${key} because pool has been purged` ) } pool.removeInUse(resource) await this._destroy(resource) - } else { - if (this._installIdleObserver) { - this._installIdleObserver(resource, { - onError: error => { - this._log.debug( - `Idle connection ${resource} destroyed because of error: ${error}` - ) - const pool = this._pools[key] - if (pool) { - this._pools[key] = pool.filter(r => r !== resource) - pool.removeInUse(resource) - } - // let's not care about background clean-ups due to errors but just trigger the destroy - // process for the resource, we especially catch any errors and ignore them to avoid - // unhandled promise rejection warnings - this._destroy(resource).catch(() => {}) - } - }) - } - pool.push(resource) - if (this._log.isDebugEnabled()) { - this._log.debug(`${resource} released to the pool ${key}`) - } } - } else { - // key has been purged, don't put it back, just destroy the resource - if (this._log.isDebugEnabled()) { - this._log.debug( - `${resource} destroyed and can't be released to the pool ${key} because pool has been purged` - ) - } - pool.removeInUse(resource) - await this._destroy(resource) - } - resourceReleased(key, this._activeResourceCounts) + } finally { + resourceReleased(key, this._activeResourceCounts) - this._processPendingAcquireRequests(address) + this._processPendingAcquireRequests(address) + } } async _purgeKey (key) { diff --git a/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts b/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts index 5f13ba57c..db93a0494 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts @@ -25,6 +25,7 @@ import { ACCESS_MODE_WRITE, AccessMode } from './constants.ts' import { Bookmarks } from './bookmarks.ts' import ConnectionProvider, { Releasable } from '../connection-provider.ts' import { AuthToken } from '../types.ts' +import { Logger } from './logger.ts' /** * @private @@ -87,6 +88,7 @@ class ConnectionHolder implements ConnectionHolderInterface { private readonly _getConnectionAcquistionBookmarks: () => Promise private readonly _onDatabaseNameResolved?: (databaseName?: string) => void private readonly _auth?: AuthToken + private readonly _log: Logger private _closed: boolean /** @@ -102,14 +104,15 @@ class ConnectionHolder implements ConnectionHolderInterface { * @property {AuthToken} params.auth - the target auth for the to-be-acquired connection */ constructor ({ - mode = ACCESS_MODE_WRITE, + mode, database = '', bookmarks, connectionProvider, impersonatedUser, onDatabaseNameResolved, getConnectionAcquistionBookmarks, - auth + auth, + log }: { mode?: AccessMode database?: string @@ -119,8 +122,9 @@ class ConnectionHolder implements ConnectionHolderInterface { onDatabaseNameResolved?: (databaseName?: string) => void getConnectionAcquistionBookmarks?: () => Promise auth?: AuthToken - } = {}) { - this._mode = mode + log: Logger + }) { + this._mode = mode ?? ACCESS_MODE_WRITE this._closed = false this._database = database != null ? assertString(database, 'database') : '' this._bookmarks = bookmarks ?? Bookmarks.empty() @@ -130,6 +134,8 @@ class ConnectionHolder implements ConnectionHolderInterface { this._connectionPromise = Promise.resolve(null) this._onDatabaseNameResolved = onDatabaseNameResolved this._auth = auth + this._log = log + this._logError = this._logError.bind(this) this._getConnectionAcquistionBookmarks = getConnectionAcquistionBookmarks ?? (() => Promise.resolve(Bookmarks.empty())) } @@ -209,6 +215,10 @@ class ConnectionHolder implements ConnectionHolderInterface { return this._releaseConnection(hasTx) } + log (): Logger { + return this._log + } + /** * Return the current pooled connection instance to the connection pool. * We don't pool Session instances, to avoid users using the Session after they've called close. @@ -231,10 +241,19 @@ class ConnectionHolder implements ConnectionHolderInterface { return Promise.resolve(null) } }) - .catch(ignoreError) + .catch(this._logError) return this._connectionPromise } + + _logError (error: Error): null { + if (this._log.isWarnEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + this._log.warn(`ConnectionHolder got an error while releasing the connection. Error ${error}. Stacktrace: ${error.stack}`) + } + + return null + } } /** @@ -245,7 +264,7 @@ export default class ReadOnlyConnectionHolder extends ConnectionHolder { private readonly _connectionHolder: ConnectionHolder /** - * Contructor + * Constructor * @param {ConnectionHolder} connectionHolder the connection holder which will treat the requests */ constructor (connectionHolder: ConnectionHolder) { @@ -255,7 +274,8 @@ export default class ReadOnlyConnectionHolder extends ConnectionHolder { bookmarks: connectionHolder.bookmarks(), // @ts-expect-error getConnectionAcquistionBookmarks: connectionHolder._getConnectionAcquistionBookmarks, - connectionProvider: connectionHolder.connectionProvider() + connectionProvider: connectionHolder.connectionProvider(), + log: connectionHolder.log() }) this._connectionHolder = connectionHolder } @@ -298,6 +318,13 @@ export default class ReadOnlyConnectionHolder extends ConnectionHolder { } class EmptyConnectionHolder extends ConnectionHolder { + constructor () { + super({ + // Empty logger + log: Logger.create({}) + }) + } + mode (): undefined { return undefined } diff --git a/packages/neo4j-driver-deno/lib/core/session.ts b/packages/neo4j-driver-deno/lib/core/session.ts index 906c6b374..db8678fe6 100644 --- a/packages/neo4j-driver-deno/lib/core/session.ts +++ b/packages/neo4j-driver-deno/lib/core/session.ts @@ -75,7 +75,7 @@ class Session { private readonly _results: Result[] private readonly _bookmarkManager?: BookmarkManager private readonly _notificationFilter?: NotificationFilter - private readonly _log?: Logger + private readonly _log: Logger /** * @constructor * @protected @@ -132,7 +132,8 @@ class Session { connectionProvider, impersonatedUser, onDatabaseNameResolved: this._onDatabaseNameResolved, - getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks + getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks, + log }) this._writeConnectionHolder = new ConnectionHolder({ mode: ACCESS_MODE_WRITE, @@ -142,7 +143,8 @@ class Session { connectionProvider, impersonatedUser, onDatabaseNameResolved: this._onDatabaseNameResolved, - getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks + getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks, + log }) this._open = true this._hasTx = false diff --git a/packages/neo4j-driver/test/internal/connection-holder-readonly.test.js b/packages/neo4j-driver/test/internal/connection-holder-readonly.test.js index 9484831ec..ffd25d088 100644 --- a/packages/neo4j-driver/test/internal/connection-holder-readonly.test.js +++ b/packages/neo4j-driver/test/internal/connection-holder-readonly.test.js @@ -299,7 +299,7 @@ describe('#unit ReadOnlyConnectionHolder wrapping ConnectionHolder', () => { }) function newConnectionHolder (params, connectionHolderInit = () => {}) { - const connectionHolder = new ConnectionHolder(params) + const connectionHolder = new ConnectionHolder(params || {}) connectionHolderInit(connectionHolder) return new ReadOnlyConnectionHolder(connectionHolder) } diff --git a/packages/neo4j-driver/test/internal/connection-holder.test.js b/packages/neo4j-driver/test/internal/connection-holder.test.js index 58110fa81..bd9986c5d 100644 --- a/packages/neo4j-driver/test/internal/connection-holder.test.js +++ b/packages/neo4j-driver/test/internal/connection-holder.test.js @@ -23,7 +23,8 @@ import FakeConnection from './fake-connection' import { internal } from 'neo4j-driver-core' const { - connectionHolder: { ConnectionHolder, EMPTY_CONNECTION_HOLDER } + connectionHolder: { ConnectionHolder, EMPTY_CONNECTION_HOLDER }, + logger: { Logger } } = internal describe('#unit EmptyConnectionHolder', () => { @@ -254,7 +255,7 @@ describe('#unit ConnectionHolder', () => { expect(connectionProvider.mode()).toBe(mode) } - verifyMode(new ConnectionHolder(), WRITE) + verifyMode(new ConnectionHolder({}), WRITE) verifyMode(new ConnectionHolder({ mode: WRITE }), WRITE) verifyMode(new ConnectionHolder({ mode: READ }), READ) }) @@ -266,7 +267,7 @@ describe('#unit ConnectionHolder', () => { const connectionProvider = newSingleConnectionProvider(new FakeConnection()) - verifyDefault(new ConnectionHolder()) + verifyDefault(new ConnectionHolder({})) verifyDefault(new ConnectionHolder({ mode: READ, connectionProvider })) verifyDefault(new ConnectionHolder({ mode: WRITE, connectionProvider })) verifyDefault( @@ -321,6 +322,78 @@ describe('#unit ConnectionHolder', () => { }) }) + describe('and connection is open but release fails', () => { + describe('when warn logging is enabled', () => { + let connection + let releaseError + let log + let warnSpy + + beforeEach(async () => { + log = new Logger('warn', () => {}) + warnSpy = spyOn(log, 'warn').and.callThrough() + + releaseError = new Error('something wrong is not right') + connection = new FakeConnection() + const originalRelease = connection.release.bind(connection) + connection.release = () => { + originalRelease() + return Promise.reject(releaseError) + } + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider, + log + }) + + connectionHolder.initializeConnection() + + await connectionHolder.releaseConnection() + }) + + it('should log error as warning()', () => { + expect(warnSpy).toHaveBeenCalledWith(jasmine.stringMatching( + `ConnectionHolder got an error while releasing the connection. Error ${releaseError}. Stacktrace:` + )) + }) + }) + + describe('when warn logging is not enabled', () => { + let connection + let releaseError + let log + let warnSpy + + beforeEach(async () => { + log = new Logger('error', () => {}) + warnSpy = spyOn(log, 'warn').and.callThrough() + + releaseError = new Error('something wrong is not right') + connection = new FakeConnection() + const originalRelease = connection.release.bind(connection) + connection.release = () => { + originalRelease() + return Promise.reject(releaseError) + } + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider, + log + }) + + connectionHolder.initializeConnection() + + await connectionHolder.releaseConnection() + }) + + it('should not log error', () => { + expect(warnSpy).not.toHaveBeenCalled() + }) + }) + }) + describe('and has not ongoing requests', () => { let connection From 2f531937885fbd250ef3bb362f80a2b36c545b01 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 14 Sep 2023 11:30:18 +0200 Subject: [PATCH 8/9] Refactory ConnectionConsumer interface and extract a method for acquire connection The method _acquireConnection and _run methods were replicating the same logic. The logic was extracted to a common method avoid duplication. --- packages/core/src/session.ts | 86 +++++++++--------- .../neo4j-driver-deno/lib/core/session.ts | 87 ++++++++++--------- 2 files changed, 91 insertions(+), 82 deletions(-) diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 46de97335..6e27b1488 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -19,7 +19,7 @@ /* eslint-disable @typescript-eslint/promise-function-async */ -import { FailedObserver } from './internal/observers' +import { FailedObserver, ResultStreamObserver } from './internal/observers' import { validateQueryAndParameters } from './internal/util' import { FETCH_ALL, ACCESS_MODE_READ, ACCESS_MODE_WRITE } from './internal/constants' import { newError } from './error' @@ -40,7 +40,7 @@ import { RecordShape } from './record' import NotificationFilter from './notification-filter' import { Logger } from './internal/logger' -type ConnectionConsumer = (connection: Connection | null) => any | undefined | Promise | Promise +type ConnectionConsumer = (connection: Connection) => Promise | T type TransactionWork = (tx: Transaction) => Promise | T type ManagedTransactionWork = (tx: ManagedTransaction) => Promise | T @@ -189,7 +189,7 @@ class Session { const result = this._run(validatedQuery, params, async connection => { const bookmarks = await this._bookmarks() this._assertSessionIsOpen() - return (connection as Connection).run(validatedQuery, params, { + return connection.run(validatedQuery, params, { bookmarks, txConfig: autoCommitTxConfig, mode: this._mode, @@ -207,57 +207,61 @@ class Session { return result } - _run ( + _run ( query: Query, parameters: any, - customRunner: ConnectionConsumer + customRunner: ConnectionConsumer ): Result { - const connectionHolder = this._connectionHolderWithMode(this._mode) - - let observerPromise - if (!this._open) { - observerPromise = Promise.resolve( - new FailedObserver({ - error: newError('Cannot run query in a closed session.') - }) - ) - } else if (!this._hasTx && connectionHolder.initializeConnection()) { - observerPromise = connectionHolder - .getConnection() - .then(connection => customRunner(connection)) - .catch(error => Promise.resolve(new FailedObserver({ error }))) - } else { - observerPromise = Promise.resolve( - new FailedObserver({ - error: newError( - 'Queries cannot be run directly on a ' + - 'session with an open transaction; either run from within the ' + - 'transaction or use a different session.' - ) - }) - ) - } + const { connectionHolder, resultPromise } = this._acquireAndConsumeConnection(customRunner) + const observerPromise = resultPromise.catch(error => Promise.resolve(new FailedObserver({ error }))) const watermarks = { high: this._highRecordWatermark, low: this._lowRecordWatermark } return new Result(observerPromise, query, parameters, connectionHolder, watermarks) } - _acquireConnection (connectionConsumer: ConnectionConsumer): Promise { - let promise + /** + * This method is used by Rediscovery on the neo4j-driver-bolt-protocol package. + * + * @private + * @param {function()} connectionConsumer The method which will use the connection + * @returns {Promise} A connection promise + */ + _acquireConnection (connectionConsumer: ConnectionConsumer): Promise { + const { connectionHolder, resultPromise } = this._acquireAndConsumeConnection(connectionConsumer) + + return resultPromise.then(async (result: T) => { + await connectionHolder.releaseConnection() + return result + }) + } + + /** + * Acquires a {@link Connection}, consume it and return a promise of the result along with + * the {@link ConnectionHolder} used in the process. + * + * @private + * @param connectionConsumer + * @returns {object} The connection holder and connection promise. + */ + + private _acquireAndConsumeConnection(connectionConsumer: ConnectionConsumer): { + connectionHolder: ConnectionHolder + resultPromise: Promise + } { + let resultPromise: Promise const connectionHolder = this._connectionHolderWithMode(this._mode) if (!this._open) { - promise = Promise.reject( + resultPromise = Promise.reject( newError('Cannot run query in a closed session.') ) } else if (!this._hasTx && connectionHolder.initializeConnection()) { - promise = connectionHolder + resultPromise = connectionHolder .getConnection() - .then(connection => connectionConsumer(connection)) - .then(async result => { - await connectionHolder.releaseConnection() - return result - }) + // Connection won't be null at this point since the initialize method + // return + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + .then(connection => connectionConsumer(connection!)) } else { - promise = Promise.reject( + resultPromise = Promise.reject( newError( 'Queries cannot be run directly on a ' + 'session with an open transaction; either run from within the ' + @@ -266,7 +270,7 @@ class Session { ) } - return promise + return { connectionHolder, resultPromise } } /** diff --git a/packages/neo4j-driver-deno/lib/core/session.ts b/packages/neo4j-driver-deno/lib/core/session.ts index db8678fe6..da417e4e5 100644 --- a/packages/neo4j-driver-deno/lib/core/session.ts +++ b/packages/neo4j-driver-deno/lib/core/session.ts @@ -19,7 +19,7 @@ /* eslint-disable @typescript-eslint/promise-function-async */ -import { FailedObserver } from './internal/observers.ts' +import { FailedObserver, ResultStreamObserver } from './internal/observers.ts' import { validateQueryAndParameters } from './internal/util.ts' import { FETCH_ALL, ACCESS_MODE_READ, ACCESS_MODE_WRITE } from './internal/constants.ts' import { newError } from './error.ts' @@ -40,7 +40,7 @@ import { RecordShape } from './record.ts' import NotificationFilter from './notification-filter.ts' import { Logger } from './internal/logger.ts' -type ConnectionConsumer = (connection: Connection | null) => any | undefined | Promise | Promise +type ConnectionConsumer = (connection: Connection) => Promise | T type TransactionWork = (tx: Transaction) => Promise | T type ManagedTransactionWork = (tx: ManagedTransaction) => Promise | T @@ -189,7 +189,7 @@ class Session { const result = this._run(validatedQuery, params, async connection => { const bookmarks = await this._bookmarks() this._assertSessionIsOpen() - return (connection as Connection).run(validatedQuery, params, { + return connection.run(validatedQuery, params, { bookmarks, txConfig: autoCommitTxConfig, mode: this._mode, @@ -207,57 +207,62 @@ class Session { return result } - _run ( + _run ( query: Query, parameters: any, - customRunner: ConnectionConsumer + customRunner: ConnectionConsumer ): Result { - const connectionHolder = this._connectionHolderWithMode(this._mode) - - let observerPromise - if (!this._open) { - observerPromise = Promise.resolve( - new FailedObserver({ - error: newError('Cannot run query in a closed session.') - }) - ) - } else if (!this._hasTx && connectionHolder.initializeConnection()) { - observerPromise = connectionHolder - .getConnection() - .then(connection => customRunner(connection)) - .catch(error => Promise.resolve(new FailedObserver({ error }))) - } else { - observerPromise = Promise.resolve( - new FailedObserver({ - error: newError( - 'Queries cannot be run directly on a ' + - 'session with an open transaction; either run from within the ' + - 'transaction or use a different session.' - ) - }) - ) - } + const { connectionHolder, resultPromise } = this._acquireAndConsumeConnection(customRunner) + const observerPromise = resultPromise.catch(error => Promise.resolve(new FailedObserver({ error }))) const watermarks = { high: this._highRecordWatermark, low: this._lowRecordWatermark } return new Result(observerPromise, query, parameters, connectionHolder, watermarks) } - _acquireConnection (connectionConsumer: ConnectionConsumer): Promise { - let promise + /** + * This method is used by Rediscovery on the neo4j-driver-bolt-protocol package. + * + * @private + * @param {function()} connectionConsumer The method which will use the connection + * @returns {Promise} A connection promise + */ + _acquireConnection (connectionConsumer: ConnectionConsumer): Promise { + const { connectionHolder, resultPromise } = this._acquireAndConsumeConnection(connectionConsumer) + + return resultPromise.then(async (result: T) => { + await connectionHolder.releaseConnection() + return result + }) + } + + /** + * Acquires a {@link Connection}, consume it and return a promise of the result along with + * the {@link ConnectionHolder} used in the process. + * + * @private + * @param connectionConsumer + * @returns {object} The connection holder and connection promise. + */ + + private _acquireAndConsumeConnection(connectionConsumer: ConnectionConsumer): { + connectionHolder: ConnectionHolder, + resultPromise: Promise + } { + + let resultPromise: Promise const connectionHolder = this._connectionHolderWithMode(this._mode) if (!this._open) { - promise = Promise.reject( + resultPromise = Promise.reject( newError('Cannot run query in a closed session.') ) } else if (!this._hasTx && connectionHolder.initializeConnection()) { - promise = connectionHolder + resultPromise = connectionHolder .getConnection() - .then(connection => connectionConsumer(connection)) - .then(async result => { - await connectionHolder.releaseConnection() - return result - }) + // Connection won't be null at this point since the initialize method + // return + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + .then(connection => connectionConsumer(connection!)) } else { - promise = Promise.reject( + resultPromise = Promise.reject( newError( 'Queries cannot be run directly on a ' + 'session with an open transaction; either run from within the ' + @@ -266,7 +271,7 @@ class Session { ) } - return promise + return { connectionHolder, resultPromise } } /** From 4dc5fa8e7d046d13f82c498a4a809fff5ba45805 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 18 Sep 2023 09:46:21 +0200 Subject: [PATCH 9/9] Sync deno --- .../neo4j-driver-deno/lib/core/session.ts | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/packages/neo4j-driver-deno/lib/core/session.ts b/packages/neo4j-driver-deno/lib/core/session.ts index da417e4e5..08e1daf85 100644 --- a/packages/neo4j-driver-deno/lib/core/session.ts +++ b/packages/neo4j-driver-deno/lib/core/session.ts @@ -213,14 +213,14 @@ class Session { customRunner: ConnectionConsumer ): Result { const { connectionHolder, resultPromise } = this._acquireAndConsumeConnection(customRunner) - const observerPromise = resultPromise.catch(error => Promise.resolve(new FailedObserver({ error }))) + const observerPromise = resultPromise.catch(error => Promise.resolve(new FailedObserver({ error }))) const watermarks = { high: this._highRecordWatermark, low: this._lowRecordWatermark } return new Result(observerPromise, query, parameters, connectionHolder, watermarks) } /** * This method is used by Rediscovery on the neo4j-driver-bolt-protocol package. - * + * * @private * @param {function()} connectionConsumer The method which will use the connection * @returns {Promise} A connection promise @@ -235,19 +235,18 @@ class Session { } /** - * Acquires a {@link Connection}, consume it and return a promise of the result along with + * Acquires a {@link Connection}, consume it and return a promise of the result along with * the {@link ConnectionHolder} used in the process. - * + * * @private - * @param connectionConsumer + * @param connectionConsumer * @returns {object} The connection holder and connection promise. */ - private _acquireAndConsumeConnection(connectionConsumer: ConnectionConsumer): { - connectionHolder: ConnectionHolder, - resultPromise: Promise + private _acquireAndConsumeConnection(connectionConsumer: ConnectionConsumer): { + connectionHolder: ConnectionHolder + resultPromise: Promise } { - let resultPromise: Promise const connectionHolder = this._connectionHolderWithMode(this._mode) if (!this._open) { @@ -258,7 +257,7 @@ class Session { resultPromise = connectionHolder .getConnection() // Connection won't be null at this point since the initialize method - // return + // return // eslint-disable-next-line @typescript-eslint/no-non-null-assertion .then(connection => connectionConsumer(connection!)) } else {