From 956d6cd8f17a4b36bddbad3e10b9eca545fabced Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 16 Mar 2022 17:48:23 +0100 Subject: [PATCH 01/12] Avoid to send reset when it is not needed The `RESET` should be send when a failure occurs or whenever the connection is being sent back to the pool with a pending request running, i.e. the bolt server is not in the `READY` state. These changes also affect the `verifyConnectivity` and `getServerInfo` implementation. The `RESET` message is not sent in these methods if it is a newly created connection. --- .../connection-provider-pooled.js | 10 +- .../src/connection/connection-channel.js | 22 ++- .../src/connection/connection-delegate.js | 4 + .../src/connection/connection.js | 4 + .../connection-provider-routing.test.js | 23 ++- .../connection/connection-channel.test.js | 170 ++++++++++++++++++ packages/core/src/connection.ts | 8 + .../core/src/internal/connection-holder.ts | 2 +- packages/core/test/session.test.ts | 21 +++ packages/core/test/utils/connection.fake.ts | 4 + .../test/internal/connection-holder.test.js | 27 +++ .../test/internal/fake-connection.js | 10 ++ .../testkit-backend/src/feature/common.js | 1 + 13 files changed, 300 insertions(+), 6 deletions(-) diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js index 6071f89c1..c3ca65082 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js @@ -71,7 +71,11 @@ export default class PooledConnectionProvider extends ConnectionProvider { */ _createConnection (address, release) { return this._createChannelConnection(address).then(connection => { - connection._release = () => release(address, connection) + connection._firstUsage = true + connection._release = () => { + connection._firstUsage = false + return release(address, connection) + } this._openConnections[connection.id] = connection return connection .connect(this._userAgent, this._authToken) @@ -119,7 +123,9 @@ export default class PooledConnectionProvider extends ConnectionProvider { const connection = await this._connectionPool.acquire(address) const serverInfo = new ServerInfo(connection.server, connection.protocol().version) try { - await connection.resetAndFlush() + if (!connection._firstUsage) { + await connection.resetAndFlush() + } } finally { await connection._release() } diff --git a/packages/bolt-connection/src/connection/connection-channel.js b/packages/bolt-connection/src/connection/connection-channel.js index d1325b5e1..63c908558 100644 --- a/packages/bolt-connection/src/connection/connection-channel.js +++ b/packages/bolt-connection/src/connection/connection-channel.js @@ -123,6 +123,7 @@ export default class ChannelConnection extends Connection { ) { super(errorHandler) + this._reseting = false this._id = idGenerator++ this._address = address this._server = { address: address.asHostPort() } @@ -304,7 +305,7 @@ export default class ChannelConnection extends Connection { */ resetAndFlush () { return new Promise((resolve, reject) => { - this._protocol.reset({ + this._reset({ onError: error => { if (this._isBroken) { // handling a fatal error, no need to raise a protocol violation @@ -328,7 +329,7 @@ export default class ChannelConnection extends Connection { return } - this._protocol.reset({ + this._reset({ onError: () => { this._protocol.resetFailure() }, @@ -338,6 +339,23 @@ export default class ChannelConnection extends Connection { }) } + _reset(observer) { + if (this._reseting) { + observer.onComplete() + return + } + this._reseting = true + this._protocol.reset({ + onError: error => { + this._reseting = false + observer.onError(error) + }, onComplete: () => { + this._reseting = false + observer.onComplete() + } + }) + } + /* * Pop next pending observer form the list of observers and make it current observer. * @protected diff --git a/packages/bolt-connection/src/connection/connection-delegate.js b/packages/bolt-connection/src/connection/connection-delegate.js index 14c82a4c5..16d2da48a 100644 --- a/packages/bolt-connection/src/connection/connection-delegate.js +++ b/packages/bolt-connection/src/connection/connection-delegate.js @@ -83,6 +83,10 @@ export default class DelegateConnection extends Connection { return this._delegate.resetAndFlush() } + hasOngoingObservableRequests () { + return this._delegate.hasOngoingObservableRequests() + } + close () { return this._delegate.close() } diff --git a/packages/bolt-connection/src/connection/connection.js b/packages/bolt-connection/src/connection/connection.js index 21cdd3c1a..d3c692712 100644 --- a/packages/bolt-connection/src/connection/connection.js +++ b/packages/bolt-connection/src/connection/connection.js @@ -103,6 +103,10 @@ export default class Connection { throw new Error('not implemented') } + hasOngoingObservableRequests () { + throw new Error('not implemented') + } + /** * Call close on the channel. * @returns {Promise} - A promise that will be resolved when the connection is closed. diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index e1b297229..db23c1383 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -2584,6 +2584,26 @@ describe('#unit RoutingConnectionProvider', () => { .toBeGreaterThan(connections[0].resetAndFlush.mock.invocationCallOrder[0]) }) + it('should not call resetAndFlush for newly created connections', async () => { + const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup({ newConnection: true }) + const acquireSpy = jest.spyOn(pool, 'acquire') + + await connectionProvider.verifyConnectivityAndGetServerInfo({ database, accessMode }) + + const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers + const address = targetServers[0] + expect(acquireSpy).toHaveBeenCalledWith(address) + + const connections = seenConnectionsPerAddress.get(address) + + // verifying resetAndFlush was not called + expect(connections[0].resetAndFlush).not.toHaveBeenCalled() + + // extra checks + expect(connections.length).toBe(1) + expect(connections[0]._release).toHaveBeenCalled() + }) + it('should not acquire, resetAndFlush and release connections for sever with the other access mode', async () => { const { connectionProvider, routingTable, seenConnectionsPerAddress, pool } = setup() const acquireSpy = jest.spyOn(pool, 'acquire') @@ -2756,7 +2776,7 @@ describe('#unit RoutingConnectionProvider', () => { }) }) - function setup ({ resetAndFlush, releaseMock } = {}) { + function setup ({ resetAndFlush, releaseMock, newConnection } = { }) { const routingTable = newRoutingTable( database || null, [server1, server2], @@ -2774,6 +2794,7 @@ describe('#unit RoutingConnectionProvider', () => { seenConnectionsPerAddress.set(address, []) } const connection = new FakeConnection(address, release, 'version', protocolVersion, server) + connection._firstUsage = !!newConnection if (resetAndFlush) { connection.resetAndFlush = resetAndFlush } diff --git a/packages/bolt-connection/test/connection/connection-channel.test.js b/packages/bolt-connection/test/connection/connection-channel.test.js index c1acd5ce8..e6db24705 100644 --- a/packages/bolt-connection/test/connection/connection-channel.test.js +++ b/packages/bolt-connection/test/connection/connection-channel.test.js @@ -19,6 +19,7 @@ import ChannelConnection from '../../src/connection/connection-channel' import { int, internal, newError } from 'neo4j-driver-core' +import { observer } from 'neo4j-driver-core/types/internal' const { serverAddress: { ServerAddress }, @@ -260,6 +261,75 @@ describe('ChannelConnection', () => { expect(protocol.reset).toHaveBeenCalled() expect(protocol.resetFailure).toHaveBeenCalled() }) + + it('should not call protocol.reset() when there is an ongoing reset', () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + expect(protocol.resetFailure).not.toHaveBeenCalled() + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + expect(protocol.resetFailure).toHaveBeenCalledTimes(1) + }) + + it('should call protocol.reset() when after a previous reset completed', () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onComplete()), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + expect(protocol.resetFailure).toHaveBeenCalledTimes(1) + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalledTimes(2) + expect(protocol.resetFailure).toHaveBeenCalledTimes(2) + }) + + it('should call protocol.reset() when after a previous reset fail', () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onError(new Error('some error'))), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + expect(protocol.resetFailure).toHaveBeenCalledTimes(1) + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalledTimes(2) + expect(protocol.resetFailure).toHaveBeenCalledTimes(2) + }) }) describe('when connection is not open', () => { @@ -340,6 +410,106 @@ describe('ChannelConnection', () => { }) }) + describe('.resetAndFlush()', () => { + it('should call protocol.reset() onComplete', async () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onComplete()), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + await connection.resetAndFlush().catch(() => {}) + + expect(protocol.reset).toHaveBeenCalled() + }) + + it('should call protocol.reset() onError', async () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onError()), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + await connection.resetAndFlush().catch(() => {}) + + expect(protocol.reset).toHaveBeenCalled() + }) + + it('should not call protocol.reset() when there is an ongoing reset', async () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + // to not block since the reset will never complete + connection.resetAndFlush() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + + await connection.resetAndFlush() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + }) + + it('should call protocol.reset() when after a previous reset completed', async () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onComplete()), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + await connection.resetAndFlush() + + expect(protocol.reset).toHaveBeenCalledTimes(1) + + await connection.resetAndFlush() + + expect(protocol.reset).toHaveBeenCalledTimes(2) + }) + + it('should call protocol.reset() when after a previous reset fail', async () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onError(new Error('some error'))), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + await connection.resetAndFlush().catch(() => {}) + + expect(protocol.reset).toHaveBeenCalledTimes(1) + + await connection.resetAndFlush().catch(() => {}) + + expect(protocol.reset).toHaveBeenCalledTimes(2) + }) + }) + function spyOnConnectionChannel ({ channel, errorHandler, diff --git a/packages/core/src/connection.ts b/packages/core/src/connection.ts index 54e81276e..6761adc58 100644 --- a/packages/core/src/connection.ts +++ b/packages/core/src/connection.ts @@ -94,6 +94,14 @@ class Connection { throw Error('Not implemented') } + /** + * Checks if there is an ongoing request being handled + * @return {boolean} `true` if there is an ongoing request being handled + */ + hasOngoingObservableRequests (): boolean { + throw Error('Not implemented') + } + /** * Call close on the channel. * @returns {Promise} - A promise that will be resolved when the connection is closed. diff --git a/packages/core/src/internal/connection-holder.ts b/packages/core/src/internal/connection-holder.ts index 57217cf58..8800d415a 100644 --- a/packages/core/src/internal/connection-holder.ts +++ b/packages/core/src/internal/connection-holder.ts @@ -197,7 +197,7 @@ class ConnectionHolder implements ConnectionHolderInterface { this._connectionPromise = this._connectionPromise .then((connection?: Connection|null) => { if (connection != null) { - if (connection.isOpen()) { + if (connection.isOpen() && connection.hasOngoingObservableRequests()) { return connection .resetAndFlush() .catch(ignoreError) diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts index 19cbc9cd3..1abcd3ed9 100644 --- a/packages/core/test/session.test.ts +++ b/packages/core/test/session.test.ts @@ -186,6 +186,27 @@ describe('session', () => { }).catch(done) }, 70000) + it('close should reset connection if there is an ongoing request ', async () => { + const connection = newFakeConnection() + const resetAndFlushSpy = jest.spyOn(connection, 'resetAndFlush') + const session = newSessionWithConnection(connection) + + await session.close() + + expect(resetAndFlushSpy).toHaveBeenCalledTimes(1) + }, 70000) + + it('close should not reset connection if there is not an ongoing request ', async () => { + const connection = newFakeConnection() + connection.hasOngoingObservableRequests = () => false + const resetAndFlushSpy = jest.spyOn(connection, 'resetAndFlush') + const session = newSessionWithConnection(connection) + + await session.close() + + expect(resetAndFlushSpy).not.toHaveBeenCalled() + }, 70000) + it('should close transaction executor', done => { const session = newSessionWithConnection(newFakeConnection()) diff --git a/packages/core/test/utils/connection.fake.ts b/packages/core/test/utils/connection.fake.ts index fce1c493c..f1157cede 100644 --- a/packages/core/test/utils/connection.fake.ts +++ b/packages/core/test/utils/connection.fake.ts @@ -179,6 +179,10 @@ export default class FakeConnection extends Connection { this._open = false return this } + + hasOngoingObservableRequests(): boolean { + return true + } } function mockResultStreamObserverWithError (query: string, parameters: any | undefined, error: Error): ResultStreamObserver { diff --git a/packages/neo4j-driver/test/internal/connection-holder.test.js b/packages/neo4j-driver/test/internal/connection-holder.test.js index db82ee64c..8adc4f0df 100644 --- a/packages/neo4j-driver/test/internal/connection-holder.test.js +++ b/packages/neo4j-driver/test/internal/connection-holder.test.js @@ -319,6 +319,33 @@ describe('#unit ConnectionHolder', () => { }) }) + describe('and has not ongoing requests', () => { + let connection + + beforeEach(async () => { + connection = new FakeConnection().withHasOngoingObservableRequests( + false + ) + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider + }) + + connectionHolder.initializeConnection() + + await connectionHolder.releaseConnection() + }) + + it('should call connection.resetAndFlush', () => { + expect(connection.resetInvoked).toBe(0) + }) + + it('should call connection._release()', () => { + expect(connection.releaseInvoked).toBe(1) + }) + }) + describe('and connection is not open', () => { let connection diff --git a/packages/neo4j-driver/test/internal/fake-connection.js b/packages/neo4j-driver/test/internal/fake-connection.js index 07f90910f..fff4d6d4c 100644 --- a/packages/neo4j-driver/test/internal/fake-connection.js +++ b/packages/neo4j-driver/test/internal/fake-connection.js @@ -52,6 +52,7 @@ export default class FakeConnection extends Connection { this.protocolErrorsHandled = 0 this.seenProtocolErrors = [] this.seenRequestRoutingInformation = [] + this._hasOnGoingObservableRequests = true } get id () { @@ -161,6 +162,15 @@ export default class FakeConnection extends Connection { return this } + withHasOngoingObservableRequests (value) { + this._hasOnGoingObservableRequests = value + return this + } + + hasOngoingObservableRequests () { + return this._hasOnGoingObservableRequests + } + closed () { this._open = false return this diff --git a/packages/testkit-backend/src/feature/common.js b/packages/testkit-backend/src/feature/common.js index 45e005157..2dc79ad92 100644 --- a/packages/testkit-backend/src/feature/common.js +++ b/packages/testkit-backend/src/feature/common.js @@ -33,6 +33,7 @@ const features = [ 'Feature:API:Driver.VerifyConnectivity', 'Feature:API:Result.Peek', 'Optimization:ImplicitDefaultArguments', + 'Optimization:MinimalResets', ...SUPPORTED_TLS ] From de8234ee35a58adf1d2e2bce8e5420229fb853c8 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 17 Mar 2022 12:32:42 +0100 Subject: [PATCH 02/12] Wait for the reset command finishes before notify onComplete --- .../src/connection/connection-channel.js | 17 ++++++++++++----- .../test/connection/connection-channel.test.js | 11 +++++++---- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/packages/bolt-connection/src/connection/connection-channel.js b/packages/bolt-connection/src/connection/connection-channel.js index 63c908558..1c7262e82 100644 --- a/packages/bolt-connection/src/connection/connection-channel.js +++ b/packages/bolt-connection/src/connection/connection-channel.js @@ -124,6 +124,7 @@ export default class ChannelConnection extends Connection { super(errorHandler) this._reseting = false + this._resetObservers = [] this._id = idGenerator++ this._address = address this._server = { address: address.asHostPort() } @@ -340,18 +341,24 @@ export default class ChannelConnection extends Connection { } _reset(observer) { + this._resetObservers.push(observer) if (this._reseting) { - observer.onComplete() return } this._reseting = true + + const notifyFinish = (notify) => { + this._reseting = false + const observers = this._resetObservers + this._resetObservers = [] + observers.forEach(notify) + } + this._protocol.reset({ onError: error => { - this._reseting = false - observer.onError(error) + notifyFinish(obs => obs.onError(error)) }, onComplete: () => { - this._reseting = false - observer.onComplete() + notifyFinish(obs => obs.onComplete()) } }) } diff --git a/packages/bolt-connection/test/connection/connection-channel.test.js b/packages/bolt-connection/test/connection/connection-channel.test.js index e6db24705..2eed0c8fb 100644 --- a/packages/bolt-connection/test/connection/connection-channel.test.js +++ b/packages/bolt-connection/test/connection/connection-channel.test.js @@ -282,7 +282,7 @@ describe('ChannelConnection', () => { connection._resetOnFailure() expect(protocol.reset).toHaveBeenCalledTimes(1) - expect(protocol.resetFailure).toHaveBeenCalledTimes(1) + expect(protocol.resetFailure).not.toHaveBeenCalled() }) it('should call protocol.reset() when after a previous reset completed', () => { @@ -451,20 +451,23 @@ describe('ChannelConnection', () => { } const protocol = { - reset: jest.fn(), + reset: jest.fn(observer => { + setTimeout(() => observer.onComplete(), 100) + }), resetFailure: jest.fn() } const protocolSupplier = () => protocol const connection = spyOnConnectionChannel({ channel, protocolSupplier }) - // to not block since the reset will never complete - connection.resetAndFlush() + const completeFirstResetAndFlush = connection.resetAndFlush() expect(protocol.reset).toHaveBeenCalledTimes(1) await connection.resetAndFlush() expect(protocol.reset).toHaveBeenCalledTimes(1) + + await completeFirstResetAndFlush }) it('should call protocol.reset() when after a previous reset completed', async () => { From 4f25a1a814c1338e8d118c2fb6b3ae6d9d087407 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 17 Mar 2022 17:34:10 +0100 Subject: [PATCH 03/12] Checking if last message was login before send reset on `verifyConnectivity` --- packages/bolt-connection/src/bolt/bolt-protocol-v1.js | 7 +++++++ .../src/connection-provider/connection-provider-pooled.js | 2 +- .../connection-provider/connection-provider-direct.test.js | 2 +- .../connection-provider-routing.test.js | 3 ++- 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v1.js b/packages/bolt-connection/src/bolt/bolt-protocol-v1.js index a2648443e..312a717d8 100644 --- a/packages/bolt-connection/src/bolt/bolt-protocol-v1.js +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v1.js @@ -79,6 +79,7 @@ export default class BoltProtocol { this._log = log this._onProtocolError = onProtocolError this._fatalError = null + this._lastMessageSignature = null } /** @@ -356,6 +357,8 @@ export default class BoltProtocol { this._log.debug(`C: ${message}`) } + this._lastMessageSignature = message.signature + this.packer().packStruct( message.signature, message.fields.map(field => this.packer().packable(field)) @@ -369,6 +372,10 @@ export default class BoltProtocol { } } + isLastMessageLogin () { + return this._lastMessageSignature === 0x01 + } + /** * Notifies faltal erros to the observers and mark the protocol in the fatal error state. * @param {Error} error The error diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js index c3ca65082..7995aca69 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js @@ -123,7 +123,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { const connection = await this._connectionPool.acquire(address) const serverInfo = new ServerInfo(connection.server, connection.protocol().version) try { - if (!connection._firstUsage) { + if (!connection._firstUsage && !connection.protocol().isLastMessageLogin()) { await connection.resetAndFlush() } } finally { diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js index 6abc03082..e1ec76394 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js @@ -251,7 +251,7 @@ describe('.verifyConnectivityAndGetServerInfo()', () => { const create = (address, release) => { const connection = new FakeConnection(address, release, server) connection.protocol = () => { - return { version: protocolVersion } + return { version: protocolVersion, isLastMessageLogin() { return false } } } connection.resetAndFlush = resetAndFlush if (releaseMock) { diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index db23c1383..e765f342f 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -3103,7 +3103,8 @@ class FakeConnection extends Connection { protocol () { return { - version: this._protocolVersion + version: this._protocolVersion, + isLastMessageLogin: () => this._firstUsage } } } From 7b8b01175882a448b07e0622cb056cf95dd13e0d Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 17 Mar 2022 18:18:47 +0100 Subject: [PATCH 04/12] Change check if message was never used before --- .../src/connection-provider/connection-provider-pooled.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js index 7995aca69..42cb30fbc 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js @@ -71,9 +71,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { */ _createConnection (address, release) { return this._createChannelConnection(address).then(connection => { - connection._firstUsage = true connection._release = () => { - connection._firstUsage = false return release(address, connection) } this._openConnections[connection.id] = connection @@ -123,7 +121,7 @@ export default class PooledConnectionProvider extends ConnectionProvider { const connection = await this._connectionPool.acquire(address) const serverInfo = new ServerInfo(connection.server, connection.protocol().version) try { - if (!connection._firstUsage && !connection.protocol().isLastMessageLogin()) { + if (!connection.protocol().isLastMessageLogin()) { await connection.resetAndFlush() } } finally { From 6326bb73d5f2e473e805fe594d7e1b13de6f3f62 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 18 Mar 2022 13:53:05 +0100 Subject: [PATCH 05/12] Fix sending reset --- .../bolt-connection/src/bolt/bolt-protocol-v1.js | 4 ++++ .../src/connection/connection-channel.js | 14 +++++++++++++- .../test/connection/connection-channel.test.js | 6 ++++-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v1.js b/packages/bolt-connection/src/bolt/bolt-protocol-v1.js index 312a717d8..55eec22e8 100644 --- a/packages/bolt-connection/src/bolt/bolt-protocol-v1.js +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v1.js @@ -376,6 +376,10 @@ export default class BoltProtocol { return this._lastMessageSignature === 0x01 } + isLastMessageReset () { + return this._lastMessageSignature === 0x0f + } + /** * Notifies faltal erros to the observers and mark the protocol in the fatal error state. * @param {Error} error The error diff --git a/packages/bolt-connection/src/connection/connection-channel.js b/packages/bolt-connection/src/connection/connection-channel.js index 1c7262e82..4efcb5f51 100644 --- a/packages/bolt-connection/src/connection/connection-channel.js +++ b/packages/bolt-connection/src/connection/connection-channel.js @@ -341,10 +341,22 @@ export default class ChannelConnection extends Connection { } _reset(observer) { - this._resetObservers.push(observer) if (this._reseting) { + if (!this._protocol.isLastMessageReset()) { + this._protocol.reset({ + onError: error => { + observer.onError(error) + }, onComplete: () => { + observer.onComplete() + } + }) + } else { + this._resetObservers.push(observer) + } return } + + this._resetObservers.push(observer) this._reseting = true const notifyFinish = (notify) => { diff --git a/packages/bolt-connection/test/connection/connection-channel.test.js b/packages/bolt-connection/test/connection/connection-channel.test.js index 2eed0c8fb..608c5d842 100644 --- a/packages/bolt-connection/test/connection/connection-channel.test.js +++ b/packages/bolt-connection/test/connection/connection-channel.test.js @@ -269,7 +269,8 @@ describe('ChannelConnection', () => { const protocol = { reset: jest.fn(), - resetFailure: jest.fn() + resetFailure: jest.fn(), + isLastMessageReset: jest.fn(() => true) } const protocolSupplier = () => protocol const connection = spyOnConnectionChannel({ channel, protocolSupplier }) @@ -454,7 +455,8 @@ describe('ChannelConnection', () => { reset: jest.fn(observer => { setTimeout(() => observer.onComplete(), 100) }), - resetFailure: jest.fn() + resetFailure: jest.fn(), + isLastMessageReset: jest.fn(() => true) } const protocolSupplier = () => protocol const connection = spyOnConnectionChannel({ channel, protocolSupplier }) From 855122985893b415a9d354b9246faabfe1d32d12 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 18 Mar 2022 17:36:29 +0100 Subject: [PATCH 06/12] Sending reset if there is a tx going on --- packages/core/src/internal/connection-holder.ts | 8 ++++++-- packages/core/src/session.ts | 4 ++-- packages/core/test/session.test.ts | 15 +++++++++++++-- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/packages/core/src/internal/connection-holder.ts b/packages/core/src/internal/connection-holder.ts index 8800d415a..2ae09b76e 100644 --- a/packages/core/src/internal/connection-holder.ts +++ b/packages/core/src/internal/connection-holder.ts @@ -178,12 +178,16 @@ class ConnectionHolder implements ConnectionHolderInterface { return this._connectionPromise } +<<<<<<< HEAD close (): Promise { +======= + close(hasTx?: boolean): Promise { +>>>>>>> Sending reset if there is a tx going on if (this._referenceCount === 0) { return this._connectionPromise } this._referenceCount = 0 - return this._releaseConnection() + return this._releaseConnection(hasTx) } /** @@ -197,7 +201,7 @@ class ConnectionHolder implements ConnectionHolderInterface { this._connectionPromise = this._connectionPromise .then((connection?: Connection|null) => { if (connection != null) { - if (connection.isOpen() && connection.hasOngoingObservableRequests()) { + if (connection.isOpen() && (connection.hasOngoingObservableRequests() || hasTx)) { return connection .resetAndFlush() .catch(ignoreError) diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index e0e0bee59..f785c120a 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -494,8 +494,8 @@ class Session { this._transactionExecutor.close() - await this._readConnectionHolder.close() - await this._writeConnectionHolder.close() + await this._readConnectionHolder.close(this._hasTx) + await this._writeConnectionHolder.close(this._hasTx) } } diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts index 1abcd3ed9..6a561be52 100644 --- a/packages/core/test/session.test.ts +++ b/packages/core/test/session.test.ts @@ -196,17 +196,28 @@ describe('session', () => { expect(resetAndFlushSpy).toHaveBeenCalledTimes(1) }, 70000) - it('close should not reset connection if there is not an ongoing request ', async () => { + it('close should not reset connection if there is not an ongoing request', async () => { const connection = newFakeConnection() connection.hasOngoingObservableRequests = () => false const resetAndFlushSpy = jest.spyOn(connection, 'resetAndFlush') - const session = newSessionWithConnection(connection) + const session = newSessionWithConnection(connection, false) await session.close() expect(resetAndFlushSpy).not.toHaveBeenCalled() }, 70000) + it('close should reset connection if there is not an ongoing request but it has tx running', async () => { + const connection = newFakeConnection() + connection.hasOngoingObservableRequests = () => false + const resetAndFlushSpy = jest.spyOn(connection, 'resetAndFlush') + const session = newSessionWithConnection(connection) + + await session.close() + + expect(resetAndFlushSpy).toHaveBeenCalled() + }, 70000) + it('should close transaction executor', done => { const session = newSessionWithConnection(newFakeConnection()) From 83b165de714cce6f2a4f7947de267217dabb8959 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 12 Apr 2022 14:13:38 +0200 Subject: [PATCH 07/12] skip a test just for now --- packages/testkit-backend/src/skipped-tests/common.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index a6f61fc00..f67bceb8f 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -1,6 +1,10 @@ import skip, { ifEquals, ifEndsWith } from './skip' const skippedTests = [ + skip( + 'Just for now', + ifEndsWith('test_no_reset_on_clean_connection') + ), skip( 'Skipped because server doesn\'t support protocol 5.0 yet', ifEndsWith('neo4j.test_summary.TestSummary.test_protocol_version_information') From 06ad4da8bff9b7f2915ebe0e0a29925170010ce1 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 22 Apr 2022 14:32:57 +0200 Subject: [PATCH 08/12] Fix merge issues --- packages/core/src/internal/connection-holder.ts | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/packages/core/src/internal/connection-holder.ts b/packages/core/src/internal/connection-holder.ts index 2ae09b76e..6b608fedf 100644 --- a/packages/core/src/internal/connection-holder.ts +++ b/packages/core/src/internal/connection-holder.ts @@ -178,11 +178,7 @@ class ConnectionHolder implements ConnectionHolderInterface { return this._connectionPromise } -<<<<<<< HEAD - close (): Promise { -======= - close(hasTx?: boolean): Promise { ->>>>>>> Sending reset if there is a tx going on + close (hasTx?: boolean): Promise { if (this._referenceCount === 0) { return this._connectionPromise } @@ -197,11 +193,11 @@ class ConnectionHolder implements ConnectionHolderInterface { * @return {Promise} - promise resolved then connection is returned to the pool. * @private */ - private _releaseConnection (): Promise { + private _releaseConnection (hasTx?: boolean): Promise { this._connectionPromise = this._connectionPromise .then((connection?: Connection|null) => { if (connection != null) { - if (connection.isOpen() && (connection.hasOngoingObservableRequests() || hasTx)) { + if (connection.isOpen() && (connection.hasOngoingObservableRequests() || hasTx === true)) { return connection .resetAndFlush() .catch(ignoreError) From 8de05ab301308e0029ab3fc1386c0a9c39211fd4 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 22 Apr 2022 15:28:28 +0200 Subject: [PATCH 09/12] Fix lint issue --- .../bolt-connection/test/connection/connection-channel.test.js | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/bolt-connection/test/connection/connection-channel.test.js b/packages/bolt-connection/test/connection/connection-channel.test.js index 608c5d842..21696bd27 100644 --- a/packages/bolt-connection/test/connection/connection-channel.test.js +++ b/packages/bolt-connection/test/connection/connection-channel.test.js @@ -19,7 +19,6 @@ import ChannelConnection from '../../src/connection/connection-channel' import { int, internal, newError } from 'neo4j-driver-core' -import { observer } from 'neo4j-driver-core/types/internal' const { serverAddress: { ServerAddress }, From 8f8beca16a5fbb3970acc2ba77c7ced7814c7d24 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 5 May 2022 15:58:41 +0200 Subject: [PATCH 10/12] Re-enable test --- packages/testkit-backend/src/skipped-tests/common.js | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index f67bceb8f..a6f61fc00 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -1,10 +1,6 @@ import skip, { ifEquals, ifEndsWith } from './skip' const skippedTests = [ - skip( - 'Just for now', - ifEndsWith('test_no_reset_on_clean_connection') - ), skip( 'Skipped because server doesn\'t support protocol 5.0 yet', ifEndsWith('neo4j.test_summary.TestSummary.test_protocol_version_information') From 828742cce47f531d7de859e25535c072f8c4d075 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 6 May 2022 09:57:37 +0200 Subject: [PATCH 11/12] Add debug --- packages/testkit-backend/src/request-handlers.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 5903a9893..a22750095 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -335,7 +335,7 @@ export function SessionWriteTransaction (context, data, wire) { } export function StartTest (context, { testName }, wire) { - if (testName.endsWith('.test_disconnect_session_on_tx_pull_after_record')) { + if (testName.endsWith('.test_disconnect_session_on_tx_pull_after_record') || testName.endsWith('test_should_reject_server_using_verify_connectivity_bolt_4x4')) { context.logLevel = 'debug' } else { context.logLevel = null From 6398a1acc69529a0352971408af9e00df23be9c6 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 6 May 2022 11:12:41 +0200 Subject: [PATCH 12/12] Debug --- packages/testkit-backend/src/request-handlers.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index a22750095..aecdfdace 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -335,7 +335,7 @@ export function SessionWriteTransaction (context, data, wire) { } export function StartTest (context, { testName }, wire) { - if (testName.endsWith('.test_disconnect_session_on_tx_pull_after_record') || testName.endsWith('test_should_reject_server_using_verify_connectivity_bolt_4x4')) { + if (testName.endsWith('.test_disconnect_session_on_tx_pull_after_record') || testName.endsWith('test_no_reset_on_clean_connection')) { context.logLevel = 'debug' } else { context.logLevel = null