From bb7e66df3d9f994e66cca047a0844bb1d8f27f4d Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 23 May 2018 15:52:19 +0200 Subject: [PATCH 1/2] Simplify RESET handling Removed RESET that does not mute ACK_FAILURE handling. --- src/v1/internal/connection-holder.js | 15 ++++----------- src/v1/internal/connector.js | 12 ++---------- test/internal/connection-holder.test.js | 6 +++--- test/internal/fake-connection.js | 19 +------------------ test/v1/session.test.js | 6 +++--- 5 files changed, 13 insertions(+), 45 deletions(-) diff --git a/src/v1/internal/connection-holder.js b/src/v1/internal/connection-holder.js index cc1212c5d..2778c036d 100644 --- a/src/v1/internal/connection-holder.js +++ b/src/v1/internal/connection-holder.js @@ -70,8 +70,7 @@ export default class ConnectionHolder { this._referenceCount--; if (this._referenceCount === 0) { - // release a connection without muting ACK_FAILURE, this is the last action on this connection - return this._releaseConnection(true); + return this._releaseConnection(); } return this._connectionPromise; } @@ -85,9 +84,7 @@ export default class ConnectionHolder { return this._connectionPromise; } this._referenceCount = 0; - // release a connection and mute ACK_FAILURE, this might be called concurrently with other - // operations and thus should ignore failure handling - return this._releaseConnection(false); + return this._releaseConnection(); } /** @@ -97,14 +94,10 @@ export default class ConnectionHolder { * @return {Promise} - promise resolved then connection is returned to the pool. * @private */ - _releaseConnection(sync) { + _releaseConnection() { this._connectionPromise = this._connectionPromise.then(connection => { if (connection) { - if(sync) { - connection.reset(); - } else { - connection.resetAsync(); - } + connection.reset(); connection.sync(); connection._release(); } diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 1ffcf9a1f..e53e6cf62 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -315,8 +315,8 @@ class Connection { } /** Queue a RESET-message to be sent to the database. Mutes failure handling. */ - resetAsync( observer ) { - log("C", "RESET_ASYNC"); + reset(observer) { + log('C', 'RESET'); this._isHandlingFailure = true; let self = this; let wrappedObs = { @@ -330,14 +330,6 @@ class Connection { } }; this._queueObserver(wrappedObs); - this._packer.packStruct( RESET, [], (err) => this._handleFatalError(err) ); - this._chunker.messageBoundary(); - } - - /** Queue a RESET-message to be sent to the database */ - reset(observer) { - log('C', 'RESET'); - this._queueObserver(observer); this._packer.packStruct(RESET, [], (err) => this._handleFatalError(err)); this._chunker.messageBoundary(); } diff --git a/test/internal/connection-holder.test.js b/test/internal/connection-holder.test.js index e57306cc5..8a269f62e 100644 --- a/test/internal/connection-holder.test.js +++ b/test/internal/connection-holder.test.js @@ -168,7 +168,7 @@ describe('ConnectionHolder', () => { connectionHolder.initializeConnection(); connectionHolder.close().then(() => { - expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); + expect(connection.isReleasedOnce()).toBeTruthy(); done(); }); }); @@ -201,11 +201,11 @@ describe('ConnectionHolder', () => { connectionHolder.initializeConnection(); connectionHolder.close().then(() => { - expect(connection1.isReleasedOnceOnSessionClose()).toBeTruthy(); + expect(connection1.isReleasedOnce()).toBeTruthy(); connectionHolder.initializeConnection(); connectionHolder.close().then(() => { - expect(connection2.isReleasedOnceOnSessionClose()).toBeTruthy(); + expect(connection2.isReleasedOnce()).toBeTruthy(); done(); }); }); diff --git a/test/internal/fake-connection.js b/test/internal/fake-connection.js index 284a24ea8..95c85ffbb 100644 --- a/test/internal/fake-connection.js +++ b/test/internal/fake-connection.js @@ -31,7 +31,6 @@ export default class FakeConnection { this.creationTimestamp = Date.now(); this.resetInvoked = 0; - this.resetAsyncInvoked = 0; this.syncInvoked = 0; this.releaseInvoked = 0; this.initializationInvoked = 0; @@ -54,10 +53,6 @@ export default class FakeConnection { this.resetInvoked++; } - resetAsync() { - this.resetAsyncInvoked++; - } - sync() { this.syncInvoked++; } @@ -75,17 +70,6 @@ export default class FakeConnection { return this._open; } - isReleasedOnceOnSessionClose() { - return this.isReleasedOnSessionCloseTimes(1); - } - - isReleasedOnSessionCloseTimes(times) { - return this.resetAsyncInvoked === times && - this.resetInvoked === 0 && - this.syncInvoked === times && - this.releaseInvoked === times; - } - isNeverReleased() { return this.isReleasedTimes(0); } @@ -95,8 +79,7 @@ export default class FakeConnection { } isReleasedTimes(times) { - return this.resetAsyncInvoked === 0 && - this.resetInvoked === times && + return this.resetInvoked === times && this.syncInvoked === times && this.releaseInvoked === times; } diff --git a/test/v1/session.test.js b/test/v1/session.test.js index 6a9150fba..3a88b086a 100644 --- a/test/v1/session.test.js +++ b/test/v1/session.test.js @@ -77,13 +77,13 @@ describe('session', () => { const session = newSessionWithConnection(connection); session.close(() => { - expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); + expect(connection.isReleasedOnce()).toBeTruthy(); session.close(() => { - expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); + expect(connection.isReleasedOnce()).toBeTruthy(); session.close(() => { - expect(connection.isReleasedOnceOnSessionClose()).toBeTruthy(); + expect(connection.isReleasedOnce()).toBeTruthy(); done(); }); }); From 6ae3709963ba27081c4988cd105f0853eaffc8f8 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 24 May 2018 19:57:08 +0200 Subject: [PATCH 2/2] Improve failure handling for RESET and ACK_FAILURE Both RESET and ACK_FAILURE messages will now be immediately flushed to the network to make sure connections are fully cleaned up before being returned to the pool. Outbound messages will only be sent if the connection is not broken. Failure to RESET or ACK_FAILURE will be considered a fatal error, unless ACK_FAILURE was ignored because of a subsequent RESET. --- src/v1/internal/connection-holder.js | 14 +- src/v1/internal/connector.js | 161 +++++++++++++-------- test/internal/connector.test.js | 116 ++++++++++++++- test/internal/fake-connection.js | 9 +- test/resources/boltstub/reset_error.script | 9 ++ test/v1/direct.driver.boltkit.test.js | 30 ++++ 6 files changed, 271 insertions(+), 68 deletions(-) create mode 100644 test/resources/boltstub/reset_error.script diff --git a/src/v1/internal/connection-holder.js b/src/v1/internal/connection-holder.js index 2778c036d..9ded035af 100644 --- a/src/v1/internal/connection-holder.js +++ b/src/v1/internal/connection-holder.js @@ -97,12 +97,13 @@ export default class ConnectionHolder { _releaseConnection() { this._connectionPromise = this._connectionPromise.then(connection => { if (connection) { - connection.reset(); - connection.sync(); - connection._release(); + return connection.resetAndFlush() + .catch(ignoreError) + .then(() => connection._release()); + } else { + return Promise.resolve(); } - }).catch(ignoredError => { - }); + }).catch(ignoreError); return this._connectionPromise; } @@ -127,6 +128,9 @@ class EmptyConnectionHolder extends ConnectionHolder { } } +function ignoreError(error) { +} + /** * Connection holder that does not manage any connections. * @type {ConnectionHolder} diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index e53e6cf62..e35f2d7e4 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -22,7 +22,7 @@ import NodeChannel from './ch-node'; import {Chunker, Dechunker} from './chunking'; import packStreamUtil from './packstream-util'; import {alloc} from './buf'; -import {newError} from './../error'; +import {newError, PROTOCOL_ERROR} from './../error'; import ChannelConfig from './ch-config'; import urlUtil from './url-util'; import StreamObserver from './stream-observer'; @@ -120,7 +120,7 @@ class Connection { this._packer = packStreamUtil.createLatestPacker(this._chunker); this._unpacker = packStreamUtil.createLatestUnpacker(disableLosslessIntegers); - this._isHandlingFailure = false; + this._ackFailureMuted = false; this._currentFailure = null; this._state = new ConnectionState(this); @@ -241,25 +241,8 @@ class Connection { this._currentObserver.onError( this._currentFailure ); } finally { this._updateCurrentObserver(); - // Things are now broken. Pending observers will get FAILURE messages routed until - // We are done handling this failure. - if( !this._isHandlingFailure ) { - this._isHandlingFailure = true; - - // isHandlingFailure was false, meaning this is the first failure message - // we see from this failure. We may see several others, one for each message - // we had "optimistically" already sent after whatever it was that failed. - // We only want to and need to ACK the first one, which is why we are tracking - // this _isHandlingFailure thing. - this._ackFailure({ - onNext: NO_OP, - onError: NO_OP, - onCompleted: () => { - this._isHandlingFailure = false; - this._currentFailure = null; - } - }); - } + // Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure. + this._ackFailureIfNeeded(); } break; case IGNORED: @@ -268,7 +251,7 @@ class Connection { if (this._currentFailure && this._currentObserver.onError) this._currentObserver.onError(this._currentFailure); else if(this._currentObserver.onError) - this._currentObserver.onError(payload); + this._currentObserver.onError(newError('Ignored either because of an error or RESET')); } finally { this._updateCurrentObserver(); } @@ -282,64 +265,114 @@ class Connection { initialize( clientName, token, observer ) { log("C", "INIT", clientName, token); const initObserver = this._state.wrap(observer); - this._queueObserver(initObserver); - this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)], - (err) => this._handleFatalError(err) ); - this._chunker.messageBoundary(); - this.sync(); + const queued = this._queueObserver(initObserver); + if (queued) { + this._packer.packStruct(INIT, [this._packable(clientName), this._packable(token)], + (err) => this._handleFatalError(err)); + this._chunker.messageBoundary(); + this.sync(); + } } /** Queue a RUN-message to be sent to the database */ run( statement, params, observer ) { log("C", "RUN", statement, params); - this._queueObserver(observer); - this._packer.packStruct( RUN, [this._packable(statement), this._packable(params)], - (err) => this._handleFatalError(err) ); - this._chunker.messageBoundary(); + const queued = this._queueObserver(observer); + if (queued) { + this._packer.packStruct(RUN, [this._packable(statement), this._packable(params)], + (err) => this._handleFatalError(err)); + this._chunker.messageBoundary(); + } } /** Queue a PULL_ALL-message to be sent to the database */ pullAll( observer ) { log("C", "PULL_ALL"); - this._queueObserver(observer); - this._packer.packStruct( PULL_ALL, [], (err) => this._handleFatalError(err) ); - this._chunker.messageBoundary(); + const queued = this._queueObserver(observer); + if (queued) { + this._packer.packStruct(PULL_ALL, [], (err) => this._handleFatalError(err)); + this._chunker.messageBoundary(); + } } /** Queue a DISCARD_ALL-message to be sent to the database */ discardAll( observer ) { log("C", "DISCARD_ALL"); - this._queueObserver(observer); - this._packer.packStruct( DISCARD_ALL, [], (err) => this._handleFatalError(err) ); - this._chunker.messageBoundary(); + const queued = this._queueObserver(observer); + if (queued) { + this._packer.packStruct(DISCARD_ALL, [], (err) => this._handleFatalError(err)); + this._chunker.messageBoundary(); + } } - /** Queue a RESET-message to be sent to the database. Mutes failure handling. */ - reset(observer) { + /** + * Send a RESET-message to the database. Mutes failure handling. + * Message is immediately flushed to the network. Separate {@link Connection#sync()} call is not required. + * @return {Promise} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives. + */ + resetAndFlush() { log('C', 'RESET'); - this._isHandlingFailure = true; - let self = this; - let wrappedObs = { - onNext: observer ? observer.onNext : NO_OP, - onError: observer ? observer.onError : NO_OP, - onCompleted: () => { - self._isHandlingFailure = false; - if (observer) { - observer.onCompleted(); + this._ackFailureMuted = true; + + return new Promise((resolve, reject) => { + const observer = { + onNext: record => { + const neo4jError = this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record)); + reject(neo4jError); + }, + onError: error => { + if (this._isBroken) { + // handling a fatal error, no need to raise a protocol violation + reject(error); + } else { + const neo4jError = this._handleProtocolError('Received FAILURE as a response for RESET: ' + error); + reject(neo4jError); + } + }, + onCompleted: () => { + this._ackFailureMuted = false; + resolve(); } + }; + const queued = this._queueObserver(observer); + if (queued) { + this._packer.packStruct(RESET, [], err => this._handleFatalError(err)); + this._chunker.messageBoundary(); + this.sync(); } - }; - this._queueObserver(wrappedObs); - this._packer.packStruct(RESET, [], (err) => this._handleFatalError(err)); - this._chunker.messageBoundary(); + }); } - /** Queue a ACK_FAILURE-message to be sent to the database */ - _ackFailure( observer ) { - log("C", "ACK_FAILURE"); - this._queueObserver(observer); - this._packer.packStruct( ACK_FAILURE, [], (err) => this._handleFatalError(err) ); - this._chunker.messageBoundary(); + _ackFailureIfNeeded() { + if (this._ackFailureMuted) { + return; + } + + log('C', 'ACK_FAILURE'); + + const observer = { + onNext: record => { + this._handleProtocolError('Received RECORD as a response for ACK_FAILURE: ' + JSON.stringify(record)); + }, + onError: error => { + if (!this._isBroken && !this._ackFailureMuted) { + // not handling a fatal error and RESET did not cause the given error - looks like a protocol violation + this._handleProtocolError('Received FAILURE as a response for ACK_FAILURE: ' + error); + } else { + this._currentFailure = null; + } + }, + onCompleted: () => { + this._currentFailure = null; + } + }; + + const queued = this._queueObserver(observer); + if (queued) { + this._packer.packStruct(ACK_FAILURE, [], err => this._handleFatalError(err)); + this._chunker.messageBoundary(); + this.sync(); + } } _queueObserver(observer) { @@ -347,7 +380,7 @@ class Connection { if( observer && observer.onError ) { observer.onError(this._error); } - return; + return false; } observer = observer || NO_OP_OBSERVER; observer.onCompleted = observer.onCompleted || NO_OP; @@ -358,6 +391,7 @@ class Connection { } else { this._pendingObservers.push( observer ); } + return true; } /** @@ -419,6 +453,15 @@ class Connection { } } } + + _handleProtocolError(message) { + this._ackFailureMuted = false; + this._currentFailure = null; + this._updateCurrentObserver(); + const error = newError(message, PROTOCOL_ERROR); + this._handleFatalError(error); + return error; + } } class ConnectionState { diff --git a/test/internal/connector.test.js b/test/internal/connector.test.js index 4c8880453..c738f981b 100644 --- a/test/internal/connector.test.js +++ b/test/internal/connector.test.js @@ -22,11 +22,16 @@ import {connect, Connection} from '../../src/v1/internal/connector'; import {Packer} from '../../src/v1/internal/packstream-v1'; import {Chunker} from '../../src/v1/internal/chunking'; import {alloc} from '../../src/v1/internal/buf'; -import {Neo4jError} from '../../src/v1/error'; +import {Neo4jError, newError} from '../../src/v1/error'; import sharedNeo4j from '../internal/shared-neo4j'; import {ServerVersion} from '../../src/v1/internal/server-version'; import lolex from 'lolex'; +const ILLEGAL_MESSAGE = {signature: 42, fields: []}; +const SUCCESS_MESSAGE = {signature: 0x70, fields: [{}]}; +const FAILURE_MESSAGE = {signature: 0x7F, fields: [newError('Hello')]}; +const RECORD_MESSAGE = {signature: 0x71, fields: [{value: 'Hello'}]}; + describe('connector', () => { let clock; @@ -241,6 +246,104 @@ describe('connector', () => { testConnectionTimeout(true, done); }); + it('should not queue INIT observer when broken', () => { + testQueueingOfObserversWithBrokenConnection(connection => connection.initialize('Hello', {}, {})); + }); + + it('should not queue RUN observer when broken', () => { + testQueueingOfObserversWithBrokenConnection(connection => connection.run('RETURN 1', {}, {})); + }); + + it('should not queue PULL_ALL observer when broken', () => { + testQueueingOfObserversWithBrokenConnection(connection => connection.pullAll({})); + }); + + it('should not queue DISCARD_ALL observer when broken', () => { + testQueueingOfObserversWithBrokenConnection(connection => connection.discardAll({})); + }); + + it('should not queue RESET observer when broken', () => { + const resetAction = connection => connection.resetAndFlush().catch(ignore => { + }); + + testQueueingOfObserversWithBrokenConnection(resetAction); + }); + + it('should not queue ACK_FAILURE observer when broken', () => { + testQueueingOfObserversWithBrokenConnection(connection => connection._ackFailureIfNeeded()); + }); + + it('should reset and flush when SUCCESS received', done => { + connection = connect('bolt://localhost'); + + connection.resetAndFlush().then(() => { + expect(connection.isOpen()).toBeTruthy(); + done(); + }).catch(error => done.fail(error)); + + connection._handleMessage(SUCCESS_MESSAGE); + }); + + it('should fail to reset and flush when FAILURE received', done => { + connection = connect('bolt://localhost'); + + connection.resetAndFlush() + .then(() => done.fail('Should fail')) + .catch(error => { + expect(error.message).toEqual('Received FAILURE as a response for RESET: Neo4jError: Hello'); + expect(connection._isBroken).toBeTruthy(); + expect(connection.isOpen()).toBeFalsy(); + done(); + }); + + connection._handleMessage(FAILURE_MESSAGE); + }); + + it('should fail to reset and flush when RECORD received', done => { + connection = connect('bolt://localhost'); + + connection.resetAndFlush() + .then(() => done.fail('Should fail')) + .catch(error => { + expect(error.message).toEqual('Received RECORD as a response for RESET: {"value":"Hello"}'); + expect(connection._isBroken).toBeTruthy(); + expect(connection.isOpen()).toBeFalsy(); + done(); + }); + + connection._handleMessage(RECORD_MESSAGE); + }); + + it('should ACK_FAILURE when SUCCESS received', () => { + connection = connect('bolt://localhost'); + + connection._currentFailure = newError('Hello'); + connection._ackFailureIfNeeded(); + + connection._handleMessage(SUCCESS_MESSAGE); + expect(connection._currentFailure).toBeNull(); + }); + + it('should fail the connection when ACK_FAILURE receives FAILURE', () => { + connection = connect('bolt://localhost'); + + connection._ackFailureIfNeeded(); + + connection._handleMessage(FAILURE_MESSAGE); + expect(connection._isBroken).toBeTruthy(); + expect(connection.isOpen()).toBeFalsy(); + }); + + it('should fail the connection when ACK_FAILURE receives RECORD', () => { + connection = connect('bolt://localhost'); + + connection._ackFailureIfNeeded(); + + connection._handleMessage(RECORD_MESSAGE); + expect(connection._isBroken).toBeTruthy(); + expect(connection.isOpen()).toBeFalsy(); + }); + function packedHandshakeMessage() { const result = alloc(4); result.putInt32(0, 1); @@ -301,4 +404,15 @@ describe('connector', () => { }); } + function testQueueingOfObserversWithBrokenConnection(connectionAction) { + connection = connect('bolt://localhost'); + + connection._handleMessage(ILLEGAL_MESSAGE); + expect(connection.isOpen()).toBeFalsy(); + + expect(connection._pendingObservers.length).toEqual(0); + connectionAction(connection); + expect(connection._pendingObservers.length).toEqual(0); + } + }); diff --git a/test/internal/fake-connection.js b/test/internal/fake-connection.js index 95c85ffbb..7816866aa 100644 --- a/test/internal/fake-connection.js +++ b/test/internal/fake-connection.js @@ -53,6 +53,11 @@ export default class FakeConnection { this.resetInvoked++; } + resetAndFlush() { + this.resetInvoked++; + return Promise.resolve(); + } + sync() { this.syncInvoked++; } @@ -79,9 +84,7 @@ export default class FakeConnection { } isReleasedTimes(times) { - return this.resetInvoked === times && - this.syncInvoked === times && - this.releaseInvoked === times; + return this.resetInvoked === times && this.releaseInvoked === times; } withServerVersion(version) { diff --git a/test/resources/boltstub/reset_error.script b/test/resources/boltstub/reset_error.script new file mode 100644 index 000000000..e335d5c59 --- /dev/null +++ b/test/resources/boltstub/reset_error.script @@ -0,0 +1,9 @@ +!: AUTO INIT + +C: RUN "RETURN 42 AS answer" {} + PULL_ALL +S: SUCCESS {"fields": ["answer"]} + RECORD [42] + SUCCESS {} +C: RESET +S: FAILURE {"code": "Neo.TransientError.General.DatabaseUnavailable", "message": "Unable to reset"} diff --git a/test/v1/direct.driver.boltkit.test.js b/test/v1/direct.driver.boltkit.test.js index 76d743c8e..c443c1a31 100644 --- a/test/v1/direct.driver.boltkit.test.js +++ b/test/v1/direct.driver.boltkit.test.js @@ -274,4 +274,34 @@ describe('direct driver with stub server', () => { }); }); + it('should close connection when RESET fails', done => { + if (!boltStub.supported) { + done(); + return; + } + + const server = boltStub.start('./test/resources/boltstub/reset_error.script', 9001); + + boltStub.run(() => { + const driver = boltStub.newDriver('bolt://127.0.0.1:9001'); + const session = driver.session(); + + session.run('RETURN 42 AS answer').then(result => { + const records = result.records; + expect(records.length).toEqual(1); + expect(records[0].get(0).toNumber()).toEqual(42); + session.close(() => { + + expect(driver._pool._pools['127.0.0.1:9001'].length).toEqual(0); + driver.close(); + server.exit(code => { + expect(code).toEqual(0); + done(); + }); + + }); + }).catch(error => done.fail(error)); + }); + }); + });