From cd67c581f0a66000b4700119e6ea29bf40dd501d Mon Sep 17 00:00:00 2001 From: Zhen Date: Wed, 19 Apr 2017 18:02:10 +0200 Subject: [PATCH 1/5] Adding url parsing support of routing context --- src/v1/index.js | 13 ++-- src/v1/internal/connection-providers.js | 5 +- src/v1/internal/connector.js | 22 ++++++- src/v1/internal/get-servers-util.js | 43 +++++++++---- src/v1/internal/rediscovery.js | 2 +- src/v1/internal/server-version-util.js | 73 ++++++++++++++++++++++ src/v1/routing-driver.js | 5 +- test/internal/connection-providers.test.js | 4 +- test/internal/get-servers-util.test.js | 7 ++- test/internal/host-name-resolvers.test.js | 33 +++++++++- 10 files changed, 177 insertions(+), 30 deletions(-) create mode 100644 src/v1/internal/server-version-util.js diff --git a/src/v1/index.js b/src/v1/index.js index 8a55f10d1..e43383d57 100644 --- a/src/v1/index.js +++ b/src/v1/index.js @@ -26,9 +26,8 @@ import Record from './record'; import {Driver, READ, WRITE} from './driver'; import RoutingDriver from './routing-driver'; import VERSION from '../version'; -import {parseScheme, parseUrl} from './internal/connector'; -import {assertString} from './internal/util'; - +import {parseRoutingContext, parseScheme, parseUrl} from './internal/connector'; +import {assertString, isEmptyObjectOrNull} from './internal/util'; const auth = { basic: (username, password, realm = undefined) => { @@ -129,13 +128,17 @@ const USER_AGENT = "neo4j-javascript/" + VERSION; function driver(url, authToken, config = {}) { assertString(url, 'Bolt URL'); const scheme = parseScheme(url); + const routingContext = parseRoutingContext(url); if (scheme === "bolt+routing://") { - return new RoutingDriver(parseUrl(url), USER_AGENT, authToken, config); + return new RoutingDriver(parseUrl(url), routingContext, USER_AGENT, authToken, config); } else if (scheme === "bolt://") { + if(!isEmptyObjectOrNull(routingContext)) + { + throw new Error("Routing context are not supported with scheme 'bolt'. Given URI: '" + url + "'"); + } return new Driver(parseUrl(url), USER_AGENT, authToken, config); } else { throw new Error("Unknown scheme: " + scheme); - } } diff --git a/src/v1/internal/connection-providers.js b/src/v1/internal/connection-providers.js index a2e5e94e7..5b6599772 100644 --- a/src/v1/internal/connection-providers.js +++ b/src/v1/internal/connection-providers.js @@ -25,6 +25,7 @@ import RoutingTable from './routing-table'; import Rediscovery from './rediscovery'; import hasFeature from './features'; import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers'; +import GetServersUtil from './get-servers-util'; class ConnectionProvider { @@ -61,11 +62,11 @@ export class DirectConnectionProvider extends ConnectionProvider { export class LoadBalancer extends ConnectionProvider { - constructor(address, connectionPool, driverOnErrorCallback) { + constructor(address, routingContext, connectionPool, driverOnErrorCallback) { super(); this._seedRouter = address; this._routingTable = new RoutingTable(new RoundRobinArray([this._seedRouter])); - this._rediscovery = new Rediscovery(); + this._rediscovery = new Rediscovery(new GetServersUtil(routingContext)); this._connectionPool = connectionPool; this._driverOnErrorCallback = driverOnErrorCallback; this._hostNameResolver = LoadBalancer._createHostNameResolver(); diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 9d19de16e..096063c51 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -60,10 +60,12 @@ MAGIC_PREAMBLE = 0x6060B017, DEBUG = false; let URLREGEX = new RegExp([ - "([^/]+//)?", // scheme + "([^/]+//)?", // scheme "(([^:/?#]*)", // hostname "(?::([0-9]+))?)", // port (optional) - ".*"].join("")); // everything else + "([^?]*)?", // everything else + "(\\?(.+))?" // query +].join("")); function parseScheme( url ) { let scheme = url.match(URLREGEX)[1] || ''; @@ -82,6 +84,21 @@ function parsePort( url ) { return url.match( URLREGEX )[4]; } +function parseRoutingContext(url) { + const query = url.match(URLREGEX)[7] || ''; + const map = {}; + if (query.length !== 0) { + query.split("&").forEach(val => { + const keyValue = val.split("="); + if (keyValue.length !== 2) { + throw new Error("Invalid parameters: '" + keyValue + "' in url '" + url + "'."); + } + map[keyValue[0]] = keyValue[1]; + }); + } + return map; +} + /** * Very rudimentary log handling, should probably be replaced by something proper at some point. * @param actor the part that sent the message, 'S' for server and 'C' for client @@ -495,5 +512,6 @@ export { parseUrl, parseHost, parsePort, + parseRoutingContext, Connection } diff --git a/src/v1/internal/get-servers-util.js b/src/v1/internal/get-servers-util.js index d94da81df..7d10faba9 100644 --- a/src/v1/internal/get-servers-util.js +++ b/src/v1/internal/get-servers-util.js @@ -20,25 +20,44 @@ import RoundRobinArray from './round-robin-array'; import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from '../error'; import Integer, {int} from '../integer'; +import {ServerVersion, VERSION3_2} from './server-version-util' -const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers'; +const CALL_GET_SERVERS = 'CALL dbms.cluster.routing.getServers'; +const GET_ROUTING_TABLE_PARAM = "context"; +const CALL_GET_ROUTING_TABLE = "CALL dbms.cluster.routing.getRoutingTable({" + + GET_ROUTING_TABLE_PARAM + "})"; const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'; export default class GetServersUtil { + constructor(routingContext={}) { + this._routingContext = routingContext; + } + callGetServers(session, routerAddress) { - return session.run(PROCEDURE_CALL).then(result => { - session.close(); - return result.records; - }).catch(error => { - if (error.code === PROCEDURE_NOT_FOUND_CODE) { - // throw when getServers procedure not found because this is clearly a configuration issue - throw newError('Server ' + routerAddress + ' could not perform routing. ' + - 'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE); + session.run("RETURN 1").then(result=>{ + let statement = {text:CALL_GET_SERVERS}; + + if(ServerVersion.fromString(result.summary.server.version).compare(VERSION3_2)>=0) + { + statement = { + text:CALL_GET_ROUTING_TABLE, + parameters:{GET_ROUTING_TABLE_PARAM: this._routingContext}}; } - // return nothing when failed to connect because code higher in the callstack is still able to retry with a - // different session towards a different router - return null; + + return session.run(statement).then(result => { + session.close(); + return result.records; + }).catch(error => { + if (error.code === PROCEDURE_NOT_FOUND_CODE) { + // throw when getServers procedure not found because this is clearly a configuration issue + throw newError('Server ' + routerAddress + ' could not perform routing. ' + + 'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE); + } + // return nothing when failed to connect because code higher in the callstack is still able to retry with a + // different session towards a different router + return null; + }); }); } diff --git a/src/v1/internal/rediscovery.js b/src/v1/internal/rediscovery.js index 08fb60bf0..a07977fde 100644 --- a/src/v1/internal/rediscovery.js +++ b/src/v1/internal/rediscovery.js @@ -24,7 +24,7 @@ import {newError, PROTOCOL_ERROR} from "../error"; export default class Rediscovery { constructor(getServersUtil) { - this._getServersUtil = getServersUtil || new GetServersUtil(); + this._getServersUtil = getServersUtil; } lookupRoutingTableOnRouter(session, routerAddress) { diff --git a/src/v1/internal/server-version-util.js b/src/v1/internal/server-version-util.js new file mode 100644 index 000000000..81dc980d5 --- /dev/null +++ b/src/v1/internal/server-version-util.js @@ -0,0 +1,73 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +let SERVER_VERSION_REGEX = new RegExp("(Neo4j/)?(\\d+)\\.(\\d+)(?:\\.)?(\\d*)(\\.|-|\\+)?([0-9A-Za-z-.]*)?"); + +class ServerVersion { + constructor(major, minor, patch) { + this._major = major; + this._minor = minor; + this._patch = patch; + } + + static fromString(versionStr) { + if (!versionStr) { + return new ServerVersion(3, 0, 0); + } + else { + const version = versionStr.match(SERVER_VERSION_REGEX); + return new ServerVersion(version[2], version[3], version[4]); + } + } + + compare(other) { + const version = this._parseToNumber(); + const otherVersion = other._parseToNumber(); + + if (version == otherVersion) { + return 0; + } + if (version > otherVersion) { + return 1; + } + else { + return -1; + } + } + + _parseToNumber() { + let value = 0; + value += parseInt(this._major) * 100 + parseInt(this._minor) * 10; + if (!isEmptyObjectOrNull(this._patch)) { + value += parseInt(this._patch); + } + return value; + } +} + +const VERSION3_2 = new ServerVersion(3, 2, 0); + +export{ + ServerVersion, + VERSION3_2 +} + + + + diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index adc4548b5..3e895cf90 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -27,12 +27,13 @@ import {LoadBalancer} from './internal/connection-providers'; */ class RoutingDriver extends Driver { - constructor(url, userAgent, token = {}, config = {}) { + constructor(url, routingContext, userAgent, token = {}, config = {}) { super(url, userAgent, token, RoutingDriver._validateConfig(config)); + this._routingContext = routingContext; } _createConnectionProvider(address, connectionPool, driverOnErrorCallback) { - return new LoadBalancer(address, connectionPool, driverOnErrorCallback); + return new LoadBalancer(address, this._routingContext, connectionPool, driverOnErrorCallback); } _createSession(mode, connectionProvider, bookmark, config) { diff --git a/test/internal/connection-providers.test.js b/test/internal/connection-providers.test.js index 5a53812a8..7b78b5e64 100644 --- a/test/internal/connection-providers.test.js +++ b/test/internal/connection-providers.test.js @@ -135,7 +135,7 @@ describe('LoadBalancer', () => { }); it('initializes routing table with the given router', () => { - const loadBalancer = new LoadBalancer('server-ABC', newPool(), NO_OP_DRIVER_CALLBACK); + const loadBalancer = new LoadBalancer('server-ABC', {}, newPool(), NO_OP_DRIVER_CALLBACK); expectRoutingTable(loadBalancer, ['server-ABC'], @@ -1040,7 +1040,7 @@ function newLoadBalancerWithSeedRouter(seedRouter, seedRouterResolved, expirationTime = Integer.MAX_VALUE, routerToRoutingTable = {}, connectionPool = null) { - const loadBalancer = new LoadBalancer(seedRouter, connectionPool || newPool(), NO_OP_DRIVER_CALLBACK); + const loadBalancer = new LoadBalancer(seedRouter, {}, connectionPool || newPool(), NO_OP_DRIVER_CALLBACK); loadBalancer._routingTable = new RoutingTable( new RoundRobinArray(routers), new RoundRobinArray(readers), diff --git a/test/internal/get-servers-util.test.js b/test/internal/get-servers-util.test.js index 95855cf5a..e2323faf4 100644 --- a/test/internal/get-servers-util.test.js +++ b/test/internal/get-servers-util.test.js @@ -245,9 +245,10 @@ describe('get-servers-util', () => { class FakeSession { - constructor(runResponse) { - this._runResponse = runResponse; + constructor(runResponses) { + this._runResponses = runResponses; this._closed = false; + this._runCounter = 0; } static successful(result) { @@ -259,7 +260,7 @@ describe('get-servers-util', () => { } run() { - return this._runResponse; + return this._runResponses[this._runCounter ++]; } close() { diff --git a/test/internal/host-name-resolvers.test.js b/test/internal/host-name-resolvers.test.js index 2ca2a8149..a8a497dc3 100644 --- a/test/internal/host-name-resolvers.test.js +++ b/test/internal/host-name-resolvers.test.js @@ -19,7 +19,38 @@ import {DnsHostNameResolver, DummyHostNameResolver} from '../../src/v1/internal/host-name-resolvers'; import hasFeature from '../../src/v1/internal/features'; -import {parseHost, parsePort, parseScheme} from '../../src/v1/internal/connector'; +import {parseHost, parsePort, parseScheme, parseRoutingContext} from '../../src/v1/internal/connector'; + +describe('RoutingContextParser', ()=>{ + + it('should parse routing context', done => { + const url = "bolt://localhost:7687/cat?name=molly&age=1&color=white"; + const context = parseRoutingContext(url); + expect(context).toEqual({name:"molly", age:"1", color:"white"}); + + done(); + }); + + it('should return empty routing context', done =>{ + const url1 = "bolt://localhost:7687/cat?"; + const context1 = parseRoutingContext(url1); + expect(context1).toEqual({}); + + const url2 = "bolt://localhost:7687/lalala"; + const context2 = parseRoutingContext(url2); + expect(context2).toEqual({}); + + done(); + }); + + it('should error for unmatched pair', done=>{ + const url = "bolt://localhost?cat"; + expect(()=>parseRoutingContext(url)).toThrow( + new Error("Invalid parameters: 'cat' in url 'bolt://localhost?cat'.")); + + done(); + }); +}); describe('DummyHostNameResolver', () => { From 46ad5e6f426dad9f82b58af6f7f6ec143c8b494d Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 21 Apr 2017 10:58:50 +0200 Subject: [PATCH 2/5] Improve server version handling Fixed parsing, comparison and added tests. --- src/v1/internal/server-version-util.js | 73 --------------- src/v1/internal/server-version.js | 103 +++++++++++++++++++++ test/internal/server-version.test.js | 118 +++++++++++++++++++++++++ 3 files changed, 221 insertions(+), 73 deletions(-) delete mode 100644 src/v1/internal/server-version-util.js create mode 100644 src/v1/internal/server-version.js create mode 100644 test/internal/server-version.test.js diff --git a/src/v1/internal/server-version-util.js b/src/v1/internal/server-version-util.js deleted file mode 100644 index 81dc980d5..000000000 --- a/src/v1/internal/server-version-util.js +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Copyright (c) 2002-2017 "Neo Technology,"," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -let SERVER_VERSION_REGEX = new RegExp("(Neo4j/)?(\\d+)\\.(\\d+)(?:\\.)?(\\d*)(\\.|-|\\+)?([0-9A-Za-z-.]*)?"); - -class ServerVersion { - constructor(major, minor, patch) { - this._major = major; - this._minor = minor; - this._patch = patch; - } - - static fromString(versionStr) { - if (!versionStr) { - return new ServerVersion(3, 0, 0); - } - else { - const version = versionStr.match(SERVER_VERSION_REGEX); - return new ServerVersion(version[2], version[3], version[4]); - } - } - - compare(other) { - const version = this._parseToNumber(); - const otherVersion = other._parseToNumber(); - - if (version == otherVersion) { - return 0; - } - if (version > otherVersion) { - return 1; - } - else { - return -1; - } - } - - _parseToNumber() { - let value = 0; - value += parseInt(this._major) * 100 + parseInt(this._minor) * 10; - if (!isEmptyObjectOrNull(this._patch)) { - value += parseInt(this._patch); - } - return value; - } -} - -const VERSION3_2 = new ServerVersion(3, 2, 0); - -export{ - ServerVersion, - VERSION3_2 -} - - - - diff --git a/src/v1/internal/server-version.js b/src/v1/internal/server-version.js new file mode 100644 index 000000000..1c25d71fc --- /dev/null +++ b/src/v1/internal/server-version.js @@ -0,0 +1,103 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {assertString} from './util'; + +const SERVER_VERSION_REGEX = new RegExp('^(Neo4j/)?(\\d+)\\.(\\d+)(?:\\.)?(\\d*)(\\.|-|\\+)?([0-9A-Za-z-.]*)?$'); + +class ServerVersion { + + /** + * @constructor + * @param {number} major the major version number. + * @param {number} minor the minor version number. + * @param {number} patch the patch version number. + */ + constructor(major, minor, patch) { + this.major = major; + this.minor = minor; + this.patch = patch; + } + + /** + * Parse given string to a {@link ServerVersion} object. + * @param versionStr the string to parse. + * @return {ServerVersion} version for the given string. + * @throws Error if given string can't be parsed. + */ + static fromString(versionStr) { + if (!versionStr) { + return new ServerVersion(3, 0, 0); + } + + assertString(versionStr, 'Neo4j version string'); + + const version = versionStr.match(SERVER_VERSION_REGEX); + if (!version) { + throw new Error(`Unparsable Neo4j version: ${versionStr}`); + } + + const major = parseIntStrict(version[2]); + const minor = parseIntStrict(version[3]); + const patch = parseIntStrict(version[4] || 0); + + return new ServerVersion(major, minor, patch); + } + + /** + * Compare this version to the given one. + * @param {ServerVersion} other the version to compare with. + * @return {number} value 0 if this version is the same as the given one, value less then 0 when this version + * was released earlier than the given one and value greater then 0 when this version was released after + * than the given one. + */ + compareTo(other) { + let result = compareInts(this.major, other.major); + if (result === 0) { + result = compareInts(this.minor, other.minor); + if (result === 0) { + result = compareInts(this.patch, other.patch); + } + } + return result; + } +} + +function parseIntStrict(str, name) { + const value = parseInt(str); + if (!value && value !== 0) { + throw new Error(`Unparsable number ${name}: '${str}'`); + } + return value; +} + +function compareInts(x, y) { + return (x < y) ? -1 : ((x === y) ? 0 : 1); +} + +const VERSION_3_2_0 = new ServerVersion(3, 2, 0); + +export { + ServerVersion, + VERSION_3_2_0 +}; + + + + diff --git a/test/internal/server-version.test.js b/test/internal/server-version.test.js new file mode 100644 index 000000000..72141e4f9 --- /dev/null +++ b/test/internal/server-version.test.js @@ -0,0 +1,118 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {ServerVersion, VERSION_3_2_0} from '../../src/v1/internal/server-version'; + +describe('ServerVersion', () => { + + it('should construct with correct values', () => { + verifyVersion(new ServerVersion(2, 3, 10), 2, 3, 10); + verifyVersion(new ServerVersion(3, 2, 0), 3, 2, 0); + verifyVersion(new ServerVersion(1, 9, 12), 1, 9, 12); + }); + + it('should define correct 3.2.0 constant', () => { + verifyVersion(VERSION_3_2_0, 3, 2, 0); + }); + + it('should parse "undefined" strings to 3.0.0 for backwards compatibility reasons', () => { + verifyVersion(parse(null), 3, 0, 0); + verifyVersion(parse(undefined), 3, 0, 0); + verifyVersion(parse(''), 3, 0, 0); + }); + + it('should fail to parse object, array and function', () => { + expect(() => parse({})).toThrowError(TypeError); + expect(() => parse([])).toThrowError(TypeError); + expect(() => parse(() => { + })).toThrowError(TypeError); + }); + + it('should fail to parse illegal strings', () => { + expect(() => parse('Cat')).toThrow(); + expect(() => parse('Dog')).toThrow(); + expect(() => parse('Neo4j')).toThrow(); + expect(() => parse('Neo4j/')).toThrow(); + expect(() => parse('Not-Neo4j/3.1.0')).toThrow(); + expect(() => parse('Neo4j/5')).toThrow(); + expect(() => parse('Neo4j/3.')).toThrow(); + expect(() => parse('Neo4j/1.A')).toThrow(); + expect(() => parse('Neo4j/1.Hello')).toThrow(); + }); + + it('should parse valid version strings', () => { + verifyVersion(parse('Neo4j/3.2.1'), 3, 2, 1); + verifyVersion(parse('Neo4j/1.9.10'), 1, 9, 10); + verifyVersion(parse('Neo4j/7.77.777'), 7, 77, 777); + + verifyVersion(parse('Neo4j/3.10.0-GA'), 3, 10, 0); + verifyVersion(parse('Neo4j/2.5.5-RC01'), 2, 5, 5); + verifyVersion(parse('Neo4j/3.1.1-SNAPSHOT'), 3, 1, 1); + verifyVersion(parse('Neo4j/2.0.0-M09'), 2, 0, 0); + + verifyVersion(parse('Neo4j/3.2'), 3, 2, 0); + verifyVersion(parse('Neo4j/1.5'), 1, 5, 0); + verifyVersion(parse('Neo4j/42.42'), 42, 42, 0); + + verifyVersion(parse('Neo4j/2.2-GA'), 2, 2, 0); + verifyVersion(parse('Neo4j/3.0-RC01'), 3, 0, 0); + verifyVersion(parse('Neo4j/2.3-SNAPSHOT'), 2, 3, 0); + verifyVersion(parse('Neo4j/2.2-M09'), 2, 2, 0); + }); + + it('should compare equal versions', () => { + expect(new ServerVersion(3, 1, 0).compareTo(new ServerVersion(3, 1, 0))).toEqual(0); + expect(new ServerVersion(1, 9, 12).compareTo(new ServerVersion(1, 9, 12))).toEqual(0); + expect(new ServerVersion(2, 3, 8).compareTo(new ServerVersion(2, 3, 8))).toEqual(0); + }); + + it('should compare correctly by major', () => { + expect(new ServerVersion(3, 1, 0).compareTo(new ServerVersion(2, 1, 0))).toBeGreaterThan(0); + expect(new ServerVersion(2, 1, 0).compareTo(new ServerVersion(3, 1, 0))).toBeLessThan(0); + + expect(new ServerVersion(3, 1, 4).compareTo(new ServerVersion(1, 9, 10))).toBeGreaterThan(0); + expect(new ServerVersion(1, 5, 42).compareTo(new ServerVersion(10, 10, 43))).toBeLessThan(0); + }); + + it('should compare correctly by minor', () => { + expect(new ServerVersion(3, 3, 0).compareTo(new ServerVersion(3, 1, 0))).toBeGreaterThan(0); + expect(new ServerVersion(1, 3, 5).compareTo(new ServerVersion(1, 9, 5))).toBeLessThan(0); + + expect(new ServerVersion(3, 9, 5).compareTo(new ServerVersion(3, 1, 2))).toBeGreaterThan(0); + expect(new ServerVersion(1, 5, 42).compareTo(new ServerVersion(1, 10, 11))).toBeLessThan(0); + }); + + it('should compare correctly by patch', () => { + expect(new ServerVersion(3, 3, 6).compareTo(new ServerVersion(3, 3, 5))).toBeGreaterThan(0); + expect(new ServerVersion(1, 8, 2).compareTo(new ServerVersion(1, 8, 8))).toBeLessThan(0); + expect(new ServerVersion(9, 9, 9).compareTo(new ServerVersion(9, 9, 0))).toBeGreaterThan(0); + expect(new ServerVersion(3, 3, 3).compareTo(new ServerVersion(3, 3, 42))).toBeLessThan(0); + }); + +}); + +function verifyVersion(serverVersion, expectedMajor, expectedMinor, expectedPatch) { + expect(serverVersion.major).toEqual(expectedMajor); + expect(serverVersion.minor).toEqual(expectedMinor); + expect(serverVersion.patch).toEqual(expectedPatch); +} + +function parse(string) { + return ServerVersion.fromString(string); +} From 4f8c75be1c07dce429aba091273ad6c30a5f5b10 Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 21 Apr 2017 11:55:14 +0200 Subject: [PATCH 3/5] Improve routing context parsing Move all URL parsing functions out of `connector` to `util` because they are not really related to network connection and Bolt messages. Prohibit invalid keys and values in routing context. Add tests for URL parsing. --- src/v1/index.js | 3 +- src/v1/internal/connector.js | 53 +-------- src/v1/internal/host-name-resolvers.js | 2 +- src/v1/internal/util.js | 71 +++++++++++- test/internal/host-name-resolvers.test.js | 33 +----- test/internal/util.test.js | 130 +++++++++++++++++++++- 6 files changed, 202 insertions(+), 90 deletions(-) diff --git a/src/v1/index.js b/src/v1/index.js index e43383d57..c694a6c41 100644 --- a/src/v1/index.js +++ b/src/v1/index.js @@ -26,8 +26,7 @@ import Record from './record'; import {Driver, READ, WRITE} from './driver'; import RoutingDriver from './routing-driver'; import VERSION from '../version'; -import {parseRoutingContext, parseScheme, parseUrl} from './internal/connector'; -import {assertString, isEmptyObjectOrNull} from './internal/util'; +import {assertString, isEmptyObjectOrNull, parseRoutingContext, parseScheme, parseUrl} from './internal/util'; const auth = { basic: (username, password, realm = undefined) => { diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 096063c51..24b20b048 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -16,6 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + import WebSocketChannel from './ch-websocket'; import NodeChannel from './ch-node'; import {Chunker, Dechunker} from './chunking'; @@ -24,6 +25,7 @@ import {alloc} from './buf'; import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types'; import {newError} from './../error'; import ChannelConfig from './ch-config'; +import {parseHost, parsePort} from './util'; let Channel; if( NodeChannel.available ) { @@ -59,46 +61,6 @@ PATH = 0x50, MAGIC_PREAMBLE = 0x6060B017, DEBUG = false; -let URLREGEX = new RegExp([ - "([^/]+//)?", // scheme - "(([^:/?#]*)", // hostname - "(?::([0-9]+))?)", // port (optional) - "([^?]*)?", // everything else - "(\\?(.+))?" // query -].join("")); - -function parseScheme( url ) { - let scheme = url.match(URLREGEX)[1] || ''; - return scheme.toLowerCase(); -} - -function parseUrl(url) { - return url.match( URLREGEX )[2]; -} - -function parseHost( url ) { - return url.match( URLREGEX )[3]; -} - -function parsePort( url ) { - return url.match( URLREGEX )[4]; -} - -function parseRoutingContext(url) { - const query = url.match(URLREGEX)[7] || ''; - const map = {}; - if (query.length !== 0) { - query.split("&").forEach(val => { - const keyValue = val.split("="); - if (keyValue.length !== 2) { - throw new Error("Invalid parameters: '" + keyValue + "' in url '" + url + "'."); - } - map[keyValue[0]] = keyValue[1]; - }); - } - return map; -} - /** * Very rudimentary log handling, should probably be replaced by something proper at some point. * @param actor the part that sent the message, 'S' for server and 'C' for client @@ -507,11 +469,6 @@ function connect(url, config = {}, connectionErrorCode = null) { } export { - connect, - parseScheme, - parseUrl, - parseHost, - parsePort, - parseRoutingContext, - Connection -} + connect, + Connection +}; diff --git a/src/v1/internal/host-name-resolvers.js b/src/v1/internal/host-name-resolvers.js index e3b328f65..50b8c9f8d 100644 --- a/src/v1/internal/host-name-resolvers.js +++ b/src/v1/internal/host-name-resolvers.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import {parseHost, parsePort} from './connector'; +import {parseHost, parsePort} from './util'; class HostNameResolver { diff --git a/src/v1/internal/util.js b/src/v1/internal/util.js index 0e7efdc89..e7fd5c66e 100644 --- a/src/v1/internal/util.js +++ b/src/v1/internal/util.js @@ -20,8 +20,16 @@ const ENCRYPTION_ON = "ENCRYPTION_ON"; const ENCRYPTION_OFF = "ENCRYPTION_OFF"; +const URL_REGEX = new RegExp([ + '([^/]+//)?', // scheme + '(([^:/?#]*)', // hostname + '(?::([0-9]+))?)', // port (optional) + '([^?]*)?', // everything else + '(\\?(.+))?' // query +].join('')); + function isEmptyObjectOrNull(obj) { - if (isNull(obj)) { + if (obj === null) { return true; } @@ -38,10 +46,6 @@ function isEmptyObjectOrNull(obj) { return true; } -function isNull(obj) { - return obj === null; -} - function isObject(obj) { const type = typeof obj; return type === 'function' || type === 'object' && Boolean(obj); @@ -58,9 +62,66 @@ function isString(str) { return Object.prototype.toString.call(str) === '[object String]'; } +function parseScheme(url) { + assertString(url, 'URL'); + const scheme = url.match(URL_REGEX)[1] || ''; + return scheme.toLowerCase(); +} + +function parseUrl(url) { + assertString(url, 'URL'); + return url.match(URL_REGEX)[2]; +} + +function parseHost(url) { + assertString(url, 'URL'); + return url.match(URL_REGEX)[3]; +} + +function parsePort(url) { + assertString(url, 'URL'); + return url.match(URL_REGEX)[4]; +} + +function parseRoutingContext(url) { + const query = url.match(URL_REGEX)[7] || ''; + const context = {}; + if (query) { + query.split('&').forEach(pair => { + const keyValue = pair.split('='); + if (keyValue.length !== 2) { + throw new Error('Invalid parameters: \'' + keyValue + '\' in URL \'' + url + '\'.'); + } + + const key = trimAndVerify(keyValue[0], 'key', url); + const value = trimAndVerify(keyValue[1], 'value', url); + + if (context[key]) { + throw new Error(`Duplicated query parameters with key '${key}' in URL '${url}'`); + } + + context[key] = value; + }); + } + return context; +} + +function trimAndVerify(string, name, url) { + const result = string.trim(); + if (!result) { + throw new Error(`Illegal empty ${name} in URL query '${url}'`); + } + return result; +} + export { isEmptyObjectOrNull, assertString, + parseScheme, + parseUrl, + parseHost, + parsePort, + parseRoutingContext, ENCRYPTION_ON, ENCRYPTION_OFF } diff --git a/test/internal/host-name-resolvers.test.js b/test/internal/host-name-resolvers.test.js index a8a497dc3..c798a6754 100644 --- a/test/internal/host-name-resolvers.test.js +++ b/test/internal/host-name-resolvers.test.js @@ -19,38 +19,7 @@ import {DnsHostNameResolver, DummyHostNameResolver} from '../../src/v1/internal/host-name-resolvers'; import hasFeature from '../../src/v1/internal/features'; -import {parseHost, parsePort, parseScheme, parseRoutingContext} from '../../src/v1/internal/connector'; - -describe('RoutingContextParser', ()=>{ - - it('should parse routing context', done => { - const url = "bolt://localhost:7687/cat?name=molly&age=1&color=white"; - const context = parseRoutingContext(url); - expect(context).toEqual({name:"molly", age:"1", color:"white"}); - - done(); - }); - - it('should return empty routing context', done =>{ - const url1 = "bolt://localhost:7687/cat?"; - const context1 = parseRoutingContext(url1); - expect(context1).toEqual({}); - - const url2 = "bolt://localhost:7687/lalala"; - const context2 = parseRoutingContext(url2); - expect(context2).toEqual({}); - - done(); - }); - - it('should error for unmatched pair', done=>{ - const url = "bolt://localhost?cat"; - expect(()=>parseRoutingContext(url)).toThrow( - new Error("Invalid parameters: 'cat' in url 'bolt://localhost?cat'.")); - - done(); - }); -}); +import {parseHost, parsePort, parseScheme} from '../../src/v1/internal/util'; describe('DummyHostNameResolver', () => { diff --git a/test/internal/util.test.js b/test/internal/util.test.js index bc557e107..2a1680cfd 100644 --- a/test/internal/util.test.js +++ b/test/internal/util.test.js @@ -17,9 +17,9 @@ * limitations under the License. */ -const util = require('../../lib/v1/internal/util.js'); +import * as util from '../../src/v1/internal/util'; -describe('util', () => { +fdescribe('util', () => { it('should check empty objects', () => { expect(util.isEmptyObjectOrNull(null)).toBeTruthy(); @@ -55,6 +55,112 @@ describe('util', () => { verifyInvalidString(console.log); }); + it('should parse scheme', () => { + verifyScheme('bolt://', 'bolt://localhost'); + verifyScheme('bolt://', 'bolt://localhost:7687'); + verifyScheme('bolt://', 'bolt://neo4j.com'); + verifyScheme('bolt://', 'bolt://neo4j.com:80'); + + verifyScheme('bolt+routing://', 'bolt+routing://127.0.0.1'); + verifyScheme('bolt+routing://', 'bolt+routing://127.0.0.1:7687'); + verifyScheme('bolt+routing://', 'bolt+routing://neo4j.com'); + verifyScheme('bolt+routing://', 'bolt+routing://neo4j.com:80'); + + verifyScheme('wss://', 'wss://server.com'); + verifyScheme('wss://', 'wss://server.com:7687'); + verifyScheme('wss://', 'wss://1.1.1.1'); + verifyScheme('wss://', 'wss://8.8.8.8:80'); + + verifyScheme('', 'invalid url'); + verifyScheme('', 'localhost:7676'); + verifyScheme('', '127.0.0.1'); + }); + + it('should fail to parse scheme from non-string argument', () => { + expect(() => util.parseScheme({})).toThrowError(TypeError); + expect(() => util.parseScheme(['bolt://localhost:2020'])).toThrowError(TypeError); + expect(() => util.parseScheme(() => 'bolt://localhost:8888')).toThrowError(TypeError); + }); + + it('should parse url', () => { + verifyUrl('localhost', 'bolt://localhost'); + verifyUrl('localhost:9090', 'bolt://localhost:9090'); + verifyUrl('127.0.0.1', 'bolt://127.0.0.1'); + verifyUrl('127.0.0.1:7687', 'bolt://127.0.0.1:7687'); + verifyUrl('10.198.20.1', 'bolt+routing://10.198.20.1'); + verifyUrl('15.8.8.9:20004', 'wss://15.8.8.9:20004'); + }); + + it('should fail to parse url from non-string argument', () => { + expect(() => util.parseUrl({})).toThrowError(TypeError); + expect(() => util.parseUrl(['bolt://localhost:2020'])).toThrowError(TypeError); + expect(() => util.parseUrl(() => 'bolt://localhost:8888')).toThrowError(TypeError); + }); + + it('should parse host', () => { + verifyHost('localhost', 'bolt://localhost'); + verifyHost('neo4j.com', 'bolt+routing://neo4j.com'); + verifyHost('neo4j.com', 'bolt+routing://neo4j.com:8080'); + verifyHost('127.0.0.1', 'https://127.0.0.1'); + verifyHost('127.0.0.1', 'ws://127.0.0.1:2020'); + }); + + it('should fail to parse host from non-string argument', () => { + expect(() => util.parseHost({})).toThrowError(TypeError); + expect(() => util.parseHost(['bolt://localhost:2020'])).toThrowError(TypeError); + expect(() => util.parseHost(() => 'bolt://localhost:8888')).toThrowError(TypeError); + }); + + it('should parse port', () => { + verifyPort('7474', 'http://localhost:7474'); + verifyPort('8080', 'http://127.0.0.1:8080'); + verifyPort('20005', 'bolt+routing://neo4j.com:20005'); + verifyPort('4242', 'bolt+routing://1.1.1.1:4242'); + verifyPort('42', 'http://10.192.168.5:42'); + + verifyPort(undefined, 'https://localhost'); + verifyPort(undefined, 'ws://8.8.8.8'); + }); + + it('should fail to parse port from non-string argument', () => { + expect(() => util.parsePort({port: 1515})).toThrowError(TypeError); + expect(() => util.parsePort(['bolt://localhost:2020'])).toThrowError(TypeError); + expect(() => util.parsePort(() => 'bolt://localhost:8888')).toThrowError(TypeError); + }); + + it('should parse routing context', () => { + verifyRoutingContext({ + name: 'molly', + age: '1', + color: 'white' + }, 'bolt+routing://localhost:7687/cat?name=molly&age=1&color=white'); + + verifyRoutingContext({ + key1: 'value1', + key2: 'value2' + }, 'bolt+routing://localhost:7687/?key1=value1&key2=value2'); + + verifyRoutingContext({key: 'value'}, 'bolt+routing://10.198.12.2:9999?key=value'); + + verifyRoutingContext({}, 'bolt+routing://localhost:7687?'); + verifyRoutingContext({}, 'bolt+routing://localhost:7687/?'); + verifyRoutingContext({}, 'bolt+routing://localhost:7687/cat?'); + verifyRoutingContext({}, 'bolt+routing://localhost:7687/lala'); + }); + + it('should fail to parse routing context from non-string argument', () => { + expect(() => util.parseRoutingContext({key1: 'value1'})).toThrowError(TypeError); + expect(() => util.parseRoutingContext(['bolt://localhost:2020/?key=value'])).toThrowError(TypeError); + expect(() => util.parseRoutingContext(() => 'bolt://localhost?key1=value&key2=value2')).toThrowError(TypeError); + }); + + it('should fail to parse routing context from illegal parameters', () => { + expect(() => util.parseRoutingContext('bolt+routing://localhost:7687/?justKey')).toThrow(); + expect(() => util.parseRoutingContext('bolt+routing://localhost:7687/?=value1&key2=value2')).toThrow(); + expect(() => util.parseRoutingContext('bolt+routing://localhost:7687/key1?=value1&key2=')).toThrow(); + expect(() => util.parseRoutingContext('bolt+routing://localhost:7687/?key1=value1&key2=value2&key1=value2')).toThrow(); + }); + function verifyValidString(str) { expect(util.assertString(str, 'Test string')).toBe(str); } @@ -63,4 +169,24 @@ describe('util', () => { expect(() => util.assertString(str, 'Test string')).toThrowError(TypeError); } + function verifyScheme(expectedScheme, url) { + expect(util.parseScheme(url)).toEqual(expectedScheme); + } + + function verifyUrl(expectedUrl, url) { + expect(util.parseUrl(url)).toEqual(expectedUrl); + } + + function verifyHost(expectedHost, url) { + expect(util.parseHost(url)).toEqual(expectedHost); + } + + function verifyPort(expectedPort, url) { + expect(util.parsePort(url)).toEqual(expectedPort); + } + + function verifyRoutingContext(expectedRoutingContext, url) { + expect(util.parseRoutingContext(url)).toEqual(expectedRoutingContext); + } + }); From 93ab3baf69a3786badbea7daf328c4c980b17431 Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 24 Apr 2017 18:27:32 +0200 Subject: [PATCH 4/5] Routing context in Bolt URI Commit makes it possible to specify routing parameters in Bolt URI query. These parameters are then send to server when doing rediscovery. This is true only for 3.2+ Neo4j database because it supports `getRoutingTable(context)` procedure. Renamed `GetServersUtil` to `RoutingUtil` because it now knows about both routing procedures `getServers` and `getRoutingTable`. --- src/v1/driver.js | 24 +++- src/v1/index.js | 12 +- src/v1/internal/connection-providers.js | 10 +- src/v1/internal/connector.js | 68 ++++++++++- src/v1/internal/rediscovery.js | 25 ++-- .../{get-servers-util.js => routing-util.js} | 67 ++++++----- src/v1/session.js | 20 ++-- test/internal/connection-providers.test.js | 4 + test/internal/connector.test.js | 61 +++++++++- test/internal/fake-connection.js | 16 ++- test/internal/rediscovery.test.js | 52 ++++----- ...vers-util.test.js => routing-util.test.js} | 108 ++++++++++++++---- test/internal/util.test.js | 2 +- .../boltkit/get_routing_table.script | 17 +++ .../get_routing_table_with_context.script | 16 +++ .../rediscover_and_read_with_init.script | 16 +++ test/v1/driver.test.js | 4 + test/v1/routing.driver.boltkit.it.js | 93 ++++++++++++++- 18 files changed, 500 insertions(+), 115 deletions(-) rename src/v1/internal/{get-servers-util.js => routing-util.js} (58%) rename test/internal/{get-servers-util.test.js => routing-util.test.js} (67%) create mode 100644 test/resources/boltkit/get_routing_table.script create mode 100644 test/resources/boltkit/get_routing_table_with_context.script create mode 100644 test/resources/boltkit/rediscover_and_read_with_init.script diff --git a/src/v1/driver.js b/src/v1/driver.js index a3cce792c..6c46ccd0d 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -58,7 +58,13 @@ class Driver { Driver._validateConnection.bind(this), config.connectionPoolSize ); - this._connectionProvider = this._createConnectionProvider(url, this._pool, this._driverOnErrorCallback.bind(this)); + + /** + * Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}. + * @type {ConnectionProvider} + * @private + */ + this._connectionProvider = null; } /** @@ -115,7 +121,8 @@ class Driver { */ session(mode, bookmark) { const sessionMode = Driver._validateSessionMode(mode); - return this._createSession(sessionMode, this._connectionProvider, bookmark, this._config); + const connectionProvider = this._getOrCreateConnectionProvider(); + return this._createSession(sessionMode, connectionProvider, bookmark, this._config); } static _validateSessionMode(rawMode) { @@ -142,6 +149,14 @@ class Driver { return SERVICE_UNAVAILABLE; } + _getOrCreateConnectionProvider() { + if (!this._connectionProvider) { + const driverOnErrorCallback = this._driverOnErrorCallback.bind(this); + this._connectionProvider = this._createConnectionProvider(this._url, this._pool, driverOnErrorCallback); + } + return this._connectionProvider; + } + _driverOnErrorCallback(error) { const userDefinedOnErrorCallback = this.onError; if (userDefinedOnErrorCallback && error.code === SERVICE_UNAVAILABLE) { @@ -189,8 +204,9 @@ class _ConnectionStreamObserver extends StreamObserver { if (this._driver.onCompleted) { this._driver.onCompleted(message); } - if (this._conn && message && message.server) { - this._conn.setServerVersion(message.server); + + if (this._observer && this._observer.onComplete) { + this._observer.onCompleted(message); } } } diff --git a/src/v1/index.js b/src/v1/index.js index c694a6c41..6103b7b23 100644 --- a/src/v1/index.js +++ b/src/v1/index.js @@ -128,20 +128,18 @@ function driver(url, authToken, config = {}) { assertString(url, 'Bolt URL'); const scheme = parseScheme(url); const routingContext = parseRoutingContext(url); - if (scheme === "bolt+routing://") { + if (scheme === 'bolt+routing://') { return new RoutingDriver(parseUrl(url), routingContext, USER_AGENT, authToken, config); - } else if (scheme === "bolt://") { - if(!isEmptyObjectOrNull(routingContext)) - { - throw new Error("Routing context are not supported with scheme 'bolt'. Given URI: '" + url + "'"); + } else if (scheme === 'bolt://') { + if (!isEmptyObjectOrNull(routingContext)) { + throw new Error(`Routing parameters are not supported with scheme 'bolt'. Given URL: '${url}'`); } return new Driver(parseUrl(url), USER_AGENT, authToken, config); } else { - throw new Error("Unknown scheme: " + scheme); + throw new Error(`Unknown scheme: ${scheme}`); } } - const types ={ Node, Relationship, diff --git a/src/v1/internal/connection-providers.js b/src/v1/internal/connection-providers.js index 5b6599772..c3aff54bc 100644 --- a/src/v1/internal/connection-providers.js +++ b/src/v1/internal/connection-providers.js @@ -25,7 +25,7 @@ import RoutingTable from './routing-table'; import Rediscovery from './rediscovery'; import hasFeature from './features'; import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers'; -import GetServersUtil from './get-servers-util'; +import RoutingUtil from './routing-util'; class ConnectionProvider { @@ -66,7 +66,7 @@ export class LoadBalancer extends ConnectionProvider { super(); this._seedRouter = address; this._routingTable = new RoutingTable(new RoundRobinArray([this._seedRouter])); - this._rediscovery = new Rediscovery(new GetServersUtil(routingContext)); + this._rediscovery = new Rediscovery(new RoutingUtil(routingContext)); this._connectionPool = connectionPool; this._driverOnErrorCallback = driverOnErrorCallback; this._hostNameResolver = LoadBalancer._createHostNameResolver(); @@ -172,8 +172,10 @@ export class LoadBalancer extends ConnectionProvider { _createSessionForRediscovery(routerAddress) { const connection = this._connectionPool.acquire(routerAddress); - const connectionPromise = Promise.resolve(connection); - const connectionProvider = new SingleConnectionProvider(connectionPromise); + // initialized connection is required for routing procedure call + // server version needs to be known to decide which routing procedure to use + const initializedConnectionPromise = connection.initializationCompleted(); + const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise); return new Session(READ, connectionProvider); } diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 24b20b048..b8c5de31b 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -26,6 +26,7 @@ import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../gra import {newError} from './../error'; import ChannelConfig from './ch-config'; import {parseHost, parsePort} from './util'; +import StreamObserver from './stream-observer'; let Channel; if( NodeChannel.available ) { @@ -183,6 +184,8 @@ class Connection { this._isHandlingFailure = false; this._currentFailure = null; + this._state = new ConnectionState(this); + // Set to true on fatal errors, to get this out of session pool. this._isBroken = false; @@ -330,7 +333,8 @@ class Connection { /** Queue an INIT-message to be sent to the database */ initialize( clientName, token, observer ) { log("C", "INIT", clientName, token); - this._queueObserver(observer); + const initObserver = this._state.wrap(observer); + this._queueObserver(initObserver); this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)], (err) => this._handleFatalError(err) ); this._chunker.messageBoundary(); @@ -416,6 +420,15 @@ class Connection { } } + /** + * Get promise resolved when connection initialization succeed or rejected when it fails. + * Connection is initialized using {@link initialize} function. + * @return {Promise} the result of connection initialization. + */ + initializationCompleted() { + return this._state.initializationCompleted(); + } + /** * Synchronize - flush all queued outgoing messages and route their responses * to their respective handlers. @@ -450,6 +463,59 @@ class Connection { } } +class ConnectionState { + + /** + * @constructor + * @param {Connection} connection the connection to track state for. + */ + constructor(connection) { + this._connection = connection; + this._resolvePromise = null; + this._promise = new Promise(resolve => { + this._resolvePromise = resolve; + }); + } + + /** + * Wrap the given observer to track connection's initialization state. + * @param {StreamObserver} observer the observer used for INIT message. + * @return {StreamObserver} updated observer. + */ + wrap(observer) { + return { + onNext: record => { + if (observer && observer.onNext) { + observer.onNext(record); + } + }, + onError: error => { + this._resolvePromise(Promise.reject(error)); + if (observer && observer.onError) { + observer.onError(error); + } + }, + onCompleted: metaData => { + if (metaData && metaData.server) { + this._connection.setServerVersion(metaData.server); + } + this._resolvePromise(this._connection); + if (observer && observer.onCompleted) { + observer.onCompleted(metaData); + } + } + }; + } + + /** + * Get promise resolved when connection initialization succeed or rejected when it fails. + * @return {Promise} the result of connection initialization. + */ + initializationCompleted() { + return this._promise; + } +} + /** * Crete new connection to the provided url. * @access private diff --git a/src/v1/internal/rediscovery.js b/src/v1/internal/rediscovery.js index a07977fde..47d1fe4c5 100644 --- a/src/v1/internal/rediscovery.js +++ b/src/v1/internal/rediscovery.js @@ -17,18 +17,27 @@ * limitations under the License. */ -import GetServersUtil from "./get-servers-util"; -import RoutingTable from "./routing-table"; -import {newError, PROTOCOL_ERROR} from "../error"; +import RoutingTable from './routing-table'; +import {newError, PROTOCOL_ERROR} from '../error'; export default class Rediscovery { - constructor(getServersUtil) { - this._getServersUtil = getServersUtil; + /** + * @constructor + * @param {RoutingUtil} routingUtil the util to use. + */ + constructor(routingUtil) { + this._routingUtil = routingUtil; } + /** + * Try to fetch new routing table from the given router. + * @param {Session} session the session to use. + * @param {string} routerAddress the URL of the router. + * @return {Promise} promise resolved with new routing table or null when connection error happened. + */ lookupRoutingTableOnRouter(session, routerAddress) { - return this._getServersUtil.callGetServers(session, routerAddress).then(records => { + return this._routingUtil.callRoutingProcedure(session, routerAddress).then(records => { if (records === null) { // connection error happened, unable to retrieve routing table from this router, next one should be queried return null; @@ -42,8 +51,8 @@ export default class Rediscovery { const record = records[0]; - const expirationTime = this._getServersUtil.parseTtl(record, routerAddress); - const {routers, readers, writers} = this._getServersUtil.parseServers(record, routerAddress); + const expirationTime = this._routingUtil.parseTtl(record, routerAddress); + const {routers, readers, writers} = this._routingUtil.parseServers(record, routerAddress); Rediscovery._assertNonEmpty(routers, 'routers', routerAddress); Rediscovery._assertNonEmpty(readers, 'readers', routerAddress); diff --git a/src/v1/internal/get-servers-util.js b/src/v1/internal/routing-util.js similarity index 58% rename from src/v1/internal/get-servers-util.js rename to src/v1/internal/routing-util.js index 7d10faba9..4ab0b747a 100644 --- a/src/v1/internal/get-servers-util.js +++ b/src/v1/internal/routing-util.js @@ -20,44 +20,39 @@ import RoundRobinArray from './round-robin-array'; import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from '../error'; import Integer, {int} from '../integer'; -import {ServerVersion, VERSION3_2} from './server-version-util' +import {ServerVersion, VERSION_3_2_0} from './server-version'; const CALL_GET_SERVERS = 'CALL dbms.cluster.routing.getServers'; -const GET_ROUTING_TABLE_PARAM = "context"; -const CALL_GET_ROUTING_TABLE = "CALL dbms.cluster.routing.getRoutingTable({" - + GET_ROUTING_TABLE_PARAM + "})"; +const GET_ROUTING_TABLE_PARAM = 'context'; +const CALL_GET_ROUTING_TABLE = 'CALL dbms.cluster.routing.getRoutingTable({' + GET_ROUTING_TABLE_PARAM + '})'; const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'; -export default class GetServersUtil { +export default class RoutingUtil { - constructor(routingContext={}) { + constructor(routingContext) { this._routingContext = routingContext; } - callGetServers(session, routerAddress) { - session.run("RETURN 1").then(result=>{ - let statement = {text:CALL_GET_SERVERS}; - - if(ServerVersion.fromString(result.summary.server.version).compare(VERSION3_2)>=0) - { - statement = { - text:CALL_GET_ROUTING_TABLE, - parameters:{GET_ROUTING_TABLE_PARAM: this._routingContext}}; + /** + * Invoke routing procedure using the given session. + * @param {Session} session the session to use. + * @param {string} routerAddress the URL of the router. + * @return {Promise} promise resolved with records returned by the procedure call or null if + * connection error happened. + */ + callRoutingProcedure(session, routerAddress) { + return this._callAvailableRoutingProcedure(session).then(result => { + session.close(); + return result.records; + }).catch(error => { + if (error.code === PROCEDURE_NOT_FOUND_CODE) { + // throw when getServers procedure not found because this is clearly a configuration issue + throw newError('Server ' + routerAddress + ' could not perform routing. ' + + 'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE); } - - return session.run(statement).then(result => { - session.close(); - return result.records; - }).catch(error => { - if (error.code === PROCEDURE_NOT_FOUND_CODE) { - // throw when getServers procedure not found because this is clearly a configuration issue - throw newError('Server ' + routerAddress + ' could not perform routing. ' + - 'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE); - } - // return nothing when failed to connect because code higher in the callstack is still able to retry with a - // different session towards a different router - return null; - }); + // return nothing when failed to connect because code higher in the callstack is still able to retry with a + // different session towards a different router + return null; }); } @@ -111,4 +106,18 @@ export default class GetServersUtil { PROTOCOL_ERROR); } } + + _callAvailableRoutingProcedure(session) { + return session._run(null, null, (connection, streamObserver) => { + const serverVersionString = connection.server.version; + const serverVersion = ServerVersion.fromString(serverVersionString); + + if (serverVersion.compareTo(VERSION_3_2_0) >= 0) { + const params = {[GET_ROUTING_TABLE_PARAM]: this._routingContext}; + connection.run(CALL_GET_ROUTING_TABLE, params, streamObserver); + } else { + connection.run(CALL_GET_SERVERS, {}, streamObserver); + } + }); + } } diff --git a/src/v1/session.js b/src/v1/session.js index 1dccd907f..b87fdc1cc 100644 --- a/src/v1/session.js +++ b/src/v1/session.js @@ -59,28 +59,34 @@ class Session { * @return {Result} - New Result */ run(statement, parameters = {}) { - if(typeof statement === 'object' && statement.text) { + if (typeof statement === 'object' && statement.text) { parameters = statement.parameters || {}; statement = statement.text; } - assertString(statement, "Cypher statement"); + assertString(statement, 'Cypher statement'); + return this._run(statement, parameters, (connection, streamObserver) => + connection.run(statement, parameters, streamObserver) + ); + } + + _run(statement, parameters, statementRunner) { const streamObserver = new _RunObserver(this._onRunFailure()); const connectionHolder = this._connectionHolderWithMode(this._mode); if (!this._hasTx) { connectionHolder.initializeConnection(); connectionHolder.getConnection().then(connection => { streamObserver.resolveConnection(connection); - connection.run(statement, parameters, streamObserver); + statementRunner(connection, streamObserver); connection.pullAll(streamObserver); connection.sync(); }).catch(error => streamObserver.onError(error)); } else { - streamObserver.onError(newError("Statements cannot be run directly on a " - + "session with an open transaction; either run from within the " - + "transaction or use a different session.")); + streamObserver.onError(newError('Statements cannot be run directly on a ' + + 'session with an open transaction; either run from within the ' + + 'transaction or use a different session.')); } - return new Result( streamObserver, statement, parameters, () => streamObserver.meta(), connectionHolder ); + return new Result(streamObserver, statement, parameters, () => streamObserver.meta(), connectionHolder); } /** diff --git a/test/internal/connection-providers.test.js b/test/internal/connection-providers.test.js index 7b78b5e64..c7b35d33b 100644 --- a/test/internal/connection-providers.test.js +++ b/test/internal/connection-providers.test.js @@ -1102,6 +1102,10 @@ class FakeConnection { static create(address, release) { return new FakeConnection(address, release); } + + initializationCompleted() { + return Promise.resolve(this); + } } class FakeRediscovery { diff --git a/test/internal/connector.test.js b/test/internal/connector.test.js index 3610a0b56..6591d91f4 100644 --- a/test/internal/connector.test.js +++ b/test/internal/connector.test.js @@ -24,8 +24,9 @@ import {Chunker} from '../../src/v1/internal/chunking'; import {alloc} from '../../src/v1/internal/buf'; import {Neo4jError} from '../../src/v1/error'; import sharedNeo4j from '../internal/shared-neo4j'; +import {ServerVersion} from '../../src/v1/internal/server-version'; -describe('connector', () => { +fdescribe('connector', () => { it('should read/write basic messages', done => { // Given @@ -117,6 +118,64 @@ describe('connector', () => { channel.onmessage(packedFailureMessage(errorCode, errorMessage)); }); + it('should notify when connection initialization completes', done => { + const connection = connect('bolt://localhost'); + + connection.initializationCompleted().then(initializedConnection => { + expect(initializedConnection).toBe(connection); + done(); + }); + + connection.initialize('mydriver/0.0.0', basicAuthToken()); + }); + + it('should notify when connection initialization fails', done => { + const connection = connect('bolt://localhost:7474'); // wrong port + + connection.initializationCompleted().then(() => { + console.log('THEN called: ', arguments) + }).catch(error => { + expect(error).toBeDefined(); + done(); + }); + + connection.initialize('mydriver/0.0.0', basicAuthToken()); + }); + + it('should notify provided observer when connection initialization completes', done => { + const connection = connect('bolt://localhost'); + + connection.initialize('mydriver/0.0.0', basicAuthToken(), { + onCompleted: metaData => { + expect(metaData).toBeDefined(); + done(); + }, + }); + }); + + it('should notify provided observer when connection initialization fails', done => { + const connection = connect('bolt://localhost:7474'); // wrong port + + connection.initialize('mydriver/0.0.0', basicAuthToken(), { + onError: error => { + expect(error).toBeDefined(); + done(); + }, + }); + }); + + it('should have server version after connection initialization completed', done => { + const connection = connect('bolt://localhost'); + + connection.initializationCompleted().then(initializedConnection => { + const serverVersion = ServerVersion.fromString(initializedConnection.server.version); + expect(serverVersion).toBeDefined(); + done(); + }); + + connection.initialize('mydriver/0.0.0', basicAuthToken()); + }); + function packedHandshakeMessage() { const result = alloc(4); result.putInt32(0, 1); diff --git a/test/internal/fake-connection.js b/test/internal/fake-connection.js index 60dd3c11e..c2b773d1d 100644 --- a/test/internal/fake-connection.js +++ b/test/internal/fake-connection.js @@ -31,9 +31,14 @@ export default class FakeConnection { this.resetAsyncInvoked = 0; this.syncInvoked = 0; this.releaseInvoked = 0; + this.seenStatements = []; + this.seenParameters = []; + this.server = {}; } - run() { + run(statement, parameters) { + this.seenStatements.push(statement); + this.seenParameters.push(parameters); } discardAll() { @@ -55,6 +60,10 @@ export default class FakeConnection { this.releaseInvoked++; } + initializationCompleted() { + return Promise.resolve(this); + } + isReleasedOnceOnSessionClose() { return this.isReleasedOnSessionCloseTimes(1); } @@ -80,4 +89,9 @@ export default class FakeConnection { this.syncInvoked === times && this.releaseInvoked === times; } + + withServerVersion(version) { + this.server.version = version; + return this; + } }; diff --git a/test/internal/rediscovery.test.js b/test/internal/rediscovery.test.js index b23816207..cda4cec6d 100644 --- a/test/internal/rediscovery.test.js +++ b/test/internal/rediscovery.test.js @@ -18,7 +18,7 @@ */ import Rediscovery from "../../src/v1/internal/rediscovery"; -import GetServersUtil from "../../src/v1/internal/get-servers-util"; +import RoutingUtil from "../../src/v1/internal/routing-util"; import {newError, PROTOCOL_ERROR} from "../../src/v1/error"; import Record from "../../src/v1/record"; import {int} from "../../src/v1/integer"; @@ -30,8 +30,8 @@ const ROUTER_ADDRESS = 'bolt+routing://test.router.com'; describe('rediscovery', () => { it('should return null when connection error happens', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => null, + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => null, }); lookupRoutingTableOnRouter(util).then(routingTable => { @@ -41,8 +41,8 @@ describe('rediscovery', () => { }); it('should throw when no records are returned', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [], }); lookupRoutingTableOnRouter(util).catch(error => { @@ -52,8 +52,8 @@ describe('rediscovery', () => { }); it('should throw when multiple records are returned', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa']), new Record(['b'], ['bbb'])] + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa']), new Record(['b'], ['bbb'])] }); lookupRoutingTableOnRouter(util).catch(error => { @@ -63,8 +63,8 @@ describe('rediscovery', () => { }); it('should throw when ttl parsing throws', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa'])], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa'])], parseTtl: () => { throw newError('Unable to parse TTL', PROTOCOL_ERROR); } @@ -77,8 +77,8 @@ describe('rediscovery', () => { }); it('should throw when servers parsing throws', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa'])], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa'])], parseTtl: () => int(42), parseServers: () => { throw newError('Unable to parse servers', PROTOCOL_ERROR); @@ -92,8 +92,8 @@ describe('rediscovery', () => { }); it('should throw when no routers', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa'])], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa'])], parseTtl: () => int(42), parseServers: () => { return { @@ -111,8 +111,8 @@ describe('rediscovery', () => { }); it('should throw when no readers', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa'])], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa'])], parseTtl: () => int(42), parseServers: () => { return { @@ -130,8 +130,8 @@ describe('rediscovery', () => { }); it('should return routing table when no writers', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa'])], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa'])], parseTtl: () => int(42), parseServers: () => { return { @@ -162,8 +162,8 @@ describe('rediscovery', () => { }); function testValidRoutingTable(routerAddresses, readerAddresses, writerAddresses, expires, done) { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa'])], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa'])], parseTtl: () => expires, parseServers: () => { return { @@ -188,8 +188,8 @@ describe('rediscovery', () => { }); } - function lookupRoutingTableOnRouter(getServersUtil) { - const rediscovery = new Rediscovery(getServersUtil); + function lookupRoutingTableOnRouter(routingUtil) { + const rediscovery = new Rediscovery(routingUtil); return rediscovery.lookupRoutingTableOnRouter(null, ROUTER_ADDRESS); } @@ -202,19 +202,19 @@ describe('rediscovery', () => { throw new Error('Should not be called'); } - class FakeGetServersUtil extends GetServersUtil { + class FakeRoutingUtil extends RoutingUtil { - constructor({callGetServers = shouldNotBeCalled, parseTtl = shouldNotBeCalled, parseServers = shouldNotBeCalled}) { + constructor({callRoutingProcedure = shouldNotBeCalled, parseTtl = shouldNotBeCalled, parseServers = shouldNotBeCalled}) { super(); - this._callGetServers = callGetServers; + this._callAvailableRoutingProcedure = callRoutingProcedure; this._parseTtl = parseTtl; this._parseServers = parseServers; } - callGetServers(session, routerAddress) { + callRoutingProcedure(session, routerAddress) { return new Promise((resolve, reject) => { try { - resolve(this._callGetServers()); + resolve(this._callAvailableRoutingProcedure()); } catch (error) { reject(error); } diff --git a/test/internal/get-servers-util.test.js b/test/internal/routing-util.test.js similarity index 67% rename from test/internal/get-servers-util.test.js rename to test/internal/routing-util.test.js index e2323faf4..6229e10b5 100644 --- a/test/internal/get-servers-util.test.js +++ b/test/internal/routing-util.test.js @@ -17,11 +17,12 @@ * limitations under the License. */ -import GetServersUtil from "../../src/v1/internal/get-servers-util"; -import Record from "../../src/v1/record"; -import Integer, {int} from "../../src/v1/integer"; -import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from "../../src/v1/error"; -import lolex from "lolex"; +import RoutingUtil from '../../src/v1/internal/routing-util'; +import Record from '../../src/v1/record'; +import Integer, {int} from '../../src/v1/integer'; +import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../../src/v1/error'; +import lolex from 'lolex'; +import FakeConnection from './fake-connection'; const ROUTER_ADDRESS = 'bolt+routing://test.router.com'; @@ -40,7 +41,7 @@ describe('get-servers-util', () => { it('should return retrieved records when query succeeds', done => { const session = FakeSession.successful({records: ['foo', 'bar', 'baz']}); - callGetServers(session).then(records => { + callRoutingProcedure(session).then(records => { expect(records).toEqual(['foo', 'bar', 'baz']); done(); }).catch(console.log); @@ -49,7 +50,7 @@ describe('get-servers-util', () => { it('should close session when query succeeds', done => { const session = FakeSession.successful({records: ['foo', 'bar', 'baz']}); - callGetServers(session).then(() => { + callRoutingProcedure(session).then(() => { expect(session.isClosed()).toBeTruthy(); done(); }).catch(console.log); @@ -58,7 +59,7 @@ describe('get-servers-util', () => { it('should not close session when query fails', done => { const session = FakeSession.failed(newError('Oh no!', SESSION_EXPIRED)); - callGetServers(session).then(() => { + callRoutingProcedure(session).then(() => { expect(session.isClosed()).toBeFalsy(); done(); }).catch(console.log); @@ -67,7 +68,7 @@ describe('get-servers-util', () => { it('should return null on connection error', done => { const session = FakeSession.failed(newError('Oh no!', SESSION_EXPIRED)); - callGetServers(session).then(records => { + callRoutingProcedure(session).then(records => { expect(records).toBeNull(); done(); }).catch(console.log); @@ -76,7 +77,7 @@ describe('get-servers-util', () => { it('should fail when procedure not found', done => { const session = FakeSession.failed(newError('Oh no!', 'Neo.ClientError.Procedure.ProcedureNotFound')); - callGetServers(session).catch(error => { + callRoutingProcedure(session).catch(error => { expect(error.code).toBe(SERVICE_UNAVAILABLE); expect(error.message).toBe('Server ' + ROUTER_ADDRESS + ' could not perform routing. ' + 'Make sure you are connecting to a causal cluster'); @@ -84,6 +85,61 @@ describe('get-servers-util', () => { }); }); + it('should use getServers procedure when server version is older than 3.2.0', done => { + const connection = new FakeConnection().withServerVersion('Neo4j/3.1.9'); + const session = FakeSession.withFakeConnection(connection); + + callRoutingProcedure(session, {}).then(() => { + expect(connection.seenStatements).toEqual(['CALL dbms.cluster.routing.getServers']); + expect(connection.seenParameters).toEqual([{}]); + done(); + }); + }); + + it('should use getRoutingTable procedure with empty routing context when server version is 3.2.0', done => { + const connection = new FakeConnection().withServerVersion('Neo4j/3.2.0'); + const session = FakeSession.withFakeConnection(connection); + + callRoutingProcedure(session, {}).then(() => { + expect(connection.seenStatements).toEqual(['CALL dbms.cluster.routing.getRoutingTable({context})']); + expect(connection.seenParameters).toEqual([{context: {}}]); + done(); + }); + }); + + it('should use getRoutingTable procedure with routing context when server version is 3.2.0', done => { + const connection = new FakeConnection().withServerVersion('Neo4j/3.2.0'); + const session = FakeSession.withFakeConnection(connection); + + callRoutingProcedure(session, {key1: 'value1', key2: 'value2'}).then(() => { + expect(connection.seenStatements).toEqual(['CALL dbms.cluster.routing.getRoutingTable({context})']); + expect(connection.seenParameters).toEqual([{context: {key1: 'value1', key2: 'value2'}}]); + done(); + }); + }); + + it('should use getRoutingTable procedure with empty routing context when server version is newer than 3.2.0', done => { + const connection = new FakeConnection().withServerVersion('Neo4j/3.3.5'); + const session = FakeSession.withFakeConnection(connection); + + callRoutingProcedure(session, {}).then(() => { + expect(connection.seenStatements).toEqual(['CALL dbms.cluster.routing.getRoutingTable({context})']); + expect(connection.seenParameters).toEqual([{context: {}}]); + done(); + }); + }); + + it('should use getRoutingTable procedure with routing context when server version is newer than 3.2.0', done => { + const connection = new FakeConnection().withServerVersion('Neo4j/3.2.8'); + const session = FakeSession.withFakeConnection(connection); + + callRoutingProcedure(session, {key1: 'foo', key2: 'bar'}).then(() => { + expect(connection.seenStatements).toEqual(['CALL dbms.cluster.routing.getRoutingTable({context})']); + expect(connection.seenParameters).toEqual([{context: {key1: 'foo', key2: 'bar'}}]); + done(); + }); + }); + it('should parse valid ttl', () => { testValidTtlParsing(100, 5); testValidTtlParsing(Date.now(), 3600); // 1 hour @@ -197,18 +253,18 @@ describe('get-servers-util', () => { expect(writers.toArray()).toEqual(writerAddresses); } - function callGetServers(session) { - const util = new GetServersUtil(); - return util.callGetServers(session, ROUTER_ADDRESS); + function callRoutingProcedure(session, routingContext) { + const util = new RoutingUtil(routingContext || {}); + return util.callRoutingProcedure(session, ROUTER_ADDRESS); } function parseTtl(record) { - const util = new GetServersUtil(); + const util = new RoutingUtil(); return util.parseTtl(record, ROUTER_ADDRESS); } function parseServers(record) { - const util = new GetServersUtil(); + const util = new RoutingUtil(); return util.parseServers(record, ROUTER_ADDRESS); } @@ -245,22 +301,30 @@ describe('get-servers-util', () => { class FakeSession { - constructor(runResponses) { - this._runResponses = runResponses; + constructor(runResponse, fakeConnection) { + this._runResponse = runResponse; + this._fakeConnection = fakeConnection; this._closed = false; - this._runCounter = 0; } static successful(result) { - return new FakeSession(Promise.resolve(result)); + return new FakeSession(Promise.resolve(result), null); } static failed(error) { - return new FakeSession(Promise.reject(error)); + return new FakeSession(Promise.reject(error), null); } - run() { - return this._runResponses[this._runCounter ++]; + static withFakeConnection(connection) { + return new FakeSession(null, connection); + } + + _run(ignoreStatement, ignoreParameters, statementRunner) { + if (this._runResponse) { + return this._runResponse; + } + statementRunner(this._fakeConnection); + return Promise.resolve(); } close() { diff --git a/test/internal/util.test.js b/test/internal/util.test.js index 2a1680cfd..907ad3421 100644 --- a/test/internal/util.test.js +++ b/test/internal/util.test.js @@ -19,7 +19,7 @@ import * as util from '../../src/v1/internal/util'; -fdescribe('util', () => { +describe('util', () => { it('should check empty objects', () => { expect(util.isEmptyObjectOrNull(null)).toBeTruthy(); diff --git a/test/resources/boltkit/get_routing_table.script b/test/resources/boltkit/get_routing_table.script new file mode 100644 index 000000000..2acb2013a --- /dev/null +++ b/test/resources/boltkit/get_routing_table.script @@ -0,0 +1,17 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +S: SUCCESS {"server": "Neo4j/3.2.2"} +C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"], "role": "READ"},{"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"], "role": "ROUTE"}]] + SUCCESS {} +C: RUN "MATCH (n) RETURN n.name AS name" {} + PULL_ALL +S: SUCCESS {"fields": ["name"]} + RECORD ["Alice"] + RECORD ["Bob"] + RECORD ["Eve"] + SUCCESS {} diff --git a/test/resources/boltkit/get_routing_table_with_context.script b/test/resources/boltkit/get_routing_table_with_context.script new file mode 100644 index 000000000..56c268fa5 --- /dev/null +++ b/test/resources/boltkit/get_routing_table_with_context.script @@ -0,0 +1,16 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +S: SUCCESS {"server": "Neo4j/3.2.3"} +C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {"policy": "my_policy", "region": "china"}} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"], "role": "READ"},{"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"], "role": "ROUTE"}]] + SUCCESS {} +C: RUN "MATCH (n) RETURN n.name AS name" {} + PULL_ALL +S: SUCCESS {"fields": ["name"]} + RECORD ["Alice"] + RECORD ["Bob"] + SUCCESS {} diff --git a/test/resources/boltkit/rediscover_and_read_with_init.script b/test/resources/boltkit/rediscover_and_read_with_init.script new file mode 100644 index 000000000..c2ce54c6f --- /dev/null +++ b/test/resources/boltkit/rediscover_and_read_with_init.script @@ -0,0 +1,16 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +S: SUCCESS {"server": "Neo4j/3.1.0"} +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001","127.0.0.1:9002"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9001","127.0.0.1:9002"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002"], "role": "ROUTE"}]] + SUCCESS {} +C: RUN "MATCH (n) RETURN n.name" {} + PULL_ALL +S: SUCCESS {"fields": ["n.name"]} + RECORD ["Bob"] + RECORD ["Tina"] + SUCCESS {} diff --git a/test/v1/driver.test.js b/test/v1/driver.test.js index 6795af7cf..3ef9dc8f3 100644 --- a/test/v1/driver.test.js +++ b/test/v1/driver.test.js @@ -181,6 +181,10 @@ describe('driver', () => { expect(createRoutingDriverWithTOFU).toThrow(); }); + it('should fail when bolt:// scheme used with routing params', () => { + expect(() => neo4j.driver('bolt://localhost:7687/?policy=my_policy')).toThrow(); + }); + const exposedTypes = [ 'Node', 'Path', diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index a16892db8..bb627fd8a 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -1549,6 +1549,87 @@ describe('routing driver', () => { }); }); + it('should invoke procedure get routing table when server version permits', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/get_routing_table.script', 9001); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001'); + const session = driver.session(); + session.run('MATCH (n) RETURN n.name AS name').then(result => { + const names = result.records.map(record => record.get('name')); + expect(names).toEqual(['Alice', 'Bob', 'Eve']); + + session.close(() => { + driver.close(); + router.exit(code => { + expect(code).toEqual(0); + done(); + }); + }); + }); + }); + }); + + it('should send routing context to server', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/get_routing_table_with_context.script', 9001); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001/?policy=my_policy®ion=china'); + const session = driver.session(); + session.run('MATCH (n) RETURN n.name AS name').then(result => { + const names = result.records.map(record => record.get('name')); + expect(names).toEqual(['Alice', 'Bob']); + + session.close(() => { + driver.close(); + router.exit(code => { + expect(code).toEqual(0); + done(); + }); + }); + }); + }); + }); + + it('should ignore routing context when server does not support it', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/rediscover_and_read_with_init.script', 9001); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001/?policy=my_policy'); + const session = driver.session(); + session.run('MATCH (n) RETURN n.name').then(result => { + const names = result.records.map(record => record.get(0)); + expect(names).toEqual(['Bob', 'Tina']); + + session.close(() => { + driver.close(); + router.exit(code => { + expect(code).toEqual(0); + done(); + }); + }); + }); + }); + }); + function moveNextDateNow30SecondsForward() { const currentTime = Date.now(); hijackNextDateNowCall(currentTime + 30 * 1000 + 1); @@ -1700,19 +1781,23 @@ describe('routing driver', () => { } function setupFakeHostNameResolution(driver, seedRouter, resolvedAddresses) { - driver._connectionProvider._hostNameResolver = new FakeHostNameResolver(seedRouter, resolvedAddresses); + const connectionProvider = driver._getOrCreateConnectionProvider(); + connectionProvider._hostNameResolver = new FakeHostNameResolver(seedRouter, resolvedAddresses); } function getConnectionPool(driver) { - return driver._connectionProvider._connectionPool; + const connectionProvider = driver._getOrCreateConnectionProvider(); + return connectionProvider._connectionPool; } function getRoutingTable(driver) { - return driver._connectionProvider._routingTable; + const connectionProvider = driver._getOrCreateConnectionProvider(); + return connectionProvider._routingTable; } function setRoutingTable(driver, newRoutingTable) { - driver._connectionProvider._routingTable = newRoutingTable; + const connectionProvider = driver._getOrCreateConnectionProvider(); + connectionProvider._routingTable = newRoutingTable; } function joinStrings(array) { From 4dc299dd87d448dde7ff04b53bee433c46c9681c Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 24 Apr 2017 18:48:13 +0200 Subject: [PATCH 5/5] Lazily create promise to avoid rejection warnings Connection exposes a promise resolved/rejected after INIT message. This is done via `#initializationCompleted()` function. This commit makes promise creation lazy because otherwise it results in `UnhandledPromiseRejectionWarning` if noone calls `#initializationCompleted()`. --- src/v1/internal/connector.js | 31 +++++++++++++++++++++++++------ test/internal/connector.test.js | 2 +- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index b8c5de31b..bc6afbb21 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -471,10 +471,12 @@ class ConnectionState { */ constructor(connection) { this._connection = connection; + + this._initialized = false; + this._initializationError = null; + this._resolvePromise = null; - this._promise = new Promise(resolve => { - this._resolvePromise = resolve; - }); + this._rejectPromise = null; } /** @@ -490,7 +492,11 @@ class ConnectionState { } }, onError: error => { - this._resolvePromise(Promise.reject(error)); + this._initializationError = error; + if (this._rejectPromise) { + this._rejectPromise(error); + this._rejectPromise = null; + } if (observer && observer.onError) { observer.onError(error); } @@ -499,7 +505,11 @@ class ConnectionState { if (metaData && metaData.server) { this._connection.setServerVersion(metaData.server); } - this._resolvePromise(this._connection); + this._initialized = true; + if (this._resolvePromise) { + this._resolvePromise(this._connection); + this._resolvePromise = null; + } if (observer && observer.onCompleted) { observer.onCompleted(metaData); } @@ -512,7 +522,16 @@ class ConnectionState { * @return {Promise} the result of connection initialization. */ initializationCompleted() { - return this._promise; + if (this._initialized) { + return Promise.resolve(this._connection); + } else if (this._initializationError) { + return Promise.reject(this._initializationError); + } else { + return new Promise((resolve, reject) => { + this._resolvePromise = resolve; + this._rejectPromise = reject; + }); + } } } diff --git a/test/internal/connector.test.js b/test/internal/connector.test.js index 6591d91f4..ba29ec2c4 100644 --- a/test/internal/connector.test.js +++ b/test/internal/connector.test.js @@ -26,7 +26,7 @@ import {Neo4jError} from '../../src/v1/error'; import sharedNeo4j from '../internal/shared-neo4j'; import {ServerVersion} from '../../src/v1/internal/server-version'; -fdescribe('connector', () => { +describe('connector', () => { it('should read/write basic messages', done => { // Given