diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v1.js b/packages/bolt-connection/src/bolt/bolt-protocol-v1.js index a2648443e..55eec22e8 100644 --- a/packages/bolt-connection/src/bolt/bolt-protocol-v1.js +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v1.js @@ -79,6 +79,7 @@ export default class BoltProtocol { this._log = log this._onProtocolError = onProtocolError this._fatalError = null + this._lastMessageSignature = null } /** @@ -356,6 +357,8 @@ export default class BoltProtocol { this._log.debug(`C: ${message}`) } + this._lastMessageSignature = message.signature + this.packer().packStruct( message.signature, message.fields.map(field => this.packer().packable(field)) @@ -369,6 +372,14 @@ export default class BoltProtocol { } } + isLastMessageLogin () { + return this._lastMessageSignature === 0x01 + } + + isLastMessageReset () { + return this._lastMessageSignature === 0x0f + } + /** * Notifies faltal erros to the observers and mark the protocol in the fatal error state. * @param {Error} error The error diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js index 6071f89c1..42cb30fbc 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js @@ -71,7 +71,9 @@ export default class PooledConnectionProvider extends ConnectionProvider { */ _createConnection (address, release) { return this._createChannelConnection(address).then(connection => { - connection._release = () => release(address, connection) + connection._release = () => { + return release(address, connection) + } this._openConnections[connection.id] = connection return connection .connect(this._userAgent, this._authToken) @@ -119,7 +121,9 @@ export default class PooledConnectionProvider extends ConnectionProvider { const connection = await this._connectionPool.acquire(address) const serverInfo = new ServerInfo(connection.server, connection.protocol().version) try { - await connection.resetAndFlush() + if (!connection.protocol().isLastMessageLogin()) { + await connection.resetAndFlush() + } } finally { await connection._release() } diff --git a/packages/bolt-connection/src/connection/connection-channel.js b/packages/bolt-connection/src/connection/connection-channel.js index d1325b5e1..4efcb5f51 100644 --- a/packages/bolt-connection/src/connection/connection-channel.js +++ b/packages/bolt-connection/src/connection/connection-channel.js @@ -123,6 +123,8 @@ export default class ChannelConnection extends Connection { ) { super(errorHandler) + this._reseting = false + this._resetObservers = [] this._id = idGenerator++ this._address = address this._server = { address: address.asHostPort() } @@ -304,7 +306,7 @@ export default class ChannelConnection extends Connection { */ resetAndFlush () { return new Promise((resolve, reject) => { - this._protocol.reset({ + this._reset({ onError: error => { if (this._isBroken) { // handling a fatal error, no need to raise a protocol violation @@ -328,7 +330,7 @@ export default class ChannelConnection extends Connection { return } - this._protocol.reset({ + this._reset({ onError: () => { this._protocol.resetFailure() }, @@ -338,6 +340,41 @@ export default class ChannelConnection extends Connection { }) } + _reset(observer) { + if (this._reseting) { + if (!this._protocol.isLastMessageReset()) { + this._protocol.reset({ + onError: error => { + observer.onError(error) + }, onComplete: () => { + observer.onComplete() + } + }) + } else { + this._resetObservers.push(observer) + } + return + } + + this._resetObservers.push(observer) + this._reseting = true + + const notifyFinish = (notify) => { + this._reseting = false + const observers = this._resetObservers + this._resetObservers = [] + observers.forEach(notify) + } + + this._protocol.reset({ + onError: error => { + notifyFinish(obs => obs.onError(error)) + }, onComplete: () => { + notifyFinish(obs => obs.onComplete()) + } + }) + } + /* * Pop next pending observer form the list of observers and make it current observer. * @protected diff --git a/packages/bolt-connection/src/connection/connection-delegate.js b/packages/bolt-connection/src/connection/connection-delegate.js index 14c82a4c5..16d2da48a 100644 --- a/packages/bolt-connection/src/connection/connection-delegate.js +++ b/packages/bolt-connection/src/connection/connection-delegate.js @@ -83,6 +83,10 @@ export default class DelegateConnection extends Connection { return this._delegate.resetAndFlush() } + hasOngoingObservableRequests () { + return this._delegate.hasOngoingObservableRequests() + } + close () { return this._delegate.close() } diff --git a/packages/bolt-connection/src/connection/connection.js b/packages/bolt-connection/src/connection/connection.js index 21cdd3c1a..d3c692712 100644 --- a/packages/bolt-connection/src/connection/connection.js +++ b/packages/bolt-connection/src/connection/connection.js @@ -103,6 +103,10 @@ export default class Connection { throw new Error('not implemented') } + hasOngoingObservableRequests () { + throw new Error('not implemented') + } + /** * Call close on the channel. * @returns {Promise} - A promise that will be resolved when the connection is closed. diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js index 6abc03082..e1ec76394 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js @@ -251,7 +251,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => { const create = (address, release) => { const connection = new FakeConnection(address, release, server) connection.protocol = () => { - return { version: protocolVersion } + return { version: protocolVersion, isLastMessageLogin() { return false } } } connection.resetAndFlush = resetAndFlush if (releaseMock) { diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index e1b297229..e765f342f 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -2584,6 +2584,26 @@ describe('#unit RoutingConnectionProvider', () => { .toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0]) }) + it('should not call resetAndFlush for newly created connections', async () => { + const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup({ newConnection: true }) + const acquireSpy = jest.spyOn(pool, 'acquire') + + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + + const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers + const address = targetServers[0] + expect(acquireSpy).toHaveBeenCalledWith(address) + + const connections = seenConnectionsPerAddress.get(address) + + // verifying resetAndFlush was not called + expect(connections[0].resetAndFlush).not.toHaveBeenCalled() + + // extra checks + expect(connections.length).toBe(1) + expect(connections[0]._release).toHaveBeenCalled() + }) + it('should not acquire, resetAndFlush and release connections for sever with the other access mode', async () => { const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup() const acquireSpy = jest.spyOn(pool, 'acquire') @@ -2756,7 +2776,7 @@ describe('#unit RoutingConnectionProvider', () => { }) }) - function setup ({ resetAndFlush, releaseMock } = {}) { + function setup ({ resetAndFlush, releaseMock, newConnection } = { }) { const routingTable = newRoutingTable( database || null, [server1, server2], @@ -2774,6 +2794,7 @@ describe('#unit RoutingConnectionProvider', () => { seenConnectionsPerAddress.set(address, []) } const connection = new FakeConnection(address, release, 'version', protocolVersion, server) + connection._firstUsage = !!newConnection if (resetAndFlush) { connection.resetAndFlush = resetAndFlush } @@ -3082,7 +3103,8 @@ class FakeConnection extends Connection { protocol () { return { - version: this._protocolVersion + version: this._protocolVersion, + isLastMessageLogin: () => this._firstUsage } } } diff --git a/packages/bolt-connection/test/connection/connection-channel.test.js b/packages/bolt-connection/test/connection/connection-channel.test.js index c1acd5ce8..21696bd27 100644 --- a/packages/bolt-connection/test/connection/connection-channel.test.js +++ b/packages/bolt-connection/test/connection/connection-channel.test.js @@ -260,6 +260,76 @@ describe('ChannelConnection', () => { expect(protocol.reset).toHaveBeenCalled() expect(protocol.resetFailure).toHaveBeenCalled() }) + + it('should not call protocol.reset() when there is an ongoing reset', () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(), + resetFailure: jest.fn(), + isLastMessageReset: jest.fn(() => true) + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + expect(protocol.resetFailure).not.toHaveBeenCalled() + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + expect(protocol.resetFailure).not.toHaveBeenCalled() + }) + + it('should call protocol.reset() when after a previous reset completed', () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onComplete()), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + expect(protocol.resetFailure).toHaveBeenCalledTimes(1) + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalledTimes(2) + expect(protocol.resetFailure).toHaveBeenCalledTimes(2) + }) + + it('should call protocol.reset() when after a previous reset fail', () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onError(new Error('some error'))), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + expect(protocol.resetFailure).toHaveBeenCalledTimes(1) + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalledTimes(2) + expect(protocol.resetFailure).toHaveBeenCalledTimes(2) + }) }) describe('when connection is not open', () => { @@ -340,6 +410,110 @@ describe('ChannelConnection', () => { }) }) + describe('.resetAndFlush()', () => { + it('should call protocol.reset() onComplete', async () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onComplete()), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + await connection.resetAndFlush().catch(() => {}) + + expect(protocol.reset).toHaveBeenCalled() + }) + + it('should call protocol.reset() onError', async () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onError()), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + await connection.resetAndFlush().catch(() => {}) + + expect(protocol.reset).toHaveBeenCalled() + }) + + it('should not call protocol.reset() when there is an ongoing reset', async () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => { + setTimeout(() => observer.onComplete(), 100) + }), + resetFailure: jest.fn(), + isLastMessageReset: jest.fn(() => true) + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + const completeFirstResetAndFlush = connection.resetAndFlush() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + + await connection.resetAndFlush() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + + await completeFirstResetAndFlush + }) + + it('should call protocol.reset() when after a previous reset completed', async () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onComplete()), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + await connection.resetAndFlush() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + + await connection.resetAndFlush() + + expect(protocol.reset).toHaveBeenCalledTimes(2) + }) + + it('should call protocol.reset() when after a previous reset fail', async () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onError(new Error('some error'))), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + await connection.resetAndFlush().catch(() => {}) + + expect(protocol.reset).toHaveBeenCalledTimes(1) + + await connection.resetAndFlush().catch(() => {}) + + expect(protocol.reset).toHaveBeenCalledTimes(2) + }) + }) + function spyOnConnectionChannel ({ channel, errorHandler, diff --git a/packages/core/src/connection.ts b/packages/core/src/connection.ts index 54e81276e..6761adc58 100644 --- a/packages/core/src/connection.ts +++ b/packages/core/src/connection.ts @@ -94,6 +94,14 @@ class Connection { throw Error('Not implemented') } + /** + * Checks if there is an ongoing request being handled + * @return {boolean} `true` if there is an ongoing request being handled + */ + hasOngoingObservableRequests (): boolean { + throw Error('Not implemented') + } + /** * Call close on the channel. * @returns {Promise} - A promise that will be resolved when the connection is closed. diff --git a/packages/core/src/internal/connection-holder.ts b/packages/core/src/internal/connection-holder.ts index 57217cf58..6b608fedf 100644 --- a/packages/core/src/internal/connection-holder.ts +++ b/packages/core/src/internal/connection-holder.ts @@ -178,12 +178,12 @@ class ConnectionHolder implements ConnectionHolderInterface { return this._connectionPromise } - close (): Promise { + close (hasTx?: boolean): Promise { if (this._referenceCount === 0) { return this._connectionPromise } this._referenceCount = 0 - return this._releaseConnection() + return this._releaseConnection(hasTx) } /** @@ -193,11 +193,11 @@ class ConnectionHolder implements ConnectionHolderInterface { * @return {Promise} - promise resolved then connection is returned to the pool. * @private */ - private _releaseConnection (): Promise { + private _releaseConnection (hasTx?: boolean): Promise { this._connectionPromise = this._connectionPromise .then((connection?: Connection|null) => { if (connection != null) { - if (connection.isOpen()) { + if (connection.isOpen() && (connection.hasOngoingObservableRequests() || hasTx === true)) { return connection .resetAndFlush() .catch(ignoreError) diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index e0e0bee59..f785c120a 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -494,8 +494,8 @@ class Session { this._transactionExecutor.close() - await this._readConnectionHolder.close() - await this._writeConnectionHolder.close() + await this._readConnectionHolder.close(this._hasTx) + await this._writeConnectionHolder.close(this._hasTx) } } diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts index 19cbc9cd3..6a561be52 100644 --- a/packages/core/test/session.test.ts +++ b/packages/core/test/session.test.ts @@ -186,6 +186,38 @@ describe('session', () => { }).catch(done) }, 70000) + it('close should reset connection if there is an ongoing request ', async () => { + const connection = newFakeConnection() + const resetAndFlushSpy = jest.spyOn(connection, 'resetAndFlush') + const session = newSessionWithConnection(connection) + + await session.close() + + expect(resetAndFlushSpy).toHaveBeenCalledTimes(1) + }, 70000) + + it('close should not reset connection if there is not an ongoing request', async () => { + const connection = newFakeConnection() + connection.hasOngoingObservableRequests = () => false + const resetAndFlushSpy = jest.spyOn(connection, 'resetAndFlush') + const session = newSessionWithConnection(connection, false) + + await session.close() + + expect(resetAndFlushSpy).not.toHaveBeenCalled() + }, 70000) + + it('close should reset connection if there is not an ongoing request but it has tx running', async () => { + const connection = newFakeConnection() + connection.hasOngoingObservableRequests = () => false + const resetAndFlushSpy = jest.spyOn(connection, 'resetAndFlush') + const session = newSessionWithConnection(connection) + + await session.close() + + expect(resetAndFlushSpy).toHaveBeenCalled() + }, 70000) + it('should close transaction executor', done => { const session = newSessionWithConnection(newFakeConnection()) diff --git a/packages/core/test/utils/connection.fake.ts b/packages/core/test/utils/connection.fake.ts index fce1c493c..f1157cede 100644 --- a/packages/core/test/utils/connection.fake.ts +++ b/packages/core/test/utils/connection.fake.ts @@ -179,6 +179,10 @@ export default class FakeConnection extends Connection { this._open = false return this } + + hasOngoingObservableRequests(): boolean { + return true + } } function mockResultStreamObserverWithError (query: string, parameters: any | undefined, error: Error): ResultStreamObserver { diff --git a/packages/neo4j-driver/test/internal/connection-holder.test.js b/packages/neo4j-driver/test/internal/connection-holder.test.js index db82ee64c..8adc4f0df 100644 --- a/packages/neo4j-driver/test/internal/connection-holder.test.js +++ b/packages/neo4j-driver/test/internal/connection-holder.test.js @@ -319,6 +319,33 @@ describe('#unit ConnectionHolder', () => { }) }) + describe('and has not ongoing requests', () => { + let connection + + beforeEach(async () => { + connection = new FakeConnection().withHasOngoingObservableRequests( + false + ) + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider + }) + + connectionHolder.initializeConnection() + + await connectionHolder.releaseConnection() + }) + + it('should call connection.resetAndFlush', () => { + expect(connection.resetInvoked).toBe(0) + }) + + it('should call connection._release()', () => { + expect(connection.releaseInvoked).toBe(1) + }) + }) + describe('and connection is not open', () => { let connection diff --git a/packages/neo4j-driver/test/internal/fake-connection.js b/packages/neo4j-driver/test/internal/fake-connection.js index 07f90910f..fff4d6d4c 100644 --- a/packages/neo4j-driver/test/internal/fake-connection.js +++ b/packages/neo4j-driver/test/internal/fake-connection.js @@ -52,6 +52,7 @@ export default class FakeConnection extends Connection { this.protocolErrorsHandled = 0 this.seenProtocolErrors = [] this.seenRequestRoutingInformation = [] + this._hasOnGoingObservableRequests = true } get id () { @@ -161,6 +162,15 @@ export default class FakeConnection extends Connection { return this } + withHasOngoingObservableRequests (value) { + this._hasOnGoingObservableRequests = value + return this + } + + hasOngoingObservableRequests () { + return this._hasOnGoingObservableRequests + } + closed () { this._open = false return this diff --git a/packages/testkit-backend/src/feature/common.js b/packages/testkit-backend/src/feature/common.js index 45e005157..2dc79ad92 100644 --- a/packages/testkit-backend/src/feature/common.js +++ b/packages/testkit-backend/src/feature/common.js @@ -33,6 +33,7 @@ const features = [ 'Feature:API:Driver.VerifyConnectivity', 'Feature:API:Result.Peek', 'Optimization:ImplicitDefaultArguments', + 'Optimization:MinimalResets', ...SUPPORTED_TLS ] diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 5903a9893..aecdfdace 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -335,7 +335,7 @@ export function SessionWriteTransaction (context, data, wire) { } export function StartTest (context, { testName }, wire) { - if (testName.endsWith('.test_disconnect_session_on_tx_pull_after_record')) { + if (testName.endsWith('.test_disconnect_session_on_tx_pull_after_record') || testName.endsWith('test_no_reset_on_clean_connection')) { context.logLevel = 'debug' } else { context.logLevel = null