From 5a33a589bf343970fb3d564d11337715b96ba2b6 Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 2 May 2017 15:21:13 +0200 Subject: [PATCH 1/3] Propagate authentication errors Authentication errors happen during connection initialization when INIT message is send. Server validates provided credentials and then responds with either SUCCESS if credentials are fine or with FAILURE and closes the connection when credentials are wrong. Previously auth errors were not propagated to external observers (including those defined by user's Result promise). This means auth errors were only propagated to `Driver.onError` callback but all other places received `ServiceUnavailable` or `SessionExpired` because of closed connection. This commit makes driver treat INIT error as fatal and corresponding connection as broken. Initialization error is memorized and propagated to all message observers. So they see specific failure reason and not generic `ServiceUnavailable` \ `SessionExpired`. --- src/v1/internal/connector.js | 51 ++++++++++++++++++++++- src/v1/internal/get-servers-util.js | 11 +++-- test/internal/connector.test.js | 24 +++++++++++ test/resources/boltkit/failed_auth.script | 6 +++ test/v1/driver.test.js | 16 ++++++- test/v1/routing.driver.boltkit.it.js | 27 ++++++++++++ 6 files changed, 130 insertions(+), 5 deletions(-) create mode 100644 test/resources/boltkit/failed_auth.script diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 9d19de16e..255bbda18 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -288,6 +288,20 @@ class Connection { } } + /** + * Mark this connection as failed because processing of INIT message failed and server will close the connection. + * Initialization failure is a fatal error for the connection. + * @param {Neo4jError} error the initialization error. + * @param {StreamObserver} initObserver the initialization observer that noticed the failure. + * @protected + */ + _initializationFailed(error, initObserver) { + if (this._currentObserver === initObserver) { + this._currentObserver = null; // init observer detected the failure and should not be notified again + } + this._handleFatalError(error); + } + _handleMessage( msg ) { const payload = msg.fields[0]; @@ -351,7 +365,8 @@ class Connection { /** Queue an INIT-message to be sent to the database */ initialize( clientName, token, observer ) { log("C", "INIT", clientName, token); - this._queueObserver(observer); + const initObserver = new InitObserver(this, observer); + this._queueObserver(initObserver); this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)], (err) => this._handleFatalError(err) ); this._chunker.messageBoundary(); @@ -489,6 +504,40 @@ function connect(url, config = {}, connectionErrorCode = null) { return new Connection( new Ch(channelConfig), completeUrl); } +/** + * Observer that wraps user-defined observer for INIT message and handles initialization failures. Connection is + * closed by the server if processing of INIT message fails so this observer will handle initialization failure + * as a fatal error. + */ +class InitObserver { + + /** + * @constructor + * @param {Connection} connection the connection used to send INIT message. + * @param {StreamObserver} originalObserver the observer to wrap and delegate calls to. + */ + constructor(connection, originalObserver) { + this._connection = connection; + this._originalObserver = originalObserver || NO_OP_OBSERVER; + } + + onNext(record) { + this._originalObserver.onNext(record); + } + + onError(error) { + try { + this._originalObserver.onError(error); + } finally { + this._connection._initializationFailed(error, this); + } + } + + onCompleted(metaData) { + this._originalObserver.onCompleted(metaData); + } +} + export { connect, parseScheme, diff --git a/src/v1/internal/get-servers-util.js b/src/v1/internal/get-servers-util.js index d94da81df..afabb8144 100644 --- a/src/v1/internal/get-servers-util.js +++ b/src/v1/internal/get-servers-util.js @@ -23,6 +23,7 @@ import Integer, {int} from '../integer'; const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers'; const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'; +const UNAUTHORIZED_CODE = 'Neo.ClientError.Security.Unauthorized'; export default class GetServersUtil { @@ -35,10 +36,14 @@ export default class GetServersUtil { // throw when getServers procedure not found because this is clearly a configuration issue throw newError('Server ' + routerAddress + ' could not perform routing. ' + 'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE); + } else if (error.code === UNAUTHORIZED_CODE) { + // auth error is a sign of a configuration issue, rediscovery should not proceed + throw error; + } else { + // return nothing when failed to connect because code higher in the callstack is still able to retry with a + // different session towards a different router + return null; } - // return nothing when failed to connect because code higher in the callstack is still able to retry with a - // different session towards a different router - return null; }); } diff --git a/test/internal/connector.test.js b/test/internal/connector.test.js index 3610a0b56..2b4675220 100644 --- a/test/internal/connector.test.js +++ b/test/internal/connector.test.js @@ -117,6 +117,30 @@ describe('connector', () => { channel.onmessage(packedFailureMessage(errorCode, errorMessage)); }); + it('should notify provided observer when connection initialization completes', done => { + const connection = connect('bolt://localhost'); + + connection.initialize('mydriver/0.0.0', basicAuthToken(), { + onCompleted: metaData => { + expect(connection.isOpen()).toBeTruthy(); + expect(metaData).toBeDefined(); + done(); + }, + }); + }); + + it('should notify provided observer when connection initialization fails', done => { + const connection = connect('bolt://localhost:7474'); // wrong port + + connection.initialize('mydriver/0.0.0', basicAuthToken(), { + onError: error => { + expect(connection.isOpen()).toBeFalsy(); + expect(error).toBeDefined(); + done(); + }, + }); + }); + function packedHandshakeMessage() { const result = alloc(4); result.putInt32(0, 1); diff --git a/test/resources/boltkit/failed_auth.script b/test/resources/boltkit/failed_auth.script new file mode 100644 index 000000000..824575601 --- /dev/null +++ b/test/resources/boltkit/failed_auth.script @@ -0,0 +1,6 @@ +!: AUTO RESET +!: AUTO PULL_ALL + +C: INIT "neo4j-javascript/0.0.0-dev" {"credentials": "neo4j", "scheme": "basic", "principal": "neo4j"} +S: FAILURE {"code": "Neo.ClientError.Security.Unauthorized", "message": "Some server auth error message"} +S: diff --git a/test/v1/driver.test.js b/test/v1/driver.test.js index 6795af7cf..5923754a7 100644 --- a/test/v1/driver.test.js +++ b/test/v1/driver.test.js @@ -80,7 +80,7 @@ describe('driver', () => { it('should fail early on wrong credentials', done => { // Given - driver = neo4j.driver("bolt://localhost", neo4j.auth.basic("neo4j", "who would use such a password")); + driver = neo4j.driver("bolt://localhost", wrongCredentials()); // Expect driver.onError = err => { @@ -93,6 +93,16 @@ describe('driver', () => { startNewTransaction(driver); }); + it('should fail queries on wrong credentials', done => { + driver = neo4j.driver("bolt://localhost", wrongCredentials()); + + const session = driver.session(); + session.run('RETURN 1').catch(error => { + expect(error.code).toEqual('Neo.ClientError.Security.Unauthorized'); + done(); + }); + }); + it('should indicate success early on correct credentials', done => { // Given driver = neo4j.driver("bolt://localhost", sharedNeo4j.authToken); @@ -207,4 +217,8 @@ describe('driver', () => { expect(session.beginTransaction()).toBeDefined(); } + function wrongCredentials() { + return neo4j.auth.basic('neo4j', 'who would use such a password'); + } + }); diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index a16892db8..8ae02e08b 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -1549,6 +1549,33 @@ describe('routing driver', () => { }); }); + it('should fail rediscovery on auth error', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/failed_auth.script', 9010); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9010'); + const session = driver.session(); + session.run('RETURN 1').catch(error => { + expect(error.code).toEqual('Neo.ClientError.Security.Unauthorized'); + expect(error.message).toEqual('Some server auth error message'); + + session.close(() => { + driver.close(); + router.exit(code => { + expect(code).toEqual(0); + done(); + }); + }); + }); + }); + }); + function moveNextDateNow30SecondsForward() { const currentTime = Date.now(); hijackNextDateNowCall(currentTime + 30 * 1000 + 1); From 805292fbe0fea767ea4a6e7092b474bbe92d2c7e Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 2 May 2017 16:11:38 +0200 Subject: [PATCH 2/3] Temporary enable bolt message logging To investigate test failure on TeamCity. --- src/v1/internal/connector.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 255bbda18..667ef8f6a 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -57,7 +57,7 @@ UNBOUND_RELATIONSHIP = 0x72, PATH = 0x50, //sent before version negotiation MAGIC_PREAMBLE = 0x6060B017, -DEBUG = false; +DEBUG = true; let URLREGEX = new RegExp([ "([^/]+//)?", // scheme From 6294e3631588ac6ab4603fb2a8c4b22a170dacd9 Mon Sep 17 00:00:00 2001 From: lutovich Date: Thu, 4 May 2017 12:16:28 +0200 Subject: [PATCH 3/3] Make sure INIT error is handled correctly with 3.0 server Neo4j 3.1 closes connection on INIT error. This made it possible to handle such errors in driver as fatal because we do not expect to receive anything else from the server. However Neo4j 3.0 does not close the connection after INIT failure and thus connection might receive more incoming messages. This commit makes sure all incoming messages are ignored after a fatal error. --- src/v1/internal/connector.js | 43 ++++++++++++++++++--------------- test/internal/connector.test.js | 24 ++++++++++++++++++ 2 files changed, 47 insertions(+), 20 deletions(-) diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 667ef8f6a..4ee258c10 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -24,6 +24,7 @@ import {alloc} from './buf'; import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types'; import {newError} from './../error'; import ChannelConfig from './ch-config'; +import StreamObserver from './stream-observer'; let Channel; if( NodeChannel.available ) { @@ -57,7 +58,7 @@ UNBOUND_RELATIONSHIP = 0x72, PATH = 0x50, //sent before version negotiation MAGIC_PREAMBLE = 0x6060B017, -DEBUG = true; +DEBUG = false; let URLREGEX = new RegExp([ "([^/]+//)?", // scheme @@ -272,7 +273,7 @@ class Connection { * failing, and the connection getting ejected from the session pool. * * @param err an error object, forwarded to all current and future subscribers - * @private + * @protected */ _handleFatalError( err ) { this._isBroken = true; @@ -288,21 +289,13 @@ class Connection { } } - /** - * Mark this connection as failed because processing of INIT message failed and server will close the connection. - * Initialization failure is a fatal error for the connection. - * @param {Neo4jError} error the initialization error. - * @param {StreamObserver} initObserver the initialization observer that noticed the failure. - * @protected - */ - _initializationFailed(error, initObserver) { - if (this._currentObserver === initObserver) { - this._currentObserver = null; // init observer detected the failure and should not be notified again + _handleMessage( msg ) { + if (this._isBroken) { + // ignore all incoming messages when this connection is broken. all previously pending observers failed + // with the fatal error. all future observers will fail with same fatal error. + return; } - this._handleFatalError(error); - } - _handleMessage( msg ) { const payload = msg.fields[0]; switch( msg.signature ) { @@ -315,7 +308,7 @@ class Connection { try { this._currentObserver.onCompleted( payload ); } finally { - this._currentObserver = this._pendingObservers.shift(); + this._updateCurrentObserver(); } break; case FAILURE: @@ -324,7 +317,7 @@ class Connection { this._currentFailure = newError(payload.message, payload.code); this._currentObserver.onError( this._currentFailure ); } finally { - this._currentObserver = this._pendingObservers.shift(); + this._updateCurrentObserver(); // Things are now broken. Pending observers will get FAILURE messages routed until // We are done handling this failure. if( !this._isHandlingFailure ) { @@ -354,7 +347,7 @@ class Connection { else if(this._currentObserver.onError) this._currentObserver.onError(payload); } finally { - this._currentObserver = this._pendingObservers.shift(); + this._updateCurrentObserver(); } break; default: @@ -452,6 +445,14 @@ class Connection { } } + /** + * Pop next pending observer form the list of observers and make it current observer. + * @protected + */ + _updateCurrentObserver() { + this._currentObserver = this._pendingObservers.shift(); + } + /** * Synchronize - flush all queued outgoing messages and route their responses * to their respective handlers. @@ -509,7 +510,7 @@ function connect(url, config = {}, connectionErrorCode = null) { * closed by the server if processing of INIT message fails so this observer will handle initialization failure * as a fatal error. */ -class InitObserver { +class InitObserver extends StreamObserver { /** * @constructor @@ -517,6 +518,7 @@ class InitObserver { * @param {StreamObserver} originalObserver the observer to wrap and delegate calls to. */ constructor(connection, originalObserver) { + super(); this._connection = connection; this._originalObserver = originalObserver || NO_OP_OBSERVER; } @@ -526,10 +528,11 @@ class InitObserver { } onError(error) { + this._connection._updateCurrentObserver(); // make sure this same observer will not be called again try { this._originalObserver.onError(error); } finally { - this._connection._initializationFailed(error, this); + this._connection._handleFatalError(error); } } diff --git a/test/internal/connector.test.js b/test/internal/connector.test.js index 2b4675220..ac8ac8a42 100644 --- a/test/internal/connector.test.js +++ b/test/internal/connector.test.js @@ -141,6 +141,30 @@ describe('connector', () => { }); }); + it('should fail all new observers after initialization error', done => { + const connection = connect('bolt://localhost:7474'); // wrong port + + connection.initialize('mydriver/0.0.0', basicAuthToken(), { + onError: initialError => { + expect(initialError).toBeDefined(); + + connection.run('RETURN 1', {}, { + onError: error1 => { + expect(error1).toEqual(initialError); + + connection.initialize('mydriver/0.0.0', basicAuthToken(), { + onError: error2 => { + expect(error2).toEqual(initialError); + + done(); + } + }); + } + }); + }, + }); + }); + function packedHandshakeMessage() { const result = alloc(4); result.putInt32(0, 1);