From 717fe66ea7955f873ff1fac8fbc84b3bdf158130 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Thu, 4 Nov 2021 14:03:45 -0300 Subject: [PATCH] Don't send RESET message when the connection is not open (#807) Send RESET when the connection is not optimal since it was time waiting for the message send times out. It could be really fast in the nodejs environment, but it could take some minutes in browser. Skip send RESET when the connection is not open avoid this issue. --- .../src/connection/connection-channel.js | 9 +- packages/bolt-connection/src/pool/pool.js | 17 ++- .../channel/browser/browser-channel.test.js | 32 +++++ .../test/channel/node/node-channel.test.js | 14 +++ .../connection/connection-channel.test.js | 63 +++++++++- .../core/src/internal/connection-holder.ts | 11 +- .../test/internal/connection-holder.test.js | 110 ++++++++++++++++++ packages/neo4j-driver/test/result.test.js | 2 +- 8 files changed, 249 insertions(+), 9 deletions(-) diff --git a/packages/bolt-connection/src/connection/connection-channel.js b/packages/bolt-connection/src/connection/connection-channel.js index c30609705..e1559b1c8 100644 --- a/packages/bolt-connection/src/connection/connection-channel.js +++ b/packages/bolt-connection/src/connection/connection-channel.js @@ -270,7 +270,10 @@ export default class ChannelConnection extends Connection { */ _handleFatalError (error) { this._isBroken = true - this._error = this.handleAndTransformError(this._protocol.currentFailure || error, this._address) + this._error = this.handleAndTransformError( + this._protocol.currentFailure || error, + this._address + ) if (this._log.isErrorEnabled()) { this._log.error( @@ -320,6 +323,10 @@ export default class ChannelConnection extends Connection { } _resetOnFailure () { + if (!this.isOpen()) { + return + } + this._protocol.reset({ onError: () => { this._protocol.resetFailure() diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index b48f2cf47..dff3cd47a 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -134,9 +134,22 @@ class Pool { * Destroy all idle resources in this pool. * @returns {Promise} A promise that is resolved when the resources are purged */ - close () { + async close () { this._closed = true - return Promise.all(Object.keys(this._pools).map(key => this._purgeKey(key))) + /** + * The lack of Promise consuming was making the driver do not close properly in the scenario + * captured at result.test.js:it('should handle missing onCompleted'). The test was timing out + * because while wainting for the driver close. + * + * Consuming the Promise.all or by calling then or by awaiting in the result inside this method solved + * the issue somehow. + * + * PS: the return of this method was already awaited at PooledConnectionProvider.close, but the await bellow + * seems to be need also. + */ + return await Promise.all( + Object.keys(this._pools).map(key => this._purgeKey(key)) + ) } /** diff --git a/packages/bolt-connection/test/channel/browser/browser-channel.test.js b/packages/bolt-connection/test/channel/browser/browser-channel.test.js index 53fa9f873..d9e0aa7a8 100644 --- a/packages/bolt-connection/test/channel/browser/browser-channel.test.js +++ b/packages/bolt-connection/test/channel/browser/browser-channel.test.js @@ -294,6 +294,38 @@ describe('WebSocketChannel', () => { } }) + describe('.close()', () => { + it('should set _open to false before resolve the promise', async () => { + const fakeSetTimeout = setTimeoutMock.install() + try { + // do not execute setTimeout callbacks + fakeSetTimeout.pause() + const address = ServerAddress.fromUrl('bolt://localhost:8989') + const driverConfig = { connectionTimeout: 4242 } + const channelConfig = new ChannelConfig( + address, + driverConfig, + SERVICE_UNAVAILABLE + ) + webSocketChannel = new WebSocketChannel( + channelConfig, + undefined, + createWebSocketFactory(WS_OPEN) + ) + + expect(webSocketChannel._open).toBe(true) + + const promise = webSocketChannel.close() + + expect(webSocketChannel._open).toBe(false) + + await promise + } finally { + fakeSetTimeout.uninstall() + } + }) + }) + describe('.setupReceiveTimeout()', () => { beforeEach(() => { const address = ServerAddress.fromUrl('http://localhost:8989') diff --git a/packages/bolt-connection/test/channel/node/node-channel.test.js b/packages/bolt-connection/test/channel/node/node-channel.test.js index bd649e854..5d6b175b1 100644 --- a/packages/bolt-connection/test/channel/node/node-channel.test.js +++ b/packages/bolt-connection/test/channel/node/node-channel.test.js @@ -44,6 +44,20 @@ describe('NodeChannel', () => { return expect(channel.close()).resolves.not.toThrow() }) + describe('.close()', () => { + it('should set _open to false before resolve the promise', async () => { + const channel = createMockedChannel(true) + + expect(channel._open).toBe(true) + + const promise = channel.close() + + expect(channel._open).toBe(false) + + await promise + }) + }) + describe('.setupReceiveTimeout()', () => { it('should call socket.setTimeout(receiveTimeout)', () => { const receiveTimeout = 42 diff --git a/packages/bolt-connection/test/connection/connection-channel.test.js b/packages/bolt-connection/test/connection/connection-channel.test.js index da4ffa424..9b01a19ed 100644 --- a/packages/bolt-connection/test/connection/connection-channel.test.js +++ b/packages/bolt-connection/test/connection/connection-channel.test.js @@ -19,7 +19,6 @@ import ChannelConnection from '../../src/connection/connection-channel' import { int, internal, newError } from 'neo4j-driver-core' -import { add } from 'lodash' const { serverAddress: { ServerAddress }, @@ -198,7 +197,69 @@ describe('ChannelConnection', () => { expect(notifyFatalError).toHaveBeenCalledWith(currentFailure) }) }) + }) + + describe('._resetOnFailure()', () => { + describe('when connection isOpen', () => { + it('should call protocol.reset() and then protocol.resetFailure() onComplete', () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onComplete()), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalled() + expect(protocol.resetFailure).toHaveBeenCalled() + }) + + it('should call protocol.reset() and then protocol.resetFailure() onError', () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onError()), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalled() + expect(protocol.resetFailure).toHaveBeenCalled() + }) + }) + + describe('when connection is not open', () => { + it('should not call protocol.reset() and protocol.resetFailure()', () => { + const channel = { + _open: false + } + + const protocol = { + reset: jest.fn(observer => { + observer.onComplete() + observer.onError() + }), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + connection._resetOnFailure() + + expect(protocol.reset).not.toHaveBeenCalled() + expect(protocol.resetFailure).not.toHaveBeenCalled() + }) + }) }) function spyOnConnectionChannel ({ diff --git a/packages/core/src/internal/connection-holder.ts b/packages/core/src/internal/connection-holder.ts index 466805409..394f10491 100644 --- a/packages/core/src/internal/connection-holder.ts +++ b/packages/core/src/internal/connection-holder.ts @@ -196,10 +196,13 @@ class ConnectionHolder implements ConnectionHolderInterface { this._connectionPromise = this._connectionPromise .then((connection?: Connection) => { if (connection) { - return connection - .resetAndFlush() - .catch(ignoreError) - .then(() => connection._release()) + if (connection.isOpen()) { + return connection + .resetAndFlush() + .catch(ignoreError) + .then(() => connection._release()) + } + return connection._release() } else { return Promise.resolve() } diff --git a/packages/neo4j-driver/test/internal/connection-holder.test.js b/packages/neo4j-driver/test/internal/connection-holder.test.js index 9c6dad2ef..f9ef9484e 100644 --- a/packages/neo4j-driver/test/internal/connection-holder.test.js +++ b/packages/neo4j-driver/test/internal/connection-holder.test.js @@ -292,6 +292,116 @@ describe('#unit ConnectionHolder', () => { expect(connectionHolder.database()).toBe('testdb') }) + + describe('.releaseConnection()', () => { + describe('when the connection is initialized', () => { + describe('and connection is open', () => { + let connection + + beforeEach(async () => { + connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider + }) + + connectionHolder.initializeConnection() + + await connectionHolder.releaseConnection() + }) + + it('should call connection.resetAndFlush', () => { + expect(connection.resetInvoked).toBe(1) + }) + + it('should call connection._release()', () => { + expect(connection.releaseInvoked).toBe(1) + }) + }) + + describe('and connection is not open', () => { + let connection + + beforeEach(async () => { + connection = new FakeConnection() + connection._open = false + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider + }) + + connectionHolder.initializeConnection() + + await connectionHolder.releaseConnection() + }) + + it('should not call connection.resetAndFlush', () => { + expect(connection.resetInvoked).toBe(0) + }) + + it('should call connection._release()', () => { + expect(connection.releaseInvoked).toBe(1) + }) + }) + }) + }) + + describe('.close()', () => { + describe('when the connection is initialized', () => { + describe('and connection is open', () => { + let connection + + beforeEach(async () => { + connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider + }) + + connectionHolder.initializeConnection() + + await connectionHolder.close() + }) + + it('should call connection.resetAndFlush', () => { + expect(connection.resetInvoked).toBe(1) + }) + + it('should call connection._release()', () => { + expect(connection.releaseInvoked).toBe(1) + }) + }) + + describe('and connection is not open', () => { + let connection + + beforeEach(async () => { + connection = new FakeConnection() + connection._open = false + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider + }) + + connectionHolder.initializeConnection() + + await connectionHolder.close() + }) + + it('should not call connection.resetAndFlush', () => { + expect(connection.resetInvoked).toBe(0) + }) + + it('should call connection._release()', () => { + expect(connection.releaseInvoked).toBe(1) + }) + }) + }) + }) }) class RecordingConnectionProvider extends SingleConnectionProvider { diff --git a/packages/neo4j-driver/test/result.test.js b/packages/neo4j-driver/test/result.test.js index 76ab8bf0a..035c3c059 100644 --- a/packages/neo4j-driver/test/result.test.js +++ b/packages/neo4j-driver/test/result.test.js @@ -73,7 +73,7 @@ describe('#integration result stream', () => { done() }, onError: error => { - console.log(error) + done.fail(error) } }) })