Skip to content

Unpack failure messages to Neo4jErrors #204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/v1/internal/buf.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
*(via Buffer API).
*/

import {newError} from './../error';
let _node = require("buffer");
/**
* Common base with default implementation for most buffer methods.
Expand Down Expand Up @@ -551,7 +550,7 @@ class NodeBuffer extends BaseBuffer {
val.position + bytesToCopy );
val.position += bytesToCopy;
} else {
throw newError("Copying not yet implemented.");
super.putBytes(position, val);
}
};

Expand Down
40 changes: 22 additions & 18 deletions src/v1/internal/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
*/
import WebSocketChannel from './ch-websocket';
import NodeChannel from './ch-node';
import {Dechunker, Chunker} from "./chunking";
import {Chunker, Dechunker} from './chunking';
import hasFeature from './features';
import {Packer, Unpacker} from './packstream';
import {alloc} from './buf';
import {Node, Relationship, UnboundRelationship, Path, PathSegment} from '../graph-types'
import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types';
import {newError} from './../error';

let Channel;
Expand Down Expand Up @@ -201,7 +201,9 @@ class Connection {
this._chunker = new Chunker( channel );
this._packer = new Packer( this._chunker );
this._unpacker = new Unpacker();

this._isHandlingFailure = false;
this._currentFailure = null;

// Set to true on fatal errors, to get this out of session pool.
this._isBroken = false;
Expand Down Expand Up @@ -288,54 +290,56 @@ class Connection {
}

_handleMessage( msg ) {
const payload = msg.fields[0];

switch( msg.signature ) {
case RECORD:
log("S", "RECORD", msg.fields[0]);
this._currentObserver.onNext( msg.fields[0] );
log("S", "RECORD", msg);
this._currentObserver.onNext( payload );
break;
case SUCCESS:
log("S", "SUCCESS", msg.fields[0]);
log("S", "SUCCESS", msg);
try {
this._currentObserver.onCompleted( msg.fields[0] );
this._currentObserver.onCompleted( payload );
} finally {
this._currentObserver = this._pendingObservers.shift();
}
break;
case FAILURE:
log("S", "FAILURE", msg);
try {
this._currentObserver.onError( msg );
this._errorMsg = msg;
this._currentFailure = newError(payload.message, payload.code);
this._currentObserver.onError( this._currentFailure );
} finally {
this._currentObserver = this._pendingObservers.shift();
// Things are now broken. Pending observers will get FAILURE messages routed until
// We are done handling this failure.
if( !this._isHandlingFailure ) {
this._isHandlingFailure = true;
let self = this;

// isHandlingFailure was false, meaning this is the first failure message
// we see from this failure. We may see several others, one for each message
// we had "optimistically" already sent after whatever it was that failed.
// We only want to and need to ACK the first one, which is why we are tracking
// this _isHandlingFailure thing.
this._ackFailure({
onNext: NO_OP,
onError: NO_OP,
onCompleted: () => {
self._isHandlingFailure = false;
}
onNext: NO_OP,
onError: NO_OP,
onCompleted: () => {
this._isHandlingFailure = false;
this._currentFailure = null;
}
});
}
}
break;
case IGNORED:
log("S", "IGNORED");
log("S", "IGNORED", msg);
try {
if (this._errorMsg && this._currentObserver.onError)
this._currentObserver.onError(this._errorMsg);
if (this._currentFailure && this._currentObserver.onError)
this._currentObserver.onError(this._currentFailure);
else if(this._currentObserver.onError)
this._currentObserver.onError(msg);
this._currentObserver.onError(payload);
} finally {
this._currentObserver = this._pendingObservers.shift();
}
Expand Down
14 changes: 1 addition & 13 deletions src/v1/internal/get-servers-util.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export default class GetServersUtil {
session.close();
return result.records;
}).catch(error => {
if (this._isProcedureNotFoundError(error)) {
if (error.code === PROCEDURE_NOT_FOUND_CODE) {
// throw when getServers procedure not found because this is clearly a configuration issue
throw newError('Server ' + routerAddress + ' could not perform routing. ' +
'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE);
Expand Down Expand Up @@ -92,16 +92,4 @@ export default class GetServersUtil {
PROTOCOL_ERROR);
}
}

_isProcedureNotFoundError(error) {
let errorCode = error.code;
if (!errorCode) {
try {
errorCode = error.fields[0].code;
} catch (e) {
errorCode = 'UNKNOWN';
}
}
return errorCode === PROCEDURE_NOT_FOUND_CODE;
}
}
32 changes: 6 additions & 26 deletions src/v1/routing-driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,37 +36,17 @@ class RoutingDriver extends Driver {
}

_createSession(connectionPromise, cb) {
return new RoutingSession(connectionPromise, cb, (err, conn) => {
let code = err.code;
let msg = err.message;
if (!code) {
try {
code = err.fields[0].code;
} catch (e) {
code = 'UNKNOWN';
}
}
if (!msg) {
try {
msg = err.fields[0].message;
} catch (e) {
msg = 'Unknown failure occurred';
}
}
//just to simplify later error handling
err.code = code;
err.message = msg;

if (code === SERVICE_UNAVAILABLE || code === SESSION_EXPIRED) {
return new RoutingSession(connectionPromise, cb, (error, conn) => {
if (error.code === SERVICE_UNAVAILABLE || error.code === SESSION_EXPIRED) {
if (conn) {
this._forget(conn.url)
} else {
connectionPromise.then((conn) => {
this._forget(conn.url);
}).catch(() => {/*ignore*/});
}
return err;
} else if (code === 'Neo.ClientError.Cluster.NotALeader') {
return error;
} else if (error.code === 'Neo.ClientError.Cluster.NotALeader') {
let url = 'UNKNOWN';
if (conn) {
url = conn.url;
Expand All @@ -76,9 +56,9 @@ class RoutingDriver extends Driver {
this._routingTable.forgetWriter(conn.url);
}).catch(() => {/*ignore*/});
}
return newError("No longer possible to write to server at " + url, SESSION_EXPIRED);
return newError('No longer possible to write to server at ' + url, SESSION_EXPIRED);
} else {
return err;
return error;
}
});
}
Expand Down
115 changes: 81 additions & 34 deletions test/internal/connector.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,75 +16,78 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
var DummyChannel = require('../../lib/v1/internal/ch-dummy.js');
var connect = require("../../lib/v1/internal/connector.js").connect;

describe('connector', function() {
import * as DummyChannel from '../../src/v1/internal/ch-dummy';
import {connect, Connection} from '../../src/v1/internal/connector';
import {Packer} from '../../src/v1/internal/packstream';
import {Chunker} from '../../src/v1/internal/chunking';
import {alloc} from '../../src/v1/internal/buf';
import {Neo4jError} from '../../src/v1/error';

it('should read/write basic messages', function(done) {
describe('connector', () => {

it('should read/write basic messages', done => {
// Given
var conn = connect("bolt://localhost")
const conn = connect("bolt://localhost");

// When
conn.initialize( "mydriver/0.0.0", {scheme: "basic", principal: "neo4j", credentials: "neo4j"}, {
onCompleted: function( msg ) {
expect( msg ).not.toBeNull();
conn.initialize("mydriver/0.0.0", {scheme: "basic", principal: "neo4j", credentials: "neo4j"}, {
onCompleted: msg => {
expect(msg).not.toBeNull();
conn.close();
done();
},
onError: function(err) {
console.log(err);
}
onError: console.log
});
conn.sync();

});
it('should retrieve stream', function(done) {

it('should retrieve stream', done => {
// Given
var conn = connect("bolt://localhost")
const conn = connect("bolt://localhost");

// When
var records = [];
conn.initialize( "mydriver/0.0.0", {scheme: "basic", principal: "neo4j", credentials: "neo4j"} );
conn.run( "RETURN 1.0", {} );
conn.pullAll( {
onNext: function( record ) {
records.push( record );
const records = [];
conn.initialize("mydriver/0.0.0", {scheme: "basic", principal: "neo4j", credentials: "neo4j"});
conn.run("RETURN 1.0", {});
conn.pullAll({
onNext: record => {
records.push(record);
},
onCompleted: function( tail ) {
expect( records[0][0] ).toBe( 1 );
onCompleted: () => {
expect(records[0][0]).toBe(1);
conn.close();
done();
}
});
conn.sync();
});

it('should use DummyChannel to read what gets written', function(done) {
it('should use DummyChannel to read what gets written', done => {
// Given
var observer = DummyChannel.observer;
var conn = connect("bolt://localhost", {channel:DummyChannel.channel});
const observer = DummyChannel.observer;
const conn = connect("bolt://localhost", {channel: DummyChannel.channel});

// When
var records = [];
conn.initialize( "mydriver/0.0.0", {scheme: "basic", principal: "neo4j", credentials: "neo4j"} );
conn.run( "RETURN 1", {} );
conn.initialize("mydriver/0.0.0", {scheme: "basic", principal: "neo4j", credentials: "neo4j"});
conn.run("RETURN 1", {});
conn.sync();
expect( observer.instance.toHex() ).toBe( '60 60 b0 17 00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00 00 41 b2 01 8e 6d 79 64 72 69 76 65 72 2f 30 2e 30 2e 30 a3 86 73 63 68 65 6d 65 85 62 61 73 69 63 89 70 72 69 6e 63 69 70 61 6c 85 6e 65 6f 34 6a 8b 63 72 65 64 65 6e 74 69 61 6c 73 85 6e 65 6f 34 6a 00 00 00 0c b2 10 88 52 45 54 55 52 4e 20 31 a0 00 00 ' );
expect(observer.instance.toHex()).toBe('60 60 b0 17 00 00 00 01 00 00 00 00 00 00 00 00 00 00 00 00 00 41 b2 01 8e 6d 79 64 72 69 76 65 72 2f 30 2e 30 2e 30 a3 86 73 63 68 65 6d 65 85 62 61 73 69 63 89 70 72 69 6e 63 69 70 61 6c 85 6e 65 6f 34 6a 8b 63 72 65 64 65 6e 74 69 61 6c 73 85 6e 65 6f 34 6a 00 00 00 0c b2 10 88 52 45 54 55 52 4e 20 31 a0 00 00 ');
done();
});

it('should provide error message when connecting to http-port', function(done) {
it('should provide error message when connecting to http-port', done => {
// Given
var conn = connect("bolt://localhost:7474", {encrypted:false});
const conn = connect("bolt://localhost:7474", {encrypted: false});

// When
conn.initialize( "mydriver/0.0.0", {scheme: "basic", principal: "neo4j", credentials: "neo4j"}, {
onCompleted: function( msg ) {
conn.initialize("mydriver/0.0.0", {scheme: "basic", principal: "neo4j", credentials: "neo4j"}, {
onCompleted: msg => {
},
onError: function(err) {
onError: err => {
//only node gets the pretty error message
if( require('../../lib/v1/internal/ch-node.js').available ) {
if (require('../../lib/v1/internal/ch-node.js').available) {
expect(err.message).toBe("Server responded HTTP. Make sure you are not trying to connect to the http endpoint " +
"(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)");
}
Expand All @@ -95,4 +98,48 @@ describe('connector', function() {

});

it('should convert failure messages to errors', done => {
const channel = new DummyChannel.channel;
const connection = new Connection(channel, 'bolt://localhost');

const errorCode = 'Neo.ClientError.Schema.ConstraintValidationFailed';
const errorMessage = 'Node 0 already exists with label User and property "email"=[john@doe.com]';

connection._queueObserver({
onError: error => {
expectNeo4jError(error, errorCode, errorMessage);
done();
}
});

channel.onmessage(packedHandshakeMessage());
channel.onmessage(packedFailureMessage(errorCode, errorMessage));
});

function packedHandshakeMessage() {
const result = alloc(4);
result.putInt32(0, 1);
result.reset();
return result;
}

function packedFailureMessage(code, message) {
const channel = new DummyChannel.channel;
const chunker = new Chunker(channel);
const packer = new Packer(chunker);
packer.packStruct(0x7F, [packer.packable({code: code, message: message})]);
chunker.messageBoundary();
chunker.flush();
const data = channel.toBuffer();
const result = alloc(data.length);
result.putBytes(0, data);
return result;
}

function expectNeo4jError(error, expectedCode, expectedMessage) {
expect(() => {
throw error;
}).toThrow(new Neo4jError(expectedMessage, expectedCode));
}

});
2 changes: 1 addition & 1 deletion test/v1/driver.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ describe('driver', function() {
// Expect
driver.onError = function (err) {
//the error message is different whether in browser or node
expect(err.fields[0].code).toEqual('Neo.ClientError.Security.Unauthorized');
expect(err.code).toEqual('Neo.ClientError.Security.Unauthorized');
done();
};

Expand Down
2 changes: 1 addition & 1 deletion test/v1/examples.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ describe('examples', function() {
// end::handle-cypher-error[]

testResultPromise.then(function(loggedError){
expect(loggedError.fields[0].code).toBe( "Neo.ClientError.Statement.SyntaxError" );
expect(loggedError.code).toBe( 'Neo.ClientError.Statement.SyntaxError' );
done();
});
});
Expand Down
8 changes: 4 additions & 4 deletions test/v1/session.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ describe('session', function () {
it('should call observers onError on error ', function (done) {

// When & Then
session.run("RETURN 1 AS").subscribe({
session.run('RETURN 1 AS').subscribe({
onError: function (error) {
expect(error.fields.length).toBe(1);
expect(error.code).toEqual('Neo.ClientError.Statement.SyntaxError');
done();
}
});
Expand Down Expand Up @@ -139,9 +139,9 @@ describe('session', function () {

it('should expose basic run/catch ', function (done) {
// When & Then
session.run("RETURN 1 AS").catch(
session.run('RETURN 1 AS').catch(
function (error) {
expect(error.fields.length).toBe(1);
expect(error.code).toEqual('Neo.ClientError.Statement.SyntaxError');
done();
}
)
Expand Down
Loading