Skip to content

Expose server info in result summary from sessions and transactions #172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/v1/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ class Driver {
*/
_createConnection(url, release) {
let sessionId = this._sessionIdGenerator++;
let streamObserver = new _ConnectionStreamObserver(this);
let conn = connect(url, this._config);
let streamObserver = new _ConnectionStreamObserver(this, conn);
conn.initialize(this._userAgent, this._token, streamObserver);
conn._id = sessionId;
conn._release = () => release(url, conn);
Expand Down Expand Up @@ -172,9 +172,10 @@ class Driver {

/** Internal stream observer used for connection state */
class _ConnectionStreamObserver extends StreamObserver {
constructor(driver) {
constructor(driver, conn) {
super();
this._driver = driver;
this._conn = conn;
this._hasFailed = false;
}

Expand All @@ -192,6 +193,9 @@ class _ConnectionStreamObserver extends StreamObserver {
if (this._driver.onCompleted) {
this._driver.onCompleted(message);
}
if (this._conn && message && message.server) {
this._conn.setServerVersion(message.server);
}
}
}

Expand Down
11 changes: 10 additions & 1 deletion src/v1/internal/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class Connection {
* to the next pending observer.
*/
this.url = url;
this.server = {address: url};
this._pendingObservers = [];
this._currentObserver = undefined;
this._ch = channel;
Expand Down Expand Up @@ -453,6 +454,10 @@ class Connection {
_packable(value) {
return this._packer.packable(value, (err) => this._handleFatalError(err));
}

setServerVersion(version) {
this.server.version = version;
}
}

/**
Expand All @@ -464,6 +469,10 @@ class Connection {
*/
function connect( url, config = {}) {
let Ch = config.channel || Channel;
const host = parseHost(url);
const port = parsePort(url) || 7687;
const completeUrl = host + ':' + port;

return new Connection( new Ch({
host: parseHost(url),
port: parsePort(url) || 7687,
Expand All @@ -473,7 +482,7 @@ function connect( url, config = {}) {
trust : config.trust || (hasFeature("trust_all_certificates") ? "TRUST_ALL_CERTIFICATES" : "TRUST_CUSTOM_CA_SIGNED_CERTIFICATES"),
trustedCertificates : config.trustedCertificates || [],
knownHosts : config.knownHosts
}), url);
}), completeUrl);
}

export {
Expand Down
19 changes: 19 additions & 0 deletions src/v1/result-summary.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class ResultSummary {
this.plan = metadata.plan || metadata.profile ? new Plan(metadata.plan || metadata.profile) : false;
this.profile = metadata.profile ? new ProfiledPlan(metadata.profile) : false;
this.notifications = this._buildNotifications(metadata.notifications);
this.server = new ServerInfo(metadata.server);
this.resultConsumedAfter = metadata.result_consumed_after;
this.resultAvailableAfter = metadata.result_available_after;
}
Expand Down Expand Up @@ -254,6 +255,24 @@ class Notification {
}
}

/**
* Class for exposing server info from a result.
* @access public
*/
class ServerInfo {
/**
* Create a ServerInfo instance
* @constructor
* @param {Object} serverMeta - Object with serverMeta data
*/
constructor(serverMeta) {
if (serverMeta) {
this.address = serverMeta.address;
this.version = serverMeta.version;
}
}
}

const statementType = {
READ_ONLY: 'r',
READ_WRITE: 'rw',
Expand Down
3 changes: 2 additions & 1 deletion src/v1/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ class _RunObserver extends StreamObserver {
}

meta() {
return this._meta;
const serverMeta = {server: this._conn.server};
return Object.assign({}, this._meta, serverMeta);
}
}

Expand Down
7 changes: 6 additions & 1 deletion src/v1/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ class _TransactionStreamObserver extends StreamObserver {
this._tx._onBookmark(bookmark);
}
}

serverMeta() {
const serverMeta = {server: this._conn.server};
return serverMeta;
}
}

/** internal state machine of the transaction*/
Expand All @@ -155,7 +160,7 @@ let _states = {
conn.sync();
}).catch(observer.onError);

return new Result( observer, statement, parameters );
return new Result( observer, statement, parameters, () => observer.serverMeta() );
}
},

Expand Down
12 changes: 12 additions & 0 deletions test/resources/boltkit/read_server_with_version.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
!: AUTO RESET
!: AUTO PULL_ALL

C: INIT "neo4j-javascript/0.0.0-dev" {"credentials": "neo4j", "scheme": "basic", "principal": "neo4j"}
S: SUCCESS {"server": "TheReadServerV1"}
C: RUN "MATCH (n) RETURN n.name" {}
PULL_ALL
S: SUCCESS {"fields": ["n.name"]}
RECORD ["Bob"]
RECORD ["Alice"]
RECORD ["Tina"]
SUCCESS {}
9 changes: 9 additions & 0 deletions test/resources/boltkit/write_server_with_version.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
!: AUTO RESET
!: AUTO PULL_ALL

C: INIT "neo4j-javascript/0.0.0-dev" {"credentials": "neo4j", "scheme": "basic", "principal": "neo4j"}
S: SUCCESS {"server": "TheWriteServerV1"}
C: RUN "CREATE (n {name:'Bob'})" {}
PULL_ALL
S: SUCCESS {}
SUCCESS {}
117 changes: 113 additions & 4 deletions test/v1/routing.driver.boltkit.it.js
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ describe('routing driver ', function () {
});
});
});

