diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 9d19de16e..4ee258c10 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -24,6 +24,7 @@ import {alloc} from './buf'; import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types'; import {newError} from './../error'; import ChannelConfig from './ch-config'; +import StreamObserver from './stream-observer'; let Channel; if( NodeChannel.available ) { @@ -272,7 +273,7 @@ class Connection { * failing, and the connection getting ejected from the session pool. * * @param err an error object, forwarded to all current and future subscribers - * @private + * @protected */ _handleFatalError( err ) { this._isBroken = true; @@ -289,6 +290,12 @@ class Connection { } _handleMessage( msg ) { + if (this._isBroken) { + // ignore all incoming messages when this connection is broken. all previously pending observers failed + // with the fatal error. all future observers will fail with same fatal error. + return; + } + const payload = msg.fields[0]; switch( msg.signature ) { @@ -301,7 +308,7 @@ class Connection { try { this._currentObserver.onCompleted( payload ); } finally { - this._currentObserver = this._pendingObservers.shift(); + this._updateCurrentObserver(); } break; case FAILURE: @@ -310,7 +317,7 @@ class Connection { this._currentFailure = newError(payload.message, payload.code); this._currentObserver.onError( this._currentFailure ); } finally { - this._currentObserver = this._pendingObservers.shift(); + this._updateCurrentObserver(); // Things are now broken. Pending observers will get FAILURE messages routed until // We are done handling this failure. if( !this._isHandlingFailure ) { @@ -340,7 +347,7 @@ class Connection { else if(this._currentObserver.onError) this._currentObserver.onError(payload); } finally { - this._currentObserver = this._pendingObservers.shift(); + this._updateCurrentObserver(); } break; default: @@ -351,7 +358,8 @@ class Connection { /** Queue an INIT-message to be sent to the database */ initialize( clientName, token, observer ) { log("C", "INIT", clientName, token); - this._queueObserver(observer); + const initObserver = new InitObserver(this, observer); + this._queueObserver(initObserver); this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)], (err) => this._handleFatalError(err) ); this._chunker.messageBoundary(); @@ -437,6 +445,14 @@ class Connection { } } + /** + * Pop next pending observer form the list of observers and make it current observer. + * @protected + */ + _updateCurrentObserver() { + this._currentObserver = this._pendingObservers.shift(); + } + /** * Synchronize - flush all queued outgoing messages and route their responses * to their respective handlers. @@ -489,6 +505,42 @@ function connect(url, config = {}, connectionErrorCode = null) { return new Connection( new Ch(channelConfig), completeUrl); } +/** + * Observer that wraps user-defined observer for INIT message and handles initialization failures. Connection is + * closed by the server if processing of INIT message fails so this observer will handle initialization failure + * as a fatal error. + */ +class InitObserver extends StreamObserver { + + /** + * @constructor + * @param {Connection} connection the connection used to send INIT message. + * @param {StreamObserver} originalObserver the observer to wrap and delegate calls to. + */ + constructor(connection, originalObserver) { + super(); + this._connection = connection; + this._originalObserver = originalObserver || NO_OP_OBSERVER; + } + + onNext(record) { + this._originalObserver.onNext(record); + } + + onError(error) { + this._connection._updateCurrentObserver(); // make sure this same observer will not be called again + try { + this._originalObserver.onError(error); + } finally { + this._connection._handleFatalError(error); + } + } + + onCompleted(metaData) { + this._originalObserver.onCompleted(metaData); + } +} + export { connect, parseScheme, diff --git a/src/v1/internal/get-servers-util.js b/src/v1/internal/get-servers-util.js index d94da81df..afabb8144 100644 --- a/src/v1/internal/get-servers-util.js +++ b/src/v1/internal/get-servers-util.js @@ -23,6 +23,7 @@ import Integer, {int} from '../integer'; const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers'; const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'; +const UNAUTHORIZED_CODE = 'Neo.ClientError.Security.Unauthorized'; export default class GetServersUtil { @@ -35,10 +36,14 @@ export default class GetServersUtil { // throw when getServers procedure not found because this is clearly a configuration issue throw newError('Server ' + routerAddress + ' could not perform routing. ' + 'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE); + } else if (error.code === UNAUTHORIZED_CODE) { + // auth error is a sign of a configuration issue, rediscovery should not proceed + throw error; + } else { + // return nothing when failed to connect because code higher in the callstack is still able to retry with a + // different session towards a different router + return null; } - // return nothing when failed to connect because code higher in the callstack is still able to retry with a - // different session towards a different router - return null; }); } diff --git a/test/internal/connector.test.js b/test/internal/connector.test.js index 3610a0b56..ac8ac8a42 100644 --- a/test/internal/connector.test.js +++ b/test/internal/connector.test.js @@ -117,6 +117,54 @@ describe('connector', () => { channel.onmessage(packedFailureMessage(errorCode, errorMessage)); }); + it('should notify provided observer when connection initialization completes', done => { + const connection = connect('bolt://localhost'); + + connection.initialize('mydriver/0.0.0', basicAuthToken(), { + onCompleted: metaData => { + expect(connection.isOpen()).toBeTruthy(); + expect(metaData).toBeDefined(); + done(); + }, + }); + }); + + it('should notify provided observer when connection initialization fails', done => { + const connection = connect('bolt://localhost:7474'); // wrong port + + connection.initialize('mydriver/0.0.0', basicAuthToken(), { + onError: error => { + expect(connection.isOpen()).toBeFalsy(); + expect(error).toBeDefined(); + done(); + }, + }); + }); + + it('should fail all new observers after initialization error', done => { + const connection = connect('bolt://localhost:7474'); // wrong port + + connection.initialize('mydriver/0.0.0', basicAuthToken(), { + onError: initialError => { + expect(initialError).toBeDefined(); + + connection.run('RETURN 1', {}, { + onError: error1 => { + expect(error1).toEqual(initialError); + + connection.initialize('mydriver/0.0.0', basicAuthToken(), { + onError: error2 => { + expect(error2).toEqual(initialError); + + done(); + } + }); + } + }); + }, + }); + }); + function packedHandshakeMessage() { const result = alloc(4); result.putInt32(0, 1); diff --git a/test/resources/boltkit/failed_auth.script b/test/resources/boltkit/failed_auth.script new file mode 100644 index 000000000..824575601 --- /dev/null +++ b/test/resources/boltkit/failed_auth.script @@ -0,0 +1,6 @@ +!: AUTO RESET +!: AUTO PULL_ALL + +C: INIT "neo4j-javascript/0.0.0-dev" {"credentials": "neo4j", "scheme": "basic", "principal": "neo4j"} +S: FAILURE {"code": "Neo.ClientError.Security.Unauthorized", "message": "Some server auth error message"} +S: diff --git a/test/v1/driver.test.js b/test/v1/driver.test.js index 6795af7cf..5923754a7 100644 --- a/test/v1/driver.test.js +++ b/test/v1/driver.test.js @@ -80,7 +80,7 @@ describe('driver', () => { it('should fail early on wrong credentials', done => { // Given - driver = neo4j.driver("bolt://localhost", neo4j.auth.basic("neo4j", "who would use such a password")); + driver = neo4j.driver("bolt://localhost", wrongCredentials()); // Expect driver.onError = err => { @@ -93,6 +93,16 @@ describe('driver', () => { startNewTransaction(driver); }); + it('should fail queries on wrong credentials', done => { + driver = neo4j.driver("bolt://localhost", wrongCredentials()); + + const session = driver.session(); + session.run('RETURN 1').catch(error => { + expect(error.code).toEqual('Neo.ClientError.Security.Unauthorized'); + done(); + }); + }); + it('should indicate success early on correct credentials', done => { // Given driver = neo4j.driver("bolt://localhost", sharedNeo4j.authToken); @@ -207,4 +217,8 @@ describe('driver', () => { expect(session.beginTransaction()).toBeDefined(); } + function wrongCredentials() { + return neo4j.auth.basic('neo4j', 'who would use such a password'); + } + }); diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index a16892db8..8ae02e08b 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -1549,6 +1549,33 @@ describe('routing driver', () => { }); }); + it('should fail rediscovery on auth error', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/failed_auth.script', 9010); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9010'); + const session = driver.session(); + session.run('RETURN 1').catch(error => { + expect(error.code).toEqual('Neo.ClientError.Security.Unauthorized'); + expect(error.message).toEqual('Some server auth error message'); + + session.close(() => { + driver.close(); + router.exit(code => { + expect(code).toEqual(0); + done(); + }); + }); + }); + }); + }); + function moveNextDateNow30SecondsForward() { const currentTime = Date.now(); hijackNextDateNowCall(currentTime + 30 * 1000 + 1);