diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js index e07a4e1bf..dd95da067 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js @@ -82,7 +82,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { */ _createConnection ({ auth }, address, release) { return this._createChannelConnection(address).then(connection => { - connection._release = () => { + connection.release = () => { return release(address, connection) } this._openConnections[connection.id] = connection @@ -160,7 +160,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { await connection.resetAndFlush() } } finally { - await connection._release() + await connection.release() } return serverInfo } @@ -191,7 +191,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { } throw error } finally { - await Promise.all(connectionsToRelease.map(conn => conn._release())) + await Promise.all(connectionsToRelease.map(conn => conn.release())) } } @@ -201,7 +201,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { connection._sticky = connectionWithSameCredentials && !connection.supportsReAuth if (shouldCreateStickyConnection || connection._sticky) { - await connection._release() + await connection.release() throw newError('Driver is connected to a database that does not support user switch.') } } diff --git a/packages/bolt-connection/src/connection/connection-channel.js b/packages/bolt-connection/src/connection/connection-channel.js index b2a0e9209..749cf23e6 100644 --- a/packages/bolt-connection/src/connection/connection-channel.js +++ b/packages/bolt-connection/src/connection/connection-channel.js @@ -156,6 +156,26 @@ export default class ChannelConnection extends Connection { } } + beginTransaction (config) { + return this._protocol.beginTransaction(config) + } + + run (query, parameters, config) { + return this._protocol.run(query, parameters, config) + } + + commitTransaction (config) { + return this._protocol.commitTransaction(config) + } + + rollbackTransaction (config) { + return this._protocol.rollbackTransaction(config) + } + + getProtocolVersion () { + return this._protocol.version + } + get authToken () { return this._authToken } diff --git a/packages/bolt-connection/src/connection/connection-delegate.js b/packages/bolt-connection/src/connection/connection-delegate.js index 31e2ade56..b122cddfb 100644 --- a/packages/bolt-connection/src/connection/connection-delegate.js +++ b/packages/bolt-connection/src/connection/connection-delegate.js @@ -35,6 +35,26 @@ export default class DelegateConnection extends Connection { this._delegate = delegate } + beginTransaction (config) { + return this._delegate.beginTransaction(config) + } + + run (query, param, config) { + return this._delegate.run(query, param, config) + } + + commitTransaction (config) { + return this._delegate.commitTransaction(config) + } + + rollbackTransaction (config) { + return this._delegate.rollbackTransaction(config) + } + + getProtocolVersion () { + return this._delegate.getProtocolVersion() + } + get id () { return this._delegate.id } @@ -103,11 +123,11 @@ export default class DelegateConnection extends Connection { return this._delegate.close() } - _release () { + release () { if (this._originalErrorHandler) { this._delegate._errorHandler = this._originalErrorHandler } - return this._delegate._release() + return this._delegate.release() } } diff --git a/packages/bolt-connection/src/connection/connection.js b/packages/bolt-connection/src/connection/connection.js index f016d2719..61c7131c6 100644 --- a/packages/bolt-connection/src/connection/connection.js +++ b/packages/bolt-connection/src/connection/connection.js @@ -18,12 +18,14 @@ */ // eslint-disable-next-line no-unused-vars import { ResultStreamObserver, BoltProtocol } from '../bolt' +import { Connection as CoreConnection } from 'neo4j-driver-core' -export default class Connection { +export default class Connection extends CoreConnection { /** * @param {ConnectionErrorHandler} errorHandler the error handler */ constructor (errorHandler) { + super() this._errorHandler = errorHandler } @@ -51,13 +53,6 @@ export default class Connection { throw new Error('not implemented') } - /** - * @returns {boolean} whether this connection is in a working condition - */ - isOpen () { - throw new Error('not implemented') - } - /** * @returns {BoltProtocol} the underlying bolt protocol assigned to this connection */ @@ -109,18 +104,6 @@ export default class Connection { throw new Error('not implemented') } - /** - * Send a RESET-message to the database. Message is immediately flushed to the network. - * @return {Promise} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives. - */ - resetAndFlush () { - throw new Error('not implemented') - } - - hasOngoingObservableRequests () { - throw new Error('not implemented') - } - /** * Call close on the channel. * @returns {Promise} - A promise that will be resolved when the connection is closed. diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index 187393f5f..105d4d137 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -273,53 +273,56 @@ class Pool { async _release (address, resource, pool) { const key = address.asKey() - if (pool.isActive()) { - // there exist idle connections for the given key - if (!await this._validateOnRelease(resource)) { + try { + if (pool.isActive()) { + // there exist idle connections for the given key + if (!await this._validateOnRelease(resource)) { + if (this._log.isDebugEnabled()) { + this._log.debug( + `${resource} destroyed and can't be released to the pool ${key} because it is not functional` + ) + } + pool.removeInUse(resource) + await this._destroy(resource) + } else { + if (this._installIdleObserver) { + this._installIdleObserver(resource, { + onError: error => { + this._log.debug( + `Idle connection ${resource} destroyed because of error: ${error}` + ) + const pool = this._pools[key] + if (pool) { + this._pools[key] = pool.filter(r => r !== resource) + pool.removeInUse(resource) + } + // let's not care about background clean-ups due to errors but just trigger the destroy + // process for the resource, we especially catch any errors and ignore them to avoid + // unhandled promise rejection warnings + this._destroy(resource).catch(() => {}) + } + }) + } + pool.push(resource) + if (this._log.isDebugEnabled()) { + this._log.debug(`${resource} released to the pool ${key}`) + } + } + } else { + // key has been purged, don't put it back, just destroy the resource if (this._log.isDebugEnabled()) { this._log.debug( - `${resource} destroyed and can't be released to the pool ${key} because it is not functional` + `${resource} destroyed and can't be released to the pool ${key} because pool has been purged` ) } pool.removeInUse(resource) await this._destroy(resource) - } else { - if (this._installIdleObserver) { - this._installIdleObserver(resource, { - onError: error => { - this._log.debug( - `Idle connection ${resource} destroyed because of error: ${error}` - ) - const pool = this._pools[key] - if (pool) { - this._pools[key] = pool.filter(r => r !== resource) - pool.removeInUse(resource) - } - // let's not care about background clean-ups due to errors but just trigger the destroy - // process for the resource, we especially catch any errors and ignore them to avoid - // unhandled promise rejection warnings - this._destroy(resource).catch(() => {}) - } - }) - } - pool.push(resource) - if (this._log.isDebugEnabled()) { - this._log.debug(`${resource} released to the pool ${key}`) - } } - } else { - // key has been purged, don't put it back, just destroy the resource - if (this._log.isDebugEnabled()) { - this._log.debug( - `${resource} destroyed and can't be released to the pool ${key} because pool has been purged` - ) - } - pool.removeInUse(resource) - await this._destroy(resource) - } - resourceReleased(key, this._activeResourceCounts) + } finally { + resourceReleased(key, this._activeResourceCounts) - this._processPendingAcquireRequests(address) + this._processPendingAcquireRequests(address) + } } async _purgeKey (key) { diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js index 0cfa6e06b..1ef8950ff 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js @@ -289,7 +289,7 @@ describe('constructor', () => { const connection = await create({}, server0, release) - const released = connection._release() + const released = connection.release() expect(released).toBe(releaseResult) expect(release).toHaveBeenCalledWith(server0, connection) @@ -546,7 +546,7 @@ describe('user-switching', () => { expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.')) expect(poolAcquire).toHaveBeenCalledWith({ auth: acquireAuth }, address) - expect(connection._release).toHaveBeenCalled() + expect(connection.release).toHaveBeenCalled() expect(connection._sticky).toEqual(isStickyConn) }) }) @@ -599,7 +599,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => { await connectionProvider.verifyConnectivityAndGetServerInfo() - expect(seenConnections[0]._release).toHaveBeenCalledTimes(1) + expect(seenConnections[0].release).toHaveBeenCalledTimes(1) }) it('should resetAndFlush and then release the connection', async () => { @@ -607,7 +607,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => { await connectionProvider.verifyConnectivityAndGetServerInfo() - expect(seenConnections[0]._release.mock.invocationCallOrder[0]) + expect(seenConnections[0].release.mock.invocationCallOrder[0]) .toBeGreaterThan(resetAndFlush.mock.invocationCallOrder[0]) }) @@ -636,7 +636,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => { await connectionProvider.verifyConnectivityAndGetServerInfo() } catch (e) { } finally { - expect(seenConnections[0]._release).toHaveBeenCalledTimes(1) + expect(seenConnections[0].release).toHaveBeenCalledTimes(1) } }) @@ -692,7 +692,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => { } connection.resetAndFlush = resetAndFlush if (releaseMock) { - connection._release = releaseMock + connection.release = releaseMock } seenConnections.push(connection) return connection @@ -782,7 +782,7 @@ class FakeConnection extends Connection { super(null) this._address = address - this._release = jest.fn(() => release(address, this)) + this.release = jest.fn(() => release(address, this)) this._server = server this._authToken = auth this._closed = false diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index d057f9d6e..823fe040b 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -2834,8 +2834,8 @@ describe.each([ expect(connections.length).toBe(1) expect(connections[0].resetAndFlush).toHaveBeenCalled() - expect(connections[0]._release).toHaveBeenCalled() - expect(connections[0]._release.mock.invocationCallOrder[0]) + expect(connections[0].release).toHaveBeenCalled() + expect(connections[0].release.mock.invocationCallOrder[0]) .toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0]) }) @@ -2856,7 +2856,7 @@ describe.each([ // extra checks expect(connections.length).toBe(1) - expect(connections[0]._release).toHaveBeenCalled() + expect(connections[0].release).toHaveBeenCalled() }) it('should not acquire, resetAndFlush and release connections for sever with the other access mode', async () => { @@ -2900,7 +2900,7 @@ describe.each([ expect(connections.length).toBe(1) expect(connections[0].resetAndFlush).toHaveBeenCalled() - expect(connections[0]._release).toHaveBeenCalled() + expect(connections[0].release).toHaveBeenCalled() } } }) @@ -2956,8 +2956,8 @@ describe.each([ expect(connections.length).toBe(1) expect(connections[0].resetAndFlush).toHaveBeenCalled() - expect(connections[0]._release).toHaveBeenCalled() - expect(connections[0]._release.mock.invocationCallOrder[0]) + expect(connections[0].release).toHaveBeenCalled() + expect(connections[0].release.mock.invocationCallOrder[0]) .toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0]) } } @@ -2979,7 +2979,7 @@ describe.each([ expect(connections.length).toBe(1) expect(connections[0].resetAndFlush).toHaveBeenCalled() - expect(connections[0]._release).toHaveBeenCalled() + expect(connections[0].release).toHaveBeenCalled() } } }) @@ -3054,7 +3054,7 @@ describe.each([ connection.resetAndFlush = resetAndFlush } if (releaseMock) { - connection._release = releaseMock + connection.release = releaseMock } seenConnectionsPerAddress.get(address).push(connection) return connection @@ -3193,7 +3193,7 @@ describe.each([ const connection = await create({}, server0, release) - const released = connection._release() + const released = connection.release() expect(released).toBe(releaseResult) expect(release).toHaveBeenCalledWith(server0, connection) @@ -3460,7 +3460,7 @@ describe.each([ expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.')) expect(poolAcquire).toHaveBeenCalledWith({ auth }, server3) - expect(connection._release).toHaveBeenCalled() + expect(connection.release).toHaveBeenCalled() expect(connection._sticky).toEqual(isStickyConn) }) @@ -3502,7 +3502,7 @@ describe.each([ expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.')) expect(poolAcquire).toHaveBeenCalledWith({ auth }, server1) - expect(connection._release).toHaveBeenCalled() + expect(connection.release).toHaveBeenCalled() expect(connection._sticky).toEqual(isStickyConn) }) @@ -3546,7 +3546,7 @@ describe.each([ expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.')) expect(poolAcquire).toHaveBeenCalledWith({ auth }, server0) - expect(connection._release).toHaveBeenCalled() + expect(connection.release).toHaveBeenCalled() expect(connection._sticky).toEqual(isStickyConn) }) @@ -3575,7 +3575,7 @@ describe.each([ expect(error).toEqual(newError('Driver is connected to a database that does not support user switch.')) expect(poolAcquire).toHaveBeenCalledWith({ auth }, server0) - expect(connection._release).toHaveBeenCalled() + expect(connection.release).toHaveBeenCalled() expect(connection._sticky).toEqual(isStickyConn) }) }) @@ -3903,7 +3903,7 @@ class FakeConnection extends Connection { this._version = version this._protocolVersion = protocolVersion this.release = release - this._release = jest.fn(() => release(address, this)) + this.release = jest.fn(() => release(address, this)) this.resetAndFlush = jest.fn(() => Promise.resolve()) this._server = server this._authToken = authToken diff --git a/packages/bolt-connection/test/connection/connection-channel.test.js b/packages/bolt-connection/test/connection/connection-channel.test.js index fb89e882e..841488d3c 100644 --- a/packages/bolt-connection/test/connection/connection-channel.test.js +++ b/packages/bolt-connection/test/connection/connection-channel.test.js @@ -20,10 +20,13 @@ import ChannelConnection from '../../src/connection/connection-channel' import { int, internal, newError } from 'neo4j-driver-core' import { notificationFilterBehaviour } from '../bolt/behaviour' +import { ResultStreamObserver } from '../../src/bolt' const { serverAddress: { ServerAddress }, - logger: { Logger } + logger: { Logger }, + bookmarks: { Bookmarks }, + txConfig: { TxConfig } } = internal describe('ChannelConnection', () => { @@ -715,6 +718,182 @@ describe('ChannelConnection', () => { }) }) + describe('.beginTransaction()', () => { + it('should call redirect the request to the protocol', () => { + const observer = new ResultStreamObserver() + const protocol = { + beginTransaction: jest.fn(() => observer) + } + const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol }) + + const config = { + bookmarks: Bookmarks.empty(), + txConfig: TxConfig.empty(), + database: 'neo4j', + mode: 'READ', + impersonatedUser: 'other cat', + notificationFilter: { + minimumSeverityLevel: 'WARNING' + }, + beforeError: () => console.log('my error'), + afterComplete: (metadata) => console.log('metadata', metadata) + } + + const result = connection.beginTransaction(config) + + expect(result).toBe(observer) + expect(protocol.beginTransaction).toBeCalledWith(config) + }) + }) + + describe('.run()', () => { + it('should call redirect the request to the protocol', () => { + const observer = new ResultStreamObserver() + const protocol = { + run: jest.fn(() => observer) + } + const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol }) + + const query = 'RETURN $x' + const params = { x: 1 } + const config = { + bookmarks: Bookmarks.empty(), + txConfig: TxConfig.empty(), + database: 'neo4j', + mode: 'READ', + impersonatedUser: 'other cat', + notificationFilter: { + minimumSeverityLevel: 'WARNING' + }, + fetchSize: 1000, + highRecordWatermark: 1234, + lowRecordWatermark: 12, + reactive: false, + beforeError: () => console.log('my error'), + afterComplete: (metadata) => console.log('metadata', metadata) + } + + const result = connection.run(query, params, config) + + expect(result).toBe(observer) + expect(protocol.run).toBeCalledWith(query, params, config) + }) + }) + + describe('.commitTransaction()', () => { + it('should call redirect the request to the protocol', () => { + const observer = new ResultStreamObserver() + const protocol = { + commitTransaction: jest.fn(() => observer) + } + const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol }) + + const config = { + beforeError: () => console.log('my error'), + afterComplete: (metadata) => console.log('metadata', metadata) + } + + const result = connection.commitTransaction(config) + + expect(result).toBe(observer) + expect(protocol.commitTransaction).toBeCalledWith(config) + }) + }) + + describe('.rollbackTransaction()', () => { + it('should call redirect the request to the protocol', () => { + const observer = new ResultStreamObserver() + const protocol = { + rollbackTransaction: jest.fn(() => observer) + } + const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol }) + + const config = { + beforeError: () => console.log('my error'), + afterComplete: (metadata) => console.log('metadata', metadata) + } + + const result = connection.rollbackTransaction(config) + + expect(result).toBe(observer) + expect(protocol.rollbackTransaction).toBeCalledWith(config) + }) + }) + + describe('.isOpen()', () => { + it('should return true when is not broken and channel is open', () => { + const connection = spyOnConnectionChannel({ protocolSupplier: () => ({}), channel: { _open: true } }) + + expect(connection.isOpen()).toBe(true) + }) + + it('should return true when is not broken and channel not is open', () => { + const connection = spyOnConnectionChannel({ protocolSupplier: () => ({}), channel: { _open: false } }) + + expect(connection.isOpen()).toBe(false) + }) + + it('should return true when is broken and channel not is open', () => { + const connection = spyOnConnectionChannel({ + protocolSupplier: () => ({ + notifyFatalError: () => {} + }), + channel: { _open: false } + }) + + connection._handleFatalError(new Error('the error which makes the connection be broken.')) + + expect(connection.isOpen()).toBe(false) + }) + + it('should return true when is broken and channel is open', () => { + const connection = spyOnConnectionChannel({ + protocolSupplier: () => ({ + notifyFatalError: () => {} + }), + channel: { _open: true } + }) + + connection._handleFatalError(new Error('the error which makes the connection be broken.')) + + expect(connection.isOpen()).toBe(false) + }) + }) + + describe('.getProtocolVersion()', () => { + it('should call redirect request to the protocol', () => { + const version = 5.3 + const getVersion = jest.fn(() => version) + const protocol = { + get version () { + return getVersion() + } + } + + const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol }) + + const result = connection.getProtocolVersion() + + expect(result).toBe(version) + expect(getVersion).toBeCalledWith() + }) + }) + + describe('.hasOngoingObservableRequests()', () => { + it('should call redirect request to the protocol', () => { + const protocol = { + hasOngoingObservableRequests: jest.fn(() => true) + } + + const connection = spyOnConnectionChannel({ protocolSupplier: () => protocol }) + + const result = connection.hasOngoingObservableRequests() + + expect(result).toBe(true) + expect(protocol.hasOngoingObservableRequests).toBeCalledWith() + }) + }) + function spyOnConnectionChannel ({ channel, errorHandler, diff --git a/packages/neo4j-driver/test/internal/connection-delegate.test.js b/packages/bolt-connection/test/connection/connection-delegate.test.js similarity index 61% rename from packages/neo4j-driver/test/internal/connection-delegate.test.js rename to packages/bolt-connection/test/connection/connection-delegate.test.js index d88508442..bc6334716 100644 --- a/packages/neo4j-driver/test/internal/connection-delegate.test.js +++ b/packages/bolt-connection/test/connection/connection-delegate.test.js @@ -16,20 +16,33 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import DelegateConnection from '../../../bolt-connection/lib/connection/connection-delegate' -import Connection from '../../../bolt-connection/lib/connection/connection' -import { BoltProtocol } from '../../../bolt-connection/lib/bolt' -import ConnectionErrorHandler from '../../../bolt-connection/lib/connection/connection-error-handler' -import { internal } from 'neo4j-driver-core' + +import Connection from '../../src/connection/connection' +import DelegateConnection from '../../src/connection/connection-delegate' +import { Connection as CoreConnection, internal } from 'neo4j-driver-core' +import ConnectionErrorHandler from '../../src/connection/connection-error-handler' +import { BoltProtocol } from '../../src/bolt' +import utils from '../test-utils' const { serverAddress: { ServerAddress: BoltAddress } } = internal -describe('#unit DelegateConnection', () => { +describe('DelegateConnection', () => { + beforeEach(() => { + expect.extend(utils.matchers) + }) + + const NON_DELEGATE_METHODS = [ + 'constructor', + // the delegate replaces the error handler of the original connection + // and not delegate the requests to the previous connection until released + 'handleAndTransformError' + ] + it('should delegate get id', () => { const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'id', 'get').and.returnValue(5) + const spy = jest.spyOn(delegate, 'id', 'get').mockReturnValue(5) const connection = new DelegateConnection(delegate, null) expect(connection.id).toBe(5) @@ -38,7 +51,7 @@ describe('#unit DelegateConnection', () => { it('should delegate get databaseId', () => { const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'databaseId', 'get').and.returnValue( + const spy = jest.spyOn(delegate, 'databaseId', 'get').mockReturnValue( '123-456' ) const connection = new DelegateConnection(delegate, null) @@ -49,7 +62,7 @@ describe('#unit DelegateConnection', () => { it('should delegate set databaseId', () => { const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'databaseId', 'set') + const spy = jest.spyOn(delegate, 'databaseId', 'set').mockImplementation(() => {}) const connection = new DelegateConnection(delegate, null) connection.databaseId = '345-678' @@ -63,7 +76,7 @@ describe('#unit DelegateConnection', () => { version: 'Neo4j/3.5.6' } const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'server', 'get').and.returnValue(server) + const spy = jest.spyOn(delegate, 'server', 'get').mockReturnValue(server) const connection = new DelegateConnection(delegate, null) expect(connection.server).toBe(server) @@ -73,7 +86,7 @@ describe('#unit DelegateConnection', () => { it('should delegate get address', () => { const address = BoltAddress.fromUrl('bolt://127.0.0.1:8080') const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'address', 'get').and.returnValue( + const spy = jest.spyOn(delegate, 'address', 'get').mockReturnValue( address ) const connection = new DelegateConnection(delegate, null) @@ -85,7 +98,7 @@ describe('#unit DelegateConnection', () => { it('should delegate get version', () => { const version = 'Neo4j/3.5.6' const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'version', 'get').and.returnValue( + const spy = jest.spyOn(delegate, 'version', 'get').mockReturnValue( version ) const connection = new DelegateConnection(delegate, null) @@ -96,7 +109,7 @@ describe('#unit DelegateConnection', () => { it('should delegate set version', () => { const delegate = new Connection(null) - const spy = spyOnProperty(delegate, 'version', 'set') + const spy = jest.spyOn(delegate, 'version', 'set').mockImplementation(() => {}) const connection = new DelegateConnection(delegate, null) connection.version = 'Neo4j/3.4.9' @@ -106,7 +119,7 @@ describe('#unit DelegateConnection', () => { it('should delegate isOpen', () => { const delegate = new Connection(null) - const spy = spyOn(delegate, 'isOpen').and.returnValue(true) + const spy = jest.spyOn(delegate, 'isOpen').mockReturnValue(true) const connection = new DelegateConnection(delegate, null) expect(connection.isOpen()).toBeTruthy() @@ -116,7 +129,7 @@ describe('#unit DelegateConnection', () => { it('should delegate protocol', () => { const protocol = new BoltProtocol() const delegate = new Connection(null) - const spy = spyOn(delegate, 'protocol').and.returnValue(protocol) + const spy = jest.spyOn(delegate, 'protocol').mockReturnValue(protocol) const connection = new DelegateConnection(delegate, null) expect(connection.protocol()).toBe(protocol) @@ -125,7 +138,7 @@ describe('#unit DelegateConnection', () => { it('should delegate connect', () => { const delegate = new Connection(null) - const spy = spyOn(delegate, 'connect') + const spy = jest.spyOn(delegate, 'connect').mockImplementation(() => {}) const connection = new DelegateConnection(delegate, null) connection.connect('neo4j/js-driver', 'mydriver/0.0.0 some system info', {}) @@ -135,7 +148,7 @@ describe('#unit DelegateConnection', () => { it('should delegate write', () => { const delegate = new Connection(null) - const spy = spyOn(delegate, 'write') + const spy = jest.spyOn(delegate, 'write').mockImplementation(() => {}) const connection = new DelegateConnection(delegate, null) connection.write({}, null, true) @@ -145,7 +158,7 @@ describe('#unit DelegateConnection', () => { it('should delegate resetAndFlush', () => { const delegate = new Connection(null) - const spy = spyOn(delegate, 'resetAndFlush') + const spy = jest.spyOn(delegate, 'resetAndFlush').mockImplementation(() => {}) const connection = new DelegateConnection(delegate, null) connection.resetAndFlush() @@ -155,7 +168,7 @@ describe('#unit DelegateConnection', () => { it('should delegate close', async () => { const delegate = new Connection(null) - const spy = spyOn(delegate, 'close').and.returnValue(Promise.resolve()) + const spy = jest.spyOn(delegate, 'close').mockReturnValue(Promise.resolve()) const connection = new DelegateConnection(delegate, null) await connection.close() @@ -163,13 +176,13 @@ describe('#unit DelegateConnection', () => { expect(spy).toHaveBeenCalledTimes(1) }) - it('should delegate _release', () => { + it('should delegate release', () => { const delegate = new Connection(null) - delegate._release = () => {} - const spy = spyOn(delegate, '_release') + delegate.release = () => {} + const spy = jest.spyOn(delegate, 'release').mockImplementation(() => {}) const connection = new DelegateConnection(delegate, null) - connection._release() + connection.release() expect(spy).toHaveBeenCalledTimes(1) }) @@ -177,7 +190,7 @@ describe('#unit DelegateConnection', () => { it('should override errorHandler on create and restore on release', () => { const errorHandlerOriginal = new ConnectionErrorHandler('code1') const delegate = new Connection(errorHandlerOriginal) - delegate._release = () => {} + delegate.release = () => {} expect(delegate._errorHandler).toBe(errorHandlerOriginal) @@ -186,8 +199,31 @@ describe('#unit DelegateConnection', () => { expect(delegate._errorHandler).toBe(errorHandlerNew) - connection._release() + connection.release() expect(delegate._errorHandler).toBe(errorHandlerOriginal) }) + + it.each(getDelegatedMethods())('should delegate %s calls with exact args number and return value', (delegatedMethod) => { + const result = 'the result' + const method = CoreConnection.prototype[delegatedMethod] || Connection.prototype[delegatedMethod] + const argsNumber = method.length // function.length returns the number of arguments expected by the function + const args = [...Array(argsNumber).keys()] + + const connection = { + [delegatedMethod]: jest.fn(() => result) + } + + const delegatedConnection = new DelegateConnection(connection) + + expect(delegatedConnection[delegatedMethod](...args)).toBe(result) + expect(connection[delegatedMethod]).toHaveBeenCalledTimes(1) + expect(connection[delegatedMethod]).toBeCalledWith(...args) + expect(connection[delegatedMethod]).toBeCalledWithThis(connection) + }) + + function getDelegatedMethods () { + const allMethods = new Set([...Object.keys(Connection.prototype), ...Object.keys(CoreConnection.prototype)]) + return [...allMethods].filter(method => !NON_DELEGATE_METHODS.includes(method)) + } }) diff --git a/packages/bolt-connection/test/fake-connection.js b/packages/bolt-connection/test/fake-connection.js index af5cfc870..a9061bb76 100644 --- a/packages/bolt-connection/test/fake-connection.js +++ b/packages/bolt-connection/test/fake-connection.js @@ -95,7 +95,7 @@ export default class FakeConnection extends Connection { return Promise.resolve() } - _release () { + release () { this.releaseInvoked++ return Promise.resolve() } diff --git a/packages/bolt-connection/test/pool/pool.test.js b/packages/bolt-connection/test/pool/pool.test.js index 668f2d5e8..32c7b9c72 100644 --- a/packages/bolt-connection/test/pool/pool.test.js +++ b/packages/bolt-connection/test/pool/pool.test.js @@ -125,6 +125,146 @@ describe('#unit Pool', () => { expect(destroyed[1].id).toBe(r1.id) }) + it('should release resources and process acquisitions when destroy connection', async () => { + // Given a pool that allocates + let counter = 0 + const destroyed = [] + const address = ServerAddress.fromUrl('bolt://localhost:7687') + const pool = new Pool({ + create: (_acquisitionContext, server, release) => + Promise.resolve(new Resource(server, counter++, release)), + destroy: res => { + destroyed.push(res) + return Promise.resolve() + }, + validateOnRelease: res => false, + config: new PoolConfig(2, 10000) + }) + + // When + const r0 = await pool.acquire({}, address) + const r1 = await pool.acquire({}, address) + const promiseOfR2Status = { state: 'pending' } + const promiseOfR2 = pool.acquire({}, address) + .then(r2 => { + promiseOfR2Status.state = 'resolved' + return r2 + }).catch((e) => { + promiseOfR2Status.state = 'rejected' + throw e + }) + + expect(promiseOfR2Status.state).toEqual('pending') + + await r0.close() + await r1.close() + + // Then + const r2 = await promiseOfR2 + + await r2.close() + + expect(destroyed.length).toBe(3) + expect(destroyed[0].id).toBe(r0.id) + expect(destroyed[1].id).toBe(r1.id) + expect(destroyed[2].id).toBe(r2.id) + }) + + it('should release resources and process acquisitions when destroy connection fails', async () => { + // Given a pool that allocates + let counter = 0 + const theMadeUpError = new Error('I made this error for testing') + const destroyed = [] + const address = ServerAddress.fromUrl('bolt://localhost:7687') + const pool = new Pool({ + create: (_acquisitionContext, server, release) => + Promise.resolve(new Resource(server, counter++, release)), + destroy: res => { + destroyed.push(res) + return Promise.reject(theMadeUpError) + }, + validateOnRelease: res => false, + config: new PoolConfig(2, 3000) + }) + + // When + const r0 = await pool.acquire({}, address) + const r1 = await pool.acquire({}, address) + const promiseOfR2Status = { state: 'pending' } + const promiseOfR2 = pool.acquire({}, address) + .then(r2 => { + promiseOfR2Status.state = 'resolved' + return r2 + }).catch((e) => { + promiseOfR2Status.state = 'rejected' + throw e + }) + + expect(promiseOfR2Status.state).toEqual('pending') + + await expect(r0.close()).rejects.toThrow(theMadeUpError) + await expect(r1.close()).rejects.toThrow(theMadeUpError) + + // Then + const r2 = await promiseOfR2 + + await expect(r2.close()).rejects.toThrow(theMadeUpError) + + expect(destroyed.length).toBe(3) + expect(destroyed[0].id).toBe(r0.id) + expect(destroyed[1].id).toBe(r1.id) + expect(destroyed[2].id).toBe(r2.id) + }) + + it('should release resources and process acquisitions when destroy connection fails in closed pools', async () => { + // Given a pool that allocates + let counter = 0 + const theMadeUpError = new Error('I made this error for testing') + const destroyed = [] + const address = ServerAddress.fromUrl('bolt://localhost:7687') + const pool = new Pool({ + create: (_acquisitionContext, server, release) => + Promise.resolve(new Resource(server, counter++, release)), + destroy: res => { + destroyed.push(res) + return Promise.reject(theMadeUpError) + }, + validateOnRelease: res => true, + config: new PoolConfig(2, 3000) + }) + + // When + const r0 = await pool.acquire({}, address) + const r1 = await pool.acquire({}, address) + + const promiseOfR2Status = { state: 'pending' } + const promiseOfR2 = pool.acquire({}, address) + .then(r2 => { + promiseOfR2Status.state = 'resolved' + return r2 + }).catch((e) => { + promiseOfR2Status.state = 'rejected' + throw e + }) + + await pool.purge(address) + + expect(promiseOfR2Status.state).toEqual('pending') + + await expect(r0.close()).rejects.toThrow(theMadeUpError) + await expect(r1.close()).rejects.toThrow(theMadeUpError) + + // Then + const r2 = await promiseOfR2 + + // Don't fail since the pool will be open again + await r2.close() + + expect(destroyed.length).toBe(2) + expect(destroyed[0].id).toBe(r0.id) + expect(destroyed[1].id).toBe(r1.id) + }) + it('frees if validateOnRelease returns Promise.resolve(false)', async () => { // Given a pool that allocates let counter = 0 @@ -1309,6 +1449,6 @@ class Resource { } close () { - this.release(this.key, this) + return this.release(this.key, this) } } diff --git a/packages/bolt-connection/test/test-utils.js b/packages/bolt-connection/test/test-utils.js index 9eb7e31fd..b4ac053f8 100644 --- a/packages/bolt-connection/test/test-utils.js +++ b/packages/bolt-connection/test/test-utils.js @@ -53,6 +53,7 @@ const matchers = { } else { result.message = `Expected '${actual}' to be an element of '[${expected}]', but it wasn't` } + return result }, toBeMessage: function (actual, expected) { if (expected === undefined) { @@ -84,6 +85,12 @@ const matchers = { result.message = () => `Expected message '[${failures}]', but it didn't` } return result + }, + toBeCalledWithThis: function (theMockedFunction, thisArg) { + return { + pass: theMockedFunction.mock.contexts.filter(ctx => ctx === thisArg).length > 0, + message: () => `Expected to be called with this equals to '${json.stringify(thisArg)}', but it wasn't.` + } } } diff --git a/packages/core/src/connection-provider.ts b/packages/core/src/connection-provider.ts index bd0c96f03..a9378bf15 100644 --- a/packages/core/src/connection-provider.ts +++ b/packages/core/src/connection-provider.ts @@ -23,6 +23,18 @@ import { bookmarks } from './internal' import { ServerInfo } from './result-summary' import { AuthToken } from './types' +/** + * Interface define a releasable resource shape + * + * @private + * @interface + */ +class Releasable { + release (): Promise { + throw new Error('Not implemented') + } +} + /** * Interface define a common way to acquire a connection * @@ -53,7 +65,7 @@ class ConnectionProvider { impersonatedUser?: string onDatabaseNameResolved?: (databaseName?: string) => void auth?: AuthToken - }): Promise { + }): Promise { throw Error('Not implemented') } @@ -150,3 +162,6 @@ class ConnectionProvider { } export default ConnectionProvider +export { + Releasable +} diff --git a/packages/core/src/connection.ts b/packages/core/src/connection.ts index 5782b7c93..367d14945 100644 --- a/packages/core/src/connection.ts +++ b/packages/core/src/connection.ts @@ -18,121 +18,91 @@ */ /* eslint-disable @typescript-eslint/promise-function-async */ -import { ServerAddress } from './internal/server-address' - -/** - * Interface which defines the raw connection with the database - * @private - */ -class Connection { - get id (): string { - return '' - } +import { Bookmarks } from './internal/bookmarks' +import { AccessMode } from './internal/constants' +import { ResultStreamObserver } from './internal/observers' +import { TxConfig } from './internal/tx-config' +import NotificationFilter from './notification-filter' + +interface HasBeforeErrorAndAfterComplete { + beforeError?: (error: Error) => void + afterComplete?: (metadata: unknown) => void +} - get databaseId (): string { - return '' - } +interface BeginTransactionConfig extends HasBeforeErrorAndAfterComplete { + bookmarks: Bookmarks + txConfig: TxConfig + mode?: AccessMode + database?: string + impersonatedUser?: string + notificationFilter?: NotificationFilter +} - get server (): any { - return {} - } +interface CommitTransactionConfig extends HasBeforeErrorAndAfterComplete { - /** - * @property {object} authToken The auth registered in the connection - */ - get authToken (): any { - return {} - } +} - /** - * @property {ServerAddress} the server address this connection is opened against - */ - get address (): ServerAddress | undefined { - return undefined - } +interface RollbackConnectionConfig extends HasBeforeErrorAndAfterComplete { - /** - * @property {ServerVersion} the version of the server this connection is connected to - */ - get version (): any { - return undefined - } +} - /** - * @property {boolean} supportsReAuth Indicates the connection supports re-auth - */ - get supportsReAuth (): boolean { - return false - } +interface RunQueryConfig extends BeginTransactionConfig { + fetchSize: number + highRecordWatermark: number + lowRecordWatermark: number + reactive: boolean +} - /** - * @returns {boolean} whether this connection is in a working condition - */ - isOpen (): boolean { - return false +/** + * Interface which defines a connection for the core driver object. + * + * + * This connection exposes only methods used by the code module. + * Methods with connection implementation details can be defined and used + * by the implementation layer. + * + * @private + * @interface + */ +class Connection { + beginTransaction (config: BeginTransactionConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * @todo be removed and internalize the methods - * @returns {any} the underlying bolt protocol assigned to this connection - */ - protocol (): any { - throw Error('Not implemented') + run (query: string, parameters?: Record, config?: RunQueryConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * Connect to the target address, negotiate Bolt protocol and send initialization message. - * @param {string} userAgent the user agent for this driver. - * @param {string} boltAgent the bolt agent for this driver. - * @param {Object} authToken the object containing auth information. - * @param {Object} waitReAuth whether to connect method should wait until re-Authorised - * @return {Promise} promise resolved with the current connection if connection is successful. Rejected promise otherwise. - */ - connect (userAgent: string, boltAgent: string, authToken: any, waitReAuth: false): Promise { - throw Error('Not implemented') + commitTransaction (config: CommitTransactionConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * Write a message to the network channel. - * @param {RequestMessage} message the message to write. - * @param {ResultStreamObserver} observer the response observer. - * @param {boolean} flush `true` if flush should happen after the message is written to the buffer. - */ - write (message: any, observer: any, flush: boolean): void { - throw Error('Not implemented') + rollbackTransaction (config: RollbackConnectionConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * Send a RESET-message to the database. Message is immediately flushed to the network. - * @return {Promise} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives. - */ resetAndFlush (): Promise { - throw Error('Not implemented') + throw new Error('Not implemented') } - /** - * Checks if there is an ongoing request being handled - * @return {boolean} `true` if there is an ongoing request being handled - */ - hasOngoingObservableRequests (): boolean { - throw Error('Not implemented') + isOpen (): boolean { + throw new Error('Not implemented') } - /** - * Call close on the channel. - * @returns {Promise} - A promise that will be resolved when the connection is closed. - * - */ - close (): Promise { - throw Error('Not implemented') + getProtocolVersion (): number { + throw new Error('Not implemented') } - /** - * Called to release the connection - */ - _release (): Promise { - return Promise.resolve() + hasOngoingObservableRequests (): boolean { + throw new Error('Not implemented') } } export default Connection + +export type { + BeginTransactionConfig, + CommitTransactionConfig, + RollbackConnectionConfig, + RunQueryConfig +} diff --git a/packages/core/src/internal/connection-holder.ts b/packages/core/src/internal/connection-holder.ts index 8ad38774d..c21299c9a 100644 --- a/packages/core/src/internal/connection-holder.ts +++ b/packages/core/src/internal/connection-holder.ts @@ -21,10 +21,11 @@ import { newError } from '../error' import { assertString } from './util' import Connection from '../connection' -import { ACCESS_MODE_WRITE } from './constants' +import { ACCESS_MODE_WRITE, AccessMode } from './constants' import { Bookmarks } from './bookmarks' -import ConnectionProvider from '../connection-provider' +import ConnectionProvider, { Releasable } from '../connection-provider' import { AuthToken } from '../types' +import { Logger } from './logger' /** * @private @@ -77,16 +78,17 @@ interface ConnectionHolderInterface { * @private */ class ConnectionHolder implements ConnectionHolderInterface { - private readonly _mode: string + private readonly _mode: AccessMode private _database?: string private readonly _bookmarks: Bookmarks private readonly _connectionProvider?: ConnectionProvider private _referenceCount: number - private _connectionPromise: Promise + private _connectionPromise: Promise private readonly _impersonatedUser?: string private readonly _getConnectionAcquistionBookmarks: () => Promise private readonly _onDatabaseNameResolved?: (databaseName?: string) => void private readonly _auth?: AuthToken + private readonly _log: Logger private _closed: boolean /** @@ -102,16 +104,17 @@ class ConnectionHolder implements ConnectionHolderInterface { * @property {AuthToken} params.auth - the target auth for the to-be-acquired connection */ constructor ({ - mode = ACCESS_MODE_WRITE, + mode, database = '', bookmarks, connectionProvider, impersonatedUser, onDatabaseNameResolved, getConnectionAcquistionBookmarks, - auth + auth, + log }: { - mode?: string + mode?: AccessMode database?: string bookmarks?: Bookmarks connectionProvider?: ConnectionProvider @@ -119,8 +122,9 @@ class ConnectionHolder implements ConnectionHolderInterface { onDatabaseNameResolved?: (databaseName?: string) => void getConnectionAcquistionBookmarks?: () => Promise auth?: AuthToken - } = {}) { - this._mode = mode + log: Logger + }) { + this._mode = mode ?? ACCESS_MODE_WRITE this._closed = false this._database = database != null ? assertString(database, 'database') : '' this._bookmarks = bookmarks ?? Bookmarks.empty() @@ -130,10 +134,12 @@ class ConnectionHolder implements ConnectionHolderInterface { this._connectionPromise = Promise.resolve(null) this._onDatabaseNameResolved = onDatabaseNameResolved this._auth = auth + this._log = log + this._logError = this._logError.bind(this) this._getConnectionAcquistionBookmarks = getConnectionAcquistionBookmarks ?? (() => Promise.resolve(Bookmarks.empty())) } - mode (): string | undefined { + mode (): AccessMode | undefined { return this._mode } @@ -168,7 +174,7 @@ class ConnectionHolder implements ConnectionHolderInterface { return true } - private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise { + private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise { return await connectionProvider.acquireConnection({ accessMode: this._mode, database: this._database, @@ -209,6 +215,10 @@ class ConnectionHolder implements ConnectionHolderInterface { return this._releaseConnection(hasTx) } + log (): Logger { + return this._log + } + /** * Return the current pooled connection instance to the connection pool. * We don't pool Session instances, to avoid users using the Session after they've called close. @@ -218,23 +228,32 @@ class ConnectionHolder implements ConnectionHolderInterface { */ private _releaseConnection (hasTx?: boolean): Promise { this._connectionPromise = this._connectionPromise - .then((connection?: Connection | null) => { + .then((connection?: Connection & Releasable | null) => { if (connection != null) { if (connection.isOpen() && (connection.hasOngoingObservableRequests() || hasTx === true)) { return connection .resetAndFlush() .catch(ignoreError) - .then(() => connection._release().then(() => null)) + .then(() => connection.release().then(() => null)) } - return connection._release().then(() => null) + return connection.release().then(() => null) } else { return Promise.resolve(null) } }) - .catch(ignoreError) + .catch(this._logError) return this._connectionPromise } + + _logError (error: Error): null { + if (this._log.isWarnEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + this._log.warn(`ConnectionHolder got an error while releasing the connection. Error ${error}. Stacktrace: ${error.stack}`) + } + + return null + } } /** @@ -245,7 +264,7 @@ export default class ReadOnlyConnectionHolder extends ConnectionHolder { private readonly _connectionHolder: ConnectionHolder /** - * Contructor + * Constructor * @param {ConnectionHolder} connectionHolder the connection holder which will treat the requests */ constructor (connectionHolder: ConnectionHolder) { @@ -255,7 +274,8 @@ export default class ReadOnlyConnectionHolder extends ConnectionHolder { bookmarks: connectionHolder.bookmarks(), // @ts-expect-error getConnectionAcquistionBookmarks: connectionHolder._getConnectionAcquistionBookmarks, - connectionProvider: connectionHolder.connectionProvider() + connectionProvider: connectionHolder.connectionProvider(), + log: connectionHolder.log() }) this._connectionHolder = connectionHolder } @@ -298,6 +318,13 @@ export default class ReadOnlyConnectionHolder extends ConnectionHolder { } class EmptyConnectionHolder extends ConnectionHolder { + constructor () { + super({ + // Empty logger + log: Logger.create({}) + }) + } + mode (): undefined { return undefined } diff --git a/packages/core/src/internal/constants.ts b/packages/core/src/internal/constants.ts index 2c56e9f1b..f4cf14d9d 100644 --- a/packages/core/src/internal/constants.ts +++ b/packages/core/src/internal/constants.ts @@ -38,6 +38,8 @@ const BOLT_PROTOCOL_V5_1: number = 5.1 const BOLT_PROTOCOL_V5_2: number = 5.2 const BOLT_PROTOCOL_V5_3: number = 5.3 +export type AccessMode = typeof ACCESS_MODE_READ | typeof ACCESS_MODE_WRITE + export { FETCH_ALL, ACCESS_MODE_READ, diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index f025d9fa0..1ef7771d2 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -513,7 +513,7 @@ class Result implements Promise - connection?.protocol()?.version + connection?.getProtocolVersion() ), // onRejected: _ => undefined diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index bb3b5572a..6e27b1488 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -19,7 +19,7 @@ /* eslint-disable @typescript-eslint/promise-function-async */ -import { FailedObserver } from './internal/observers' +import { FailedObserver, ResultStreamObserver } from './internal/observers' import { validateQueryAndParameters } from './internal/util' import { FETCH_ALL, ACCESS_MODE_READ, ACCESS_MODE_WRITE } from './internal/constants' import { newError } from './error' @@ -40,7 +40,7 @@ import { RecordShape } from './record' import NotificationFilter from './notification-filter' import { Logger } from './internal/logger' -type ConnectionConsumer = (connection: Connection | null) => any | undefined | Promise | Promise +type ConnectionConsumer = (connection: Connection) => Promise | T type TransactionWork = (tx: Transaction) => Promise | T type ManagedTransactionWork = (tx: ManagedTransaction) => Promise | T @@ -75,7 +75,7 @@ class Session { private readonly _results: Result[] private readonly _bookmarkManager?: BookmarkManager private readonly _notificationFilter?: NotificationFilter - private readonly _log?: Logger + private readonly _log: Logger /** * @constructor * @protected @@ -132,7 +132,8 @@ class Session { connectionProvider, impersonatedUser, onDatabaseNameResolved: this._onDatabaseNameResolved, - getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks + getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks, + log }) this._writeConnectionHolder = new ConnectionHolder({ mode: ACCESS_MODE_WRITE, @@ -142,7 +143,8 @@ class Session { connectionProvider, impersonatedUser, onDatabaseNameResolved: this._onDatabaseNameResolved, - getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks + getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks, + log }) this._open = true this._hasTx = false @@ -187,7 +189,7 @@ class Session { const result = this._run(validatedQuery, params, async connection => { const bookmarks = await this._bookmarks() this._assertSessionIsOpen() - return (connection as Connection).protocol().run(validatedQuery, params, { + return connection.run(validatedQuery, params, { bookmarks, txConfig: autoCommitTxConfig, mode: this._mode, @@ -205,57 +207,61 @@ class Session { return result } - _run ( + _run ( query: Query, parameters: any, - customRunner: ConnectionConsumer + customRunner: ConnectionConsumer ): Result { - const connectionHolder = this._connectionHolderWithMode(this._mode) - - let observerPromise - if (!this._open) { - observerPromise = Promise.resolve( - new FailedObserver({ - error: newError('Cannot run query in a closed session.') - }) - ) - } else if (!this._hasTx && connectionHolder.initializeConnection()) { - observerPromise = connectionHolder - .getConnection() - .then(connection => customRunner(connection)) - .catch(error => Promise.resolve(new FailedObserver({ error }))) - } else { - observerPromise = Promise.resolve( - new FailedObserver({ - error: newError( - 'Queries cannot be run directly on a ' + - 'session with an open transaction; either run from within the ' + - 'transaction or use a different session.' - ) - }) - ) - } + const { connectionHolder, resultPromise } = this._acquireAndConsumeConnection(customRunner) + const observerPromise = resultPromise.catch(error => Promise.resolve(new FailedObserver({ error }))) const watermarks = { high: this._highRecordWatermark, low: this._lowRecordWatermark } return new Result(observerPromise, query, parameters, connectionHolder, watermarks) } - _acquireConnection (connectionConsumer: ConnectionConsumer): Promise { - let promise + /** + * This method is used by Rediscovery on the neo4j-driver-bolt-protocol package. + * + * @private + * @param {function()} connectionConsumer The method which will use the connection + * @returns {Promise} A connection promise + */ + _acquireConnection (connectionConsumer: ConnectionConsumer): Promise { + const { connectionHolder, resultPromise } = this._acquireAndConsumeConnection(connectionConsumer) + + return resultPromise.then(async (result: T) => { + await connectionHolder.releaseConnection() + return result + }) + } + + /** + * Acquires a {@link Connection}, consume it and return a promise of the result along with + * the {@link ConnectionHolder} used in the process. + * + * @private + * @param connectionConsumer + * @returns {object} The connection holder and connection promise. + */ + + private _acquireAndConsumeConnection(connectionConsumer: ConnectionConsumer): { + connectionHolder: ConnectionHolder + resultPromise: Promise + } { + let resultPromise: Promise const connectionHolder = this._connectionHolderWithMode(this._mode) if (!this._open) { - promise = Promise.reject( + resultPromise = Promise.reject( newError('Cannot run query in a closed session.') ) } else if (!this._hasTx && connectionHolder.initializeConnection()) { - promise = connectionHolder + resultPromise = connectionHolder .getConnection() - .then(connection => connectionConsumer(connection)) - .then(async result => { - await connectionHolder.releaseConnection() - return result - }) + // Connection won't be null at this point since the initialize method + // return + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + .then(connection => connectionConsumer(connection!)) } else { - promise = Promise.reject( + resultPromise = Promise.reject( newError( 'Queries cannot be run directly on a ' + 'session with an open transaction; either run from within the ' + @@ -264,7 +270,7 @@ class Session { ) } - return promise + return { connectionHolder, resultPromise } } /** diff --git a/packages/core/src/transaction.ts b/packages/core/src/transaction.ts index c5c979c57..1f51c142e 100644 --- a/packages/core/src/transaction.ts +++ b/packages/core/src/transaction.ts @@ -139,7 +139,7 @@ class Transaction { this._onConnection() if (connection != null) { this._bookmarks = await getBookmarks() - return connection.protocol().beginTransaction({ + return connection.beginTransaction({ bookmarks: this._bookmarks, txConfig, mode: this._connectionHolder.mode(), @@ -150,13 +150,13 @@ class Transaction { if (events != null) { events.onError(error) } - return this._onError(error) + this._onError(error).catch(() => {}) }, afterComplete: (metadata: any) => { if (events != null) { events.onComplete(metadata) } - return this._onComplete(metadata) + this._onComplete(metadata) } }) } else { @@ -364,7 +364,7 @@ const _states = { } }, run: ( - query: Query, + query: string, parameters: any, { connectionHolder, @@ -388,7 +388,7 @@ const _states = { .then(conn => { onConnection() if (conn != null) { - return conn.protocol().run(query, parameters, { + return conn.run(query, parameters, { bookmarks: Bookmarks.empty(), txConfig: TxConfig.empty(), beforeError: onError, @@ -643,12 +643,12 @@ function finishTransaction ( return Promise.all(pendingResults.map(result => result.summary())).then(results => { if (connection != null) { if (commit) { - return connection.protocol().commitTransaction({ + return connection.commitTransaction({ beforeError: onError, afterComplete: onComplete }) } else { - return connection.protocol().rollbackTransaction({ + return connection.rollbackTransaction({ beforeError: onError, afterComplete: onComplete }) diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index c781dbaaf..083b03641 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -27,6 +27,7 @@ import { import ResultStreamObserverMock from './utils/result-stream-observer.mock' import Result from '../src/result' import FakeConnection from './utils/connection.fake' +import { Logger } from '../src/internal/logger' interface AB { a: number @@ -154,7 +155,7 @@ describe('Result', () => { ])('when query=%s and parameters=%s', (query, params, expected) => { let connectionHolderMock: connectionHolder.ConnectionHolder beforeEach(() => { - connectionHolderMock = new connectionHolder.ConnectionHolder({}) + connectionHolderMock = new connectionHolder.ConnectionHolder({ log: Logger.create({}) }) result = new Result( Promise.resolve(streamObserverMock), query, @@ -406,7 +407,7 @@ describe('Result', () => { let connectionHolderMock: connectionHolder.ConnectionHolder beforeEach(() => { - connectionHolderMock = new connectionHolder.ConnectionHolder({}) + connectionHolderMock = new connectionHolder.ConnectionHolder({ log: Logger.create({}) }) result = new Result( Promise.resolve(streamObserverMock), 'query', @@ -466,14 +467,15 @@ describe('Result', () => { it.each([123, undefined])( 'should enrich summary with the protocol version onCompleted', async version => { - const connectionMock = { - protocol: () => { - return { version } - } - } + const connectionMock = new FakeConnection() + // converting to accept undefined as number + // this test is considering the situation where protocol version + // is undefined, which should not happen during normal driver + // operation. + connectionMock.protocolVersion = version as unknown as number connectionHolderMock.getConnection = async (): Promise => { - return asConnection(connectionMock) + return connectionMock } const metadata = { resultConsumedAfter: 20, @@ -631,7 +633,7 @@ describe('Result', () => { let connectionHolderMock: connectionHolder.ConnectionHolder beforeEach(() => { - connectionHolderMock = new connectionHolder.ConnectionHolder({}) + connectionHolderMock = new connectionHolder.ConnectionHolder({ log: Logger.create({}) }) result = new Result( Promise.resolve(streamObserverMock), 'query', @@ -679,14 +681,15 @@ describe('Result', () => { it.each([123, undefined])( 'should enrich summary with the protocol version on completed', async version => { - const connectionMock = { - protocol: () => { - return { version } - } - } + const connectionMock = new FakeConnection() + // converting to accept undefined as number + // this test is considering the situation where protocol version + // is undefined, which should not happen during normal driver + // operation. + connectionMock.protocolVersion = version as unknown as number connectionHolderMock.getConnection = async (): Promise => { - return await Promise.resolve(asConnection(connectionMock)) + return await Promise.resolve(connectionMock) } const metadata = { resultConsumedAfter: 20, @@ -1719,7 +1722,3 @@ function simulateStream ( } */ } - -function asConnection (value: any): Connection { - return value -} diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts index a23863b59..0a9675f11 100644 --- a/packages/core/test/session.test.ts +++ b/packages/core/test/session.test.ts @@ -17,6 +17,8 @@ * limitations under the License. */ import { ConnectionProvider, Session, Connection, TransactionPromise, Transaction, BookmarkManager, bookmarkManager, NotificationFilter, int } from '../src' +import { BeginTransactionConfig, CommitTransactionConfig } from '../src/connection' +import { Releasable } from '../src/connection-provider' import { bookmarks } from '../src/internal' import { ACCESS_MODE_READ, FETCH_ALL } from '../src/internal/constants' import { Logger } from '../src/internal/logger' @@ -1173,36 +1175,28 @@ describe('session', () => { }) function mockBeginWithSuccess (connection: FakeConnection): FakeConnection { - const protocol = connection.protocol() - connection.protocol = () => { - return { - ...protocol, - beginTransaction: (params: { afterComplete: () => {} }, ...args: any[]) => { - protocol.beginTransaction([params, ...args]) - params.afterComplete() - } - } + const originalBegin = connection.beginTransaction.bind(connection) + connection.beginTransaction = (config: BeginTransactionConfig) => { + const stream = originalBegin(config) + config.afterComplete?.call(null, {}) + return stream } return connection } function mockCommitWithSuccess (connection: FakeConnection, metadata: any): FakeConnection { - const protocol = connection.protocol() - connection.protocol = () => { - return { - ...protocol, - commitTransaction: (params: { afterComplete: (metadata: any) => {} }, ...args: any[]) => { - const observer = protocol.commitTransaction(...[params, ...args]) - params.afterComplete(metadata) - return observer - } - } + const originalCommit = connection.commitTransaction.bind(connection) + + connection.commitTransaction = (config: CommitTransactionConfig) => { + const stream = originalCommit(config) + config.afterComplete?.call(null, metadata) + return stream } return connection } function newSessionWithConnection ( - connection: Connection, + connection: Connection & Releasable, beginTx: boolean = true, fetchSize: number = 1000, lastBookmarks: bookmarks.Bookmarks = bookmarks.Bookmarks.empty(), @@ -1224,7 +1218,7 @@ function setupSession ({ notificationFilter, auth }: { - connection: Connection + connection: Connection & Releasable beginTx?: boolean fetchSize?: number lastBookmarks?: bookmarks.Bookmarks diff --git a/packages/core/test/transaction.test.ts b/packages/core/test/transaction.test.ts index 0c8a23bda..2cbeb0115 100644 --- a/packages/core/test/transaction.test.ts +++ b/packages/core/test/transaction.test.ts @@ -18,8 +18,10 @@ */ import { ConnectionProvider, newError, NotificationFilter, Transaction, TransactionPromise } from '../src' +import { BeginTransactionConfig } from '../src/connection' import { Bookmarks } from '../src/internal/bookmarks' import { ConnectionHolder } from '../src/internal/connection-holder' +import { Logger } from '../src/internal/logger' import { TxConfig } from '../src/internal/tx-config' import FakeConnection from './utils/connection.fake' import { validNotificationFilters } from './utils/notification-filters.fixtures' @@ -137,15 +139,12 @@ testTx('TransactionPromise', newTransactionPromise, () => { function setupTx (): [TransactionPromise] { const connection = newFakeConnection() - const protocol = connection.protocol() - - connection.protocol = () => { - return { - ...protocol, - beginTransaction: (params: { afterComplete: (meta: any) => void }) => { - ctx(() => params.afterComplete({})) - } - } + const originalBegin = connection.beginTransaction.bind(connection) + + connection.beginTransaction = (config: BeginTransactionConfig) => { + const stream = originalBegin(config) + ctx(() => config.afterComplete?.call(null, {})) + return stream } const tx = newTransactionPromise({ @@ -251,16 +250,14 @@ testTx('TransactionPromise', newTransactionPromise, () => { function setupTx (): [TransactionPromise, Error] { const connection = newFakeConnection() - const protocol = connection.protocol() const expectedError = newError('begin error') - connection.protocol = () => { - return { - ...protocol, - beginTransaction: (params: { beforeError: (error: Error) => void }) => { - ctx(() => params.beforeError(expectedError)) - } - } + const originalBegin = connection.beginTransaction.bind(connection) + + connection.beginTransaction = (config: BeginTransactionConfig) => { + const stream = originalBegin(config) + ctx(() => config.beforeError?.call(null, expectedError)) + return stream } const tx = newTransactionPromise({ @@ -515,7 +512,7 @@ function newTransactionPromise ({ } connectionProvider.close = async () => await Promise.resolve() - const connectionHolder = new ConnectionHolder({ connectionProvider }) + const connectionHolder = new ConnectionHolder({ connectionProvider, log: Logger.create({}) }) connectionHolder.initializeConnection() const transaction = new TransactionPromise({ @@ -551,7 +548,7 @@ function newRegularTransaction ({ connectionProvider.acquireConnection = async () => await Promise.resolve(connection) connectionProvider.close = async () => await Promise.resolve() - const connectionHolder = new ConnectionHolder({ connectionProvider }) + const connectionHolder = new ConnectionHolder({ connectionProvider, log: Logger.create({}) }) connectionHolder.initializeConnection() const transaction = new Transaction({ diff --git a/packages/core/test/utils/connection.fake.ts b/packages/core/test/utils/connection.fake.ts index 9e7d6bee9..c52035f29 100644 --- a/packages/core/test/utils/connection.fake.ts +++ b/packages/core/test/utils/connection.fake.ts @@ -18,6 +18,7 @@ */ import { Connection, ResultObserver, ResultSummary } from '../../src' +import { BeginTransactionConfig, CommitTransactionConfig, RollbackConnectionConfig, RunQueryConfig } from '../../src/connection' import { ResultStreamObserver } from '../../src/internal/observers' /** @@ -39,7 +40,7 @@ export default class FakeConnection extends Connection { public seenParameters: any[] public seenProtocolOptions: any[] private readonly _server: any - public protocolVersion: number | undefined + public protocolVersion: number public protocolErrorsHandled: number public seenProtocolErrors: string[] public seenRequestRoutingInformation: any[] @@ -62,7 +63,7 @@ export default class FakeConnection extends Connection { this.seenParameters = [] this.seenProtocolOptions = [] this._server = {} - this.protocolVersion = undefined + this.protocolVersion = 1 this.protocolErrorsHandled = 0 this.seenProtocolErrors = [] this.seenRequestRoutingInformation = [] @@ -96,44 +97,39 @@ export default class FakeConnection extends Connection { this._server.version = value } - protocol (): any { - // return fake protocol object that simply records seen queries and parameters - return { - run: (query: string, parameters: any | undefined, protocolOptions: any | undefined): ResultStreamObserver => { - this.seenQueries.push(query) - this.seenParameters.push(parameters) - this.seenProtocolOptions.push(protocolOptions) - return mockResultStreamObserver(query, parameters) - }, - commitTransaction: () => { - return mockResultStreamObserver('COMMIT', {}) - }, - beginTransaction: async (...args: any) => { - this.seenBeginTransaction.push(...args) - return await Promise.resolve() - }, - rollbackTransaction: () => { - this.rollbackInvoked++ - if (this._rollbackError !== null) { - return mockResultStreamObserverWithError('ROLLBACK', {}, this._rollbackError) - } - return mockResultStreamObserver('ROLLBACK', {}) - }, - requestRoutingInformation: (params: any | undefined) => { - this.seenRequestRoutingInformation.push(params) - if (this._requestRoutingInformationMock != null) { - this._requestRoutingInformationMock(params) - } - }, - version: this.protocolVersion + beginTransaction (config: BeginTransactionConfig): ResultStreamObserver { + this.seenBeginTransaction.push([config]) + return mockResultStreamObserver('BEGIN', {}) + } + + run (query: string, parameters?: Record | undefined, config?: RunQueryConfig | undefined): ResultStreamObserver { + this.seenQueries.push(query) + this.seenParameters.push(parameters) + this.seenProtocolOptions.push(config) + return mockResultStreamObserver(query, parameters) + } + + commitTransaction (config: CommitTransactionConfig): ResultStreamObserver { + return mockResultStreamObserver('COMMIT', {}) + } + + rollbackTransaction (config: RollbackConnectionConfig): ResultStreamObserver { + this.rollbackInvoked++ + if (this._rollbackError !== null) { + return mockResultStreamObserverWithError('ROLLBACK', {}, this._rollbackError) } + return mockResultStreamObserver('ROLLBACK', {}) + } + + getProtocolVersion (): number { + return this.protocolVersion } async resetAndFlush (): Promise { this.resetInvoked++ } - async _release (): Promise { + async release (): Promise { this.releaseInvoked++ } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js index 631889fcf..3dfa34f79 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js @@ -82,7 +82,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { */ _createConnection ({ auth }, address, release) { return this._createChannelConnection(address).then(connection => { - connection._release = () => { + connection.release = () => { return release(address, connection) } this._openConnections[connection.id] = connection @@ -160,7 +160,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { await connection.resetAndFlush() } } finally { - await connection._release() + await connection.release() } return serverInfo } @@ -191,7 +191,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { } throw error } finally { - await Promise.all(connectionsToRelease.map(conn => conn._release())) + await Promise.all(connectionsToRelease.map(conn => conn.release())) } } @@ -201,7 +201,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { connection._sticky = connectionWithSameCredentials && !connection.supportsReAuth if (shouldCreateStickyConnection || connection._sticky) { - await connection._release() + await connection.release() throw newError('Driver is connected to a database that does not support user switch.') } } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js index 4a43a612d..48ba86127 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-channel.js @@ -156,6 +156,26 @@ export default class ChannelConnection extends Connection { } } + beginTransaction (config) { + return this._protocol.beginTransaction(config) + } + + run (query, parameters, config) { + return this._protocol.run(query, parameters, config) + } + + commitTransaction (config) { + return this._protocol.commitTransaction(config) + } + + rollbackTransaction (config) { + return this._protocol.rollbackTransaction(config) + } + + getProtocolVersion () { + return this._protocol.version + } + get authToken () { return this._authToken } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-delegate.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-delegate.js index a35307e69..896a83d42 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-delegate.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection-delegate.js @@ -35,6 +35,26 @@ export default class DelegateConnection extends Connection { this._delegate = delegate } + beginTransaction (config) { + return this._delegate.beginTransaction(config) + } + + run (query, param, config) { + return this._delegate.run(query, param, config) + } + + commitTransaction (config) { + return this._delegate.commitTransaction(config) + } + + rollbackTransaction (config) { + return this._delegate.rollbackTransaction(config) + } + + getProtocolVersion () { + return this._delegate.getProtocolVersion() + } + get id () { return this._delegate.id } @@ -103,11 +123,11 @@ export default class DelegateConnection extends Connection { return this._delegate.close() } - _release () { + release () { if (this._originalErrorHandler) { this._delegate._errorHandler = this._originalErrorHandler } - return this._delegate._release() + return this._delegate.release() } } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection.js index d80357d09..838b1e381 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection/connection.js @@ -18,12 +18,14 @@ */ // eslint-disable-next-line no-unused-vars import { ResultStreamObserver, BoltProtocol } from '../bolt/index.js' +import { Connection as CoreConnection } from '../../core/index.ts' -export default class Connection { +export default class Connection extends CoreConnection { /** * @param {ConnectionErrorHandler} errorHandler the error handler */ constructor (errorHandler) { + super() this._errorHandler = errorHandler } @@ -51,13 +53,6 @@ export default class Connection { throw new Error('not implemented') } - /** - * @returns {boolean} whether this connection is in a working condition - */ - isOpen () { - throw new Error('not implemented') - } - /** * @returns {BoltProtocol} the underlying bolt protocol assigned to this connection */ @@ -109,18 +104,6 @@ export default class Connection { throw new Error('not implemented') } - /** - * Send a RESET-message to the database. Message is immediately flushed to the network. - * @return {Promise} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives. - */ - resetAndFlush () { - throw new Error('not implemented') - } - - hasOngoingObservableRequests () { - throw new Error('not implemented') - } - /** * Call close on the channel. * @returns {Promise} - A promise that will be resolved when the connection is closed. diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js b/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js index c1789564c..2a8a0905e 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js @@ -273,53 +273,56 @@ class Pool { async _release (address, resource, pool) { const key = address.asKey() - if (pool.isActive()) { - // there exist idle connections for the given key - if (!await this._validateOnRelease(resource)) { + try { + if (pool.isActive()) { + // there exist idle connections for the given key + if (!await this._validateOnRelease(resource)) { + if (this._log.isDebugEnabled()) { + this._log.debug( + `${resource} destroyed and can't be released to the pool ${key} because it is not functional` + ) + } + pool.removeInUse(resource) + await this._destroy(resource) + } else { + if (this._installIdleObserver) { + this._installIdleObserver(resource, { + onError: error => { + this._log.debug( + `Idle connection ${resource} destroyed because of error: ${error}` + ) + const pool = this._pools[key] + if (pool) { + this._pools[key] = pool.filter(r => r !== resource) + pool.removeInUse(resource) + } + // let's not care about background clean-ups due to errors but just trigger the destroy + // process for the resource, we especially catch any errors and ignore them to avoid + // unhandled promise rejection warnings + this._destroy(resource).catch(() => {}) + } + }) + } + pool.push(resource) + if (this._log.isDebugEnabled()) { + this._log.debug(`${resource} released to the pool ${key}`) + } + } + } else { + // key has been purged, don't put it back, just destroy the resource if (this._log.isDebugEnabled()) { this._log.debug( - `${resource} destroyed and can't be released to the pool ${key} because it is not functional` + `${resource} destroyed and can't be released to the pool ${key} because pool has been purged` ) } pool.removeInUse(resource) await this._destroy(resource) - } else { - if (this._installIdleObserver) { - this._installIdleObserver(resource, { - onError: error => { - this._log.debug( - `Idle connection ${resource} destroyed because of error: ${error}` - ) - const pool = this._pools[key] - if (pool) { - this._pools[key] = pool.filter(r => r !== resource) - pool.removeInUse(resource) - } - // let's not care about background clean-ups due to errors but just trigger the destroy - // process for the resource, we especially catch any errors and ignore them to avoid - // unhandled promise rejection warnings - this._destroy(resource).catch(() => {}) - } - }) - } - pool.push(resource) - if (this._log.isDebugEnabled()) { - this._log.debug(`${resource} released to the pool ${key}`) - } } - } else { - // key has been purged, don't put it back, just destroy the resource - if (this._log.isDebugEnabled()) { - this._log.debug( - `${resource} destroyed and can't be released to the pool ${key} because pool has been purged` - ) - } - pool.removeInUse(resource) - await this._destroy(resource) - } - resourceReleased(key, this._activeResourceCounts) + } finally { + resourceReleased(key, this._activeResourceCounts) - this._processPendingAcquireRequests(address) + this._processPendingAcquireRequests(address) + } } async _purgeKey (key) { diff --git a/packages/neo4j-driver-deno/lib/core/connection-provider.ts b/packages/neo4j-driver-deno/lib/core/connection-provider.ts index 5de1b320d..8ec5d162d 100644 --- a/packages/neo4j-driver-deno/lib/core/connection-provider.ts +++ b/packages/neo4j-driver-deno/lib/core/connection-provider.ts @@ -23,6 +23,18 @@ import { bookmarks } from './internal/index.ts' import { ServerInfo } from './result-summary.ts' import { AuthToken } from './types.ts' +/** + * Interface define a releasable resource shape + * + * @private + * @interface + */ +class Releasable { + release (): Promise { + throw new Error('Not implemented') + } +} + /** * Interface define a common way to acquire a connection * @@ -53,7 +65,7 @@ class ConnectionProvider { impersonatedUser?: string onDatabaseNameResolved?: (databaseName?: string) => void auth?: AuthToken - }): Promise { + }): Promise { throw Error('Not implemented') } @@ -150,3 +162,6 @@ class ConnectionProvider { } export default ConnectionProvider +export { + Releasable +} diff --git a/packages/neo4j-driver-deno/lib/core/connection.ts b/packages/neo4j-driver-deno/lib/core/connection.ts index 9ac5950ce..cf9cb9eca 100644 --- a/packages/neo4j-driver-deno/lib/core/connection.ts +++ b/packages/neo4j-driver-deno/lib/core/connection.ts @@ -18,121 +18,91 @@ */ /* eslint-disable @typescript-eslint/promise-function-async */ -import { ServerAddress } from './internal/server-address.ts' - -/** - * Interface which defines the raw connection with the database - * @private - */ -class Connection { - get id (): string { - return '' - } +import { Bookmarks } from './internal/bookmarks.ts' +import { AccessMode } from './internal/constants.ts' +import { ResultStreamObserver } from './internal/observers.ts' +import { TxConfig } from './internal/tx-config.ts' +import NotificationFilter from './notification-filter.ts' + +interface HasBeforeErrorAndAfterComplete { + beforeError?: (error: Error) => void + afterComplete?: (metadata: unknown) => void +} - get databaseId (): string { - return '' - } +interface BeginTransactionConfig extends HasBeforeErrorAndAfterComplete { + bookmarks: Bookmarks + txConfig: TxConfig + mode?: AccessMode + database?: string + impersonatedUser?: string + notificationFilter?: NotificationFilter +} - get server (): any { - return {} - } +interface CommitTransactionConfig extends HasBeforeErrorAndAfterComplete { - /** - * @property {object} authToken The auth registered in the connection - */ - get authToken (): any { - return {} - } +} - /** - * @property {ServerAddress} the server address this connection is opened against - */ - get address (): ServerAddress | undefined { - return undefined - } +interface RollbackConnectionConfig extends HasBeforeErrorAndAfterComplete { - /** - * @property {ServerVersion} the version of the server this connection is connected to - */ - get version (): any { - return undefined - } +} - /** - * @property {boolean} supportsReAuth Indicates the connection supports re-auth - */ - get supportsReAuth (): boolean { - return false - } +interface RunQueryConfig extends BeginTransactionConfig { + fetchSize: number + highRecordWatermark: number + lowRecordWatermark: number + reactive: boolean +} - /** - * @returns {boolean} whether this connection is in a working condition - */ - isOpen (): boolean { - return false +/** + * Interface which defines a connection for the core driver object. + * + * + * This connection exposes only methods used by the code module. + * Methods with connection implementation details can be defined and used + * by the implementation layer. + * + * @private + * @interface + */ +class Connection { + beginTransaction (config: BeginTransactionConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * @todo be removed and internalize the methods - * @returns {any} the underlying bolt protocol assigned to this connection - */ - protocol (): any { - throw Error('Not implemented') + run (query: string, parameters?: Record, config?: RunQueryConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * Connect to the target address, negotiate Bolt protocol and send initialization message. - * @param {string} userAgent the user agent for this driver. - * @param {string} boltAgent the bolt agent for this driver. - * @param {Object} authToken the object containing auth information. - * @param {Object} waitReAuth whether to connect method should wait until re-Authorised - * @return {Promise} promise resolved with the current connection if connection is successful. Rejected promise otherwise. - */ - connect (userAgent: string, boltAgent: string, authToken: any, waitReAuth: false): Promise { - throw Error('Not implemented') + commitTransaction (config: CommitTransactionConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * Write a message to the network channel. - * @param {RequestMessage} message the message to write. - * @param {ResultStreamObserver} observer the response observer. - * @param {boolean} flush `true` if flush should happen after the message is written to the buffer. - */ - write (message: any, observer: any, flush: boolean): void { - throw Error('Not implemented') + rollbackTransaction (config: RollbackConnectionConfig): ResultStreamObserver { + throw new Error('Not implemented') } - /** - * Send a RESET-message to the database. Message is immediately flushed to the network. - * @return {Promise} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives. - */ resetAndFlush (): Promise { - throw Error('Not implemented') + throw new Error('Not implemented') } - /** - * Checks if there is an ongoing request being handled - * @return {boolean} `true` if there is an ongoing request being handled - */ - hasOngoingObservableRequests (): boolean { - throw Error('Not implemented') + isOpen (): boolean { + throw new Error('Not implemented') } - /** - * Call close on the channel. - * @returns {Promise} - A promise that will be resolved when the connection is closed. - * - */ - close (): Promise { - throw Error('Not implemented') + getProtocolVersion (): number { + throw new Error('Not implemented') } - /** - * Called to release the connection - */ - _release (): Promise { - return Promise.resolve() + hasOngoingObservableRequests (): boolean { + throw new Error('Not implemented') } } export default Connection + +export type { + BeginTransactionConfig, + CommitTransactionConfig, + RollbackConnectionConfig, + RunQueryConfig +} diff --git a/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts b/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts index 4d43ed1cc..db93a0494 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/connection-holder.ts @@ -21,10 +21,11 @@ import { newError } from '../error.ts' import { assertString } from './util.ts' import Connection from '../connection.ts' -import { ACCESS_MODE_WRITE } from './constants.ts' +import { ACCESS_MODE_WRITE, AccessMode } from './constants.ts' import { Bookmarks } from './bookmarks.ts' -import ConnectionProvider from '../connection-provider.ts' +import ConnectionProvider, { Releasable } from '../connection-provider.ts' import { AuthToken } from '../types.ts' +import { Logger } from './logger.ts' /** * @private @@ -77,16 +78,17 @@ interface ConnectionHolderInterface { * @private */ class ConnectionHolder implements ConnectionHolderInterface { - private readonly _mode: string + private readonly _mode: AccessMode private _database?: string private readonly _bookmarks: Bookmarks private readonly _connectionProvider?: ConnectionProvider private _referenceCount: number - private _connectionPromise: Promise + private _connectionPromise: Promise private readonly _impersonatedUser?: string private readonly _getConnectionAcquistionBookmarks: () => Promise private readonly _onDatabaseNameResolved?: (databaseName?: string) => void private readonly _auth?: AuthToken + private readonly _log: Logger private _closed: boolean /** @@ -102,16 +104,17 @@ class ConnectionHolder implements ConnectionHolderInterface { * @property {AuthToken} params.auth - the target auth for the to-be-acquired connection */ constructor ({ - mode = ACCESS_MODE_WRITE, + mode, database = '', bookmarks, connectionProvider, impersonatedUser, onDatabaseNameResolved, getConnectionAcquistionBookmarks, - auth + auth, + log }: { - mode?: string + mode?: AccessMode database?: string bookmarks?: Bookmarks connectionProvider?: ConnectionProvider @@ -119,8 +122,9 @@ class ConnectionHolder implements ConnectionHolderInterface { onDatabaseNameResolved?: (databaseName?: string) => void getConnectionAcquistionBookmarks?: () => Promise auth?: AuthToken - } = {}) { - this._mode = mode + log: Logger + }) { + this._mode = mode ?? ACCESS_MODE_WRITE this._closed = false this._database = database != null ? assertString(database, 'database') : '' this._bookmarks = bookmarks ?? Bookmarks.empty() @@ -130,10 +134,12 @@ class ConnectionHolder implements ConnectionHolderInterface { this._connectionPromise = Promise.resolve(null) this._onDatabaseNameResolved = onDatabaseNameResolved this._auth = auth + this._log = log + this._logError = this._logError.bind(this) this._getConnectionAcquistionBookmarks = getConnectionAcquistionBookmarks ?? (() => Promise.resolve(Bookmarks.empty())) } - mode (): string | undefined { + mode (): AccessMode | undefined { return this._mode } @@ -168,7 +174,7 @@ class ConnectionHolder implements ConnectionHolderInterface { return true } - private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise { + private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise { return await connectionProvider.acquireConnection({ accessMode: this._mode, database: this._database, @@ -209,6 +215,10 @@ class ConnectionHolder implements ConnectionHolderInterface { return this._releaseConnection(hasTx) } + log (): Logger { + return this._log + } + /** * Return the current pooled connection instance to the connection pool. * We don't pool Session instances, to avoid users using the Session after they've called close. @@ -218,23 +228,32 @@ class ConnectionHolder implements ConnectionHolderInterface { */ private _releaseConnection (hasTx?: boolean): Promise { this._connectionPromise = this._connectionPromise - .then((connection?: Connection | null) => { + .then((connection?: Connection & Releasable | null) => { if (connection != null) { if (connection.isOpen() && (connection.hasOngoingObservableRequests() || hasTx === true)) { return connection .resetAndFlush() .catch(ignoreError) - .then(() => connection._release().then(() => null)) + .then(() => connection.release().then(() => null)) } - return connection._release().then(() => null) + return connection.release().then(() => null) } else { return Promise.resolve(null) } }) - .catch(ignoreError) + .catch(this._logError) return this._connectionPromise } + + _logError (error: Error): null { + if (this._log.isWarnEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + this._log.warn(`ConnectionHolder got an error while releasing the connection. Error ${error}. Stacktrace: ${error.stack}`) + } + + return null + } } /** @@ -245,7 +264,7 @@ export default class ReadOnlyConnectionHolder extends ConnectionHolder { private readonly _connectionHolder: ConnectionHolder /** - * Contructor + * Constructor * @param {ConnectionHolder} connectionHolder the connection holder which will treat the requests */ constructor (connectionHolder: ConnectionHolder) { @@ -255,7 +274,8 @@ export default class ReadOnlyConnectionHolder extends ConnectionHolder { bookmarks: connectionHolder.bookmarks(), // @ts-expect-error getConnectionAcquistionBookmarks: connectionHolder._getConnectionAcquistionBookmarks, - connectionProvider: connectionHolder.connectionProvider() + connectionProvider: connectionHolder.connectionProvider(), + log: connectionHolder.log() }) this._connectionHolder = connectionHolder } @@ -298,6 +318,13 @@ export default class ReadOnlyConnectionHolder extends ConnectionHolder { } class EmptyConnectionHolder extends ConnectionHolder { + constructor () { + super({ + // Empty logger + log: Logger.create({}) + }) + } + mode (): undefined { return undefined } diff --git a/packages/neo4j-driver-deno/lib/core/internal/constants.ts b/packages/neo4j-driver-deno/lib/core/internal/constants.ts index 2c56e9f1b..f4cf14d9d 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/constants.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/constants.ts @@ -38,6 +38,8 @@ const BOLT_PROTOCOL_V5_1: number = 5.1 const BOLT_PROTOCOL_V5_2: number = 5.2 const BOLT_PROTOCOL_V5_3: number = 5.3 +export type AccessMode = typeof ACCESS_MODE_READ | typeof ACCESS_MODE_WRITE + export { FETCH_ALL, ACCESS_MODE_READ, diff --git a/packages/neo4j-driver-deno/lib/core/result.ts b/packages/neo4j-driver-deno/lib/core/result.ts index 71dd5e92f..f000b6ac1 100644 --- a/packages/neo4j-driver-deno/lib/core/result.ts +++ b/packages/neo4j-driver-deno/lib/core/result.ts @@ -513,7 +513,7 @@ class Result implements Promise - connection?.protocol()?.version + connection?.getProtocolVersion() ), // onRejected: _ => undefined diff --git a/packages/neo4j-driver-deno/lib/core/session.ts b/packages/neo4j-driver-deno/lib/core/session.ts index 1dc41befa..08e1daf85 100644 --- a/packages/neo4j-driver-deno/lib/core/session.ts +++ b/packages/neo4j-driver-deno/lib/core/session.ts @@ -19,7 +19,7 @@ /* eslint-disable @typescript-eslint/promise-function-async */ -import { FailedObserver } from './internal/observers.ts' +import { FailedObserver, ResultStreamObserver } from './internal/observers.ts' import { validateQueryAndParameters } from './internal/util.ts' import { FETCH_ALL, ACCESS_MODE_READ, ACCESS_MODE_WRITE } from './internal/constants.ts' import { newError } from './error.ts' @@ -40,7 +40,7 @@ import { RecordShape } from './record.ts' import NotificationFilter from './notification-filter.ts' import { Logger } from './internal/logger.ts' -type ConnectionConsumer = (connection: Connection | null) => any | undefined | Promise | Promise +type ConnectionConsumer = (connection: Connection) => Promise | T type TransactionWork = (tx: Transaction) => Promise | T type ManagedTransactionWork = (tx: ManagedTransaction) => Promise | T @@ -75,7 +75,7 @@ class Session { private readonly _results: Result[] private readonly _bookmarkManager?: BookmarkManager private readonly _notificationFilter?: NotificationFilter - private readonly _log?: Logger + private readonly _log: Logger /** * @constructor * @protected @@ -132,7 +132,8 @@ class Session { connectionProvider, impersonatedUser, onDatabaseNameResolved: this._onDatabaseNameResolved, - getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks + getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks, + log }) this._writeConnectionHolder = new ConnectionHolder({ mode: ACCESS_MODE_WRITE, @@ -142,7 +143,8 @@ class Session { connectionProvider, impersonatedUser, onDatabaseNameResolved: this._onDatabaseNameResolved, - getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks + getConnectionAcquistionBookmarks: this._getConnectionAcquistionBookmarks, + log }) this._open = true this._hasTx = false @@ -187,7 +189,7 @@ class Session { const result = this._run(validatedQuery, params, async connection => { const bookmarks = await this._bookmarks() this._assertSessionIsOpen() - return (connection as Connection).protocol().run(validatedQuery, params, { + return connection.run(validatedQuery, params, { bookmarks, txConfig: autoCommitTxConfig, mode: this._mode, @@ -205,57 +207,61 @@ class Session { return result } - _run ( + _run ( query: Query, parameters: any, - customRunner: ConnectionConsumer + customRunner: ConnectionConsumer ): Result { - const connectionHolder = this._connectionHolderWithMode(this._mode) - - let observerPromise - if (!this._open) { - observerPromise = Promise.resolve( - new FailedObserver({ - error: newError('Cannot run query in a closed session.') - }) - ) - } else if (!this._hasTx && connectionHolder.initializeConnection()) { - observerPromise = connectionHolder - .getConnection() - .then(connection => customRunner(connection)) - .catch(error => Promise.resolve(new FailedObserver({ error }))) - } else { - observerPromise = Promise.resolve( - new FailedObserver({ - error: newError( - 'Queries cannot be run directly on a ' + - 'session with an open transaction; either run from within the ' + - 'transaction or use a different session.' - ) - }) - ) - } + const { connectionHolder, resultPromise } = this._acquireAndConsumeConnection(customRunner) + const observerPromise = resultPromise.catch(error => Promise.resolve(new FailedObserver({ error }))) const watermarks = { high: this._highRecordWatermark, low: this._lowRecordWatermark } return new Result(observerPromise, query, parameters, connectionHolder, watermarks) } - _acquireConnection (connectionConsumer: ConnectionConsumer): Promise { - let promise + /** + * This method is used by Rediscovery on the neo4j-driver-bolt-protocol package. + * + * @private + * @param {function()} connectionConsumer The method which will use the connection + * @returns {Promise} A connection promise + */ + _acquireConnection (connectionConsumer: ConnectionConsumer): Promise { + const { connectionHolder, resultPromise } = this._acquireAndConsumeConnection(connectionConsumer) + + return resultPromise.then(async (result: T) => { + await connectionHolder.releaseConnection() + return result + }) + } + + /** + * Acquires a {@link Connection}, consume it and return a promise of the result along with + * the {@link ConnectionHolder} used in the process. + * + * @private + * @param connectionConsumer + * @returns {object} The connection holder and connection promise. + */ + + private _acquireAndConsumeConnection(connectionConsumer: ConnectionConsumer): { + connectionHolder: ConnectionHolder + resultPromise: Promise + } { + let resultPromise: Promise const connectionHolder = this._connectionHolderWithMode(this._mode) if (!this._open) { - promise = Promise.reject( + resultPromise = Promise.reject( newError('Cannot run query in a closed session.') ) } else if (!this._hasTx && connectionHolder.initializeConnection()) { - promise = connectionHolder + resultPromise = connectionHolder .getConnection() - .then(connection => connectionConsumer(connection)) - .then(async result => { - await connectionHolder.releaseConnection() - return result - }) + // Connection won't be null at this point since the initialize method + // return + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + .then(connection => connectionConsumer(connection!)) } else { - promise = Promise.reject( + resultPromise = Promise.reject( newError( 'Queries cannot be run directly on a ' + 'session with an open transaction; either run from within the ' + @@ -264,7 +270,7 @@ class Session { ) } - return promise + return { connectionHolder, resultPromise } } /** diff --git a/packages/neo4j-driver-deno/lib/core/transaction.ts b/packages/neo4j-driver-deno/lib/core/transaction.ts index dc8a0cf1d..703577a97 100644 --- a/packages/neo4j-driver-deno/lib/core/transaction.ts +++ b/packages/neo4j-driver-deno/lib/core/transaction.ts @@ -139,7 +139,7 @@ class Transaction { this._onConnection() if (connection != null) { this._bookmarks = await getBookmarks() - return connection.protocol().beginTransaction({ + return connection.beginTransaction({ bookmarks: this._bookmarks, txConfig, mode: this._connectionHolder.mode(), @@ -150,13 +150,13 @@ class Transaction { if (events != null) { events.onError(error) } - return this._onError(error) + this._onError(error).catch(() => {}) }, afterComplete: (metadata: any) => { if (events != null) { events.onComplete(metadata) } - return this._onComplete(metadata) + this._onComplete(metadata) } }) } else { @@ -364,7 +364,7 @@ const _states = { } }, run: ( - query: Query, + query: string, parameters: any, { connectionHolder, @@ -388,7 +388,7 @@ const _states = { .then(conn => { onConnection() if (conn != null) { - return conn.protocol().run(query, parameters, { + return conn.run(query, parameters, { bookmarks: Bookmarks.empty(), txConfig: TxConfig.empty(), beforeError: onError, @@ -643,12 +643,12 @@ function finishTransaction ( return Promise.all(pendingResults.map(result => result.summary())).then(results => { if (connection != null) { if (commit) { - return connection.protocol().commitTransaction({ + return connection.commitTransaction({ beforeError: onError, afterComplete: onComplete }) } else { - return connection.protocol().rollbackTransaction({ + return connection.rollbackTransaction({ beforeError: onError, afterComplete: onComplete }) diff --git a/packages/neo4j-driver/test/internal/connection-holder-readonly.test.js b/packages/neo4j-driver/test/internal/connection-holder-readonly.test.js index 9484831ec..ffd25d088 100644 --- a/packages/neo4j-driver/test/internal/connection-holder-readonly.test.js +++ b/packages/neo4j-driver/test/internal/connection-holder-readonly.test.js @@ -299,7 +299,7 @@ describe('#unit ReadOnlyConnectionHolder wrapping ConnectionHolder', () => { }) function newConnectionHolder (params, connectionHolderInit = () => {}) { - const connectionHolder = new ConnectionHolder(params) + const connectionHolder = new ConnectionHolder(params || {}) connectionHolderInit(connectionHolder) return new ReadOnlyConnectionHolder(connectionHolder) } diff --git a/packages/neo4j-driver/test/internal/connection-holder.test.js b/packages/neo4j-driver/test/internal/connection-holder.test.js index 5ba4a4dd4..bd9986c5d 100644 --- a/packages/neo4j-driver/test/internal/connection-holder.test.js +++ b/packages/neo4j-driver/test/internal/connection-holder.test.js @@ -23,7 +23,8 @@ import FakeConnection from './fake-connection' import { internal } from 'neo4j-driver-core' const { - connectionHolder: { ConnectionHolder, EMPTY_CONNECTION_HOLDER } + connectionHolder: { ConnectionHolder, EMPTY_CONNECTION_HOLDER }, + logger: { Logger } } = internal describe('#unit EmptyConnectionHolder', () => { @@ -254,7 +255,7 @@ describe('#unit ConnectionHolder', () => { expect(connectionProvider.mode()).toBe(mode) } - verifyMode(new ConnectionHolder(), WRITE) + verifyMode(new ConnectionHolder({}), WRITE) verifyMode(new ConnectionHolder({ mode: WRITE }), WRITE) verifyMode(new ConnectionHolder({ mode: READ }), READ) }) @@ -266,7 +267,7 @@ describe('#unit ConnectionHolder', () => { const connectionProvider = newSingleConnectionProvider(new FakeConnection()) - verifyDefault(new ConnectionHolder()) + verifyDefault(new ConnectionHolder({})) verifyDefault(new ConnectionHolder({ mode: READ, connectionProvider })) verifyDefault(new ConnectionHolder({ mode: WRITE, connectionProvider })) verifyDefault( @@ -316,11 +317,83 @@ describe('#unit ConnectionHolder', () => { expect(connection.resetInvoked).toBe(1) }) - it('should call connection._release()', () => { + it('should call connection.release()', () => { expect(connection.releaseInvoked).toBe(1) }) }) + describe('and connection is open but release fails', () => { + describe('when warn logging is enabled', () => { + let connection + let releaseError + let log + let warnSpy + + beforeEach(async () => { + log = new Logger('warn', () => {}) + warnSpy = spyOn(log, 'warn').and.callThrough() + + releaseError = new Error('something wrong is not right') + connection = new FakeConnection() + const originalRelease = connection.release.bind(connection) + connection.release = () => { + originalRelease() + return Promise.reject(releaseError) + } + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider, + log + }) + + connectionHolder.initializeConnection() + + await connectionHolder.releaseConnection() + }) + + it('should log error as warning()', () => { + expect(warnSpy).toHaveBeenCalledWith(jasmine.stringMatching( + `ConnectionHolder got an error while releasing the connection. Error ${releaseError}. Stacktrace:` + )) + }) + }) + + describe('when warn logging is not enabled', () => { + let connection + let releaseError + let log + let warnSpy + + beforeEach(async () => { + log = new Logger('error', () => {}) + warnSpy = spyOn(log, 'warn').and.callThrough() + + releaseError = new Error('something wrong is not right') + connection = new FakeConnection() + const originalRelease = connection.release.bind(connection) + connection.release = () => { + originalRelease() + return Promise.reject(releaseError) + } + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider, + log + }) + + connectionHolder.initializeConnection() + + await connectionHolder.releaseConnection() + }) + + it('should not log error', () => { + expect(warnSpy).not.toHaveBeenCalled() + }) + }) + }) + describe('and has not ongoing requests', () => { let connection @@ -343,7 +416,7 @@ describe('#unit ConnectionHolder', () => { expect(connection.resetInvoked).toBe(0) }) - it('should call connection._release()', () => { + it('should call connection.release()', () => { expect(connection.releaseInvoked).toBe(1) }) }) @@ -369,7 +442,7 @@ describe('#unit ConnectionHolder', () => { expect(connection.resetInvoked).toBe(0) }) - it('should call connection._release()', () => { + it('should call connection.release()', () => { expect(connection.releaseInvoked).toBe(1) }) }) @@ -398,7 +471,7 @@ describe('#unit ConnectionHolder', () => { expect(connection.resetInvoked).toBe(1) }) - it('should call connection._release()', () => { + it('should call connection.release()', () => { expect(connection.releaseInvoked).toBe(1) }) }) @@ -424,7 +497,7 @@ describe('#unit ConnectionHolder', () => { expect(connection.resetInvoked).toBe(0) }) - it('should call connection._release()', () => { + it('should call connection.release()', () => { expect(connection.releaseInvoked).toBe(1) }) }) diff --git a/packages/neo4j-driver/test/internal/fake-connection.js b/packages/neo4j-driver/test/internal/fake-connection.js index 75b9497a8..2d390d5a7 100644 --- a/packages/neo4j-driver/test/internal/fake-connection.js +++ b/packages/neo4j-driver/test/internal/fake-connection.js @@ -110,7 +110,7 @@ export default class FakeConnection extends Connection { return Promise.resolve() } - _release () { + release () { this.releaseInvoked++ return Promise.resolve() }