it('should re-use connections', function (done) {
if (!boltkit.BoltKitSupport) {
done();
Expand Down Expand Up @@ -674,10 +675,118 @@ describe('routing driver ', function () {
});
});

function newDriver(url) {
// BoltKit currently does not support encryption, create driver with encryption turned off
return neo4j.driver(url, neo4j.auth.basic("neo4j", "neo4j"), {
encrypted: "ENCRYPTION_OFF"
it('should expose server info in cluster', function (done) {
if (!boltkit.BoltKitSupport) {
done();
return;
}

// Given
const kit = new boltkit.BoltKit();
const routingServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001);
const writeServer = kit.start('./test/resources/boltkit/write_server_with_version.script', 9007);
const readServer = kit.start('./test/resources/boltkit/read_server_with_version.script', 9005);

kit.run(function () {
const driver = newDriver("bolt+routing://127.0.0.1:9001");
// When
const readSession = driver.session(neo4j.session.READ);
readSession.run('MATCH (n) RETURN n.name').then(readResult => {
const writeSession = driver.session(neo4j.session.WRITE);
writeSession.run("CREATE (n {name:'Bob'})").then(writeResult => {
const readServerInfo = readResult.summary.server;
const writeServerInfo = writeResult.summary.server;

readSession.close();
writeSession.close();
driver.close();

routingServer.exit(routingServerExitCode => {
writeServer.exit(writeServerExitCode => {
readServer.exit(readServerExitCode => {

expect(readServerInfo.address).toBe('127.0.0.1:9005');
expect(readServerInfo.version).toBe('TheReadServerV1');

expect(writeServerInfo.address).toBe('127.0.0.1:9007');
expect(writeServerInfo.version).toBe('TheWriteServerV1');

expect(routingServerExitCode).toEqual(0);
expect(writeServerExitCode).toEqual(0);
expect(readServerExitCode).toEqual(0);

done();
});
});
});
})
});
});
});

it('should expose server info in cluster using observer', function (done) {
if (!boltkit.BoltKitSupport) {
done();
return;
}

// Given
const kit = new boltkit.BoltKit();
const routingServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001);
const writeServer = kit.start('./test/resources/boltkit/write_server_with_version.script', 9007);
const readServer = kit.start('./test/resources/boltkit/read_server_with_version.script', 9005);

kit.run(function () {
const driver = newDriver("bolt+routing://127.0.0.1:9001");
// When
const readSession = driver.session(neo4j.session.READ);
readSession.run('MATCH (n) RETURN n.name').subscribe({
onNext: () => {
},
onError: () => {
},
onCompleted: readSummary => {
const writeSession = driver.session(neo4j.session.WRITE);
writeSession.run("CREATE (n {name:'Bob'})").subscribe({
onNext: () => {
},
onError: () => {
},
onCompleted: writeSummary => {
readSession.close();
writeSession.close();
driver.close();

routingServer.exit(function (routingServerExitCode) {
writeServer.exit(function (writeServerExitCode) {
readServer.exit(function (readServerExitCode) {

expect(readSummary.server.address).toBe('127.0.0.1:9005');
expect(readSummary.server.version).toBe('TheReadServerV1');

expect(writeSummary.server.address).toBe('127.0.0.1:9007');
expect(writeSummary.server.version).toBe('TheWriteServerV1');

expect(routingServerExitCode).toEqual(0);
expect(writeServerExitCode).toEqual(0);
expect(readServerExitCode).toEqual(0);

done();
});
});
});
}
})
}
});
});
});

function newDriver(url) {
// BoltKit currently does not support encryption, create driver with encryption turned off
return neo4j.driver(url, neo4j.auth.basic("neo4j", "neo4j"), {
encrypted: "ENCRYPTION_OFF"
});
}

});
23 changes: 23 additions & 0 deletions test/v1/session.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,29 @@ describe('session', function () {
});
});

it('should expose server info on successful query', function (done) {
//lazy way of checking the version number
//if server has been set we know it is at least
//3.1 (todo actually parse the version string)
if (!server) {
done();
return;
}

// Given
var statement = 'RETURN 1';

// When & Then
session.run(statement)
.then(function (result) {
var sum = result.summary;
expect(sum.server).toBeDefined();
expect(sum.server.address).toEqual('localhost:7687');
expect(sum.server.version).toBeDefined();
done();
});
});

it('should expose execution time information when using 3.1 and onwards', function (done) {

//lazy way of checking the version number
Expand Down
59 changes: 59 additions & 0 deletions test/v1/transaction.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,67 @@ describe('transaction', function() {
});
});

it('should expose server info on successful query', function (done) {
if (neo4jVersionOlderThan31(done)) {
return;
}

// Given
const statement = 'RETURN 1';

// When & Then
const tx = session.beginTransaction();
tx.run(statement)
.then(result => {
const sum = result.summary;
expect(sum.server).toBeDefined();
expect(sum.server.address).toEqual('localhost:7687');
expect(sum.server.version).toBeDefined();
});
tx.commit().then(done);
});

it('should expose server info on successful query using observer', function (done) {
if (neo4jVersionOlderThan31(done)) {
return;
}

// Given
const statement = 'RETURN 1';

// When & Then
const tx = session.beginTransaction();
tx.run(statement)
.subscribe({
onNext: record => {
},
onError: error => {
},
onCompleted: summary => {
const server = summary.server;

expect(server).toBeDefined();
expect(server.address).toEqual('localhost:7687');
expect(server.version).toBeDefined();

done();
}
});
});

function expectSyntaxError(error) {
const code = error.fields[0].code;
expect(code).toBe('Neo.ClientError.Statement.SyntaxError');
}

function neo4jVersionOlderThan31(done) {
//lazy way of checking the version number
//if server has been set we know it is at least
//3.1 (todo actually parse the version string)
if (!server) {
done();
return true;
}
return false;
}
});