diff --git a/src/v1/driver.js b/src/v1/driver.js index a3cce792c..6c46ccd0d 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -58,7 +58,13 @@ class Driver { Driver._validateConnection.bind(this), config.connectionPoolSize ); - this._connectionProvider = this._createConnectionProvider(url, this._pool, this._driverOnErrorCallback.bind(this)); + + /** + * Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}. + * @type {ConnectionProvider} + * @private + */ + this._connectionProvider = null; } /** @@ -115,7 +121,8 @@ class Driver { */ session(mode, bookmark) { const sessionMode = Driver._validateSessionMode(mode); - return this._createSession(sessionMode, this._connectionProvider, bookmark, this._config); + const connectionProvider = this._getOrCreateConnectionProvider(); + return this._createSession(sessionMode, connectionProvider, bookmark, this._config); } static _validateSessionMode(rawMode) { @@ -142,6 +149,14 @@ class Driver { return SERVICE_UNAVAILABLE; } + _getOrCreateConnectionProvider() { + if (!this._connectionProvider) { + const driverOnErrorCallback = this._driverOnErrorCallback.bind(this); + this._connectionProvider = this._createConnectionProvider(this._url, this._pool, driverOnErrorCallback); + } + return this._connectionProvider; + } + _driverOnErrorCallback(error) { const userDefinedOnErrorCallback = this.onError; if (userDefinedOnErrorCallback && error.code === SERVICE_UNAVAILABLE) { @@ -189,8 +204,9 @@ class _ConnectionStreamObserver extends StreamObserver { if (this._driver.onCompleted) { this._driver.onCompleted(message); } - if (this._conn && message && message.server) { - this._conn.setServerVersion(message.server); + + if (this._observer && this._observer.onComplete) { + this._observer.onCompleted(message); } } } diff --git a/src/v1/index.js b/src/v1/index.js index 8a55f10d1..6103b7b23 100644 --- a/src/v1/index.js +++ b/src/v1/index.js @@ -26,9 +26,7 @@ import Record from './record'; import {Driver, READ, WRITE} from './driver'; import RoutingDriver from './routing-driver'; import VERSION from '../version'; -import {parseScheme, parseUrl} from './internal/connector'; -import {assertString} from './internal/util'; - +import {assertString, isEmptyObjectOrNull, parseRoutingContext, parseScheme, parseUrl} from './internal/util'; const auth = { basic: (username, password, realm = undefined) => { @@ -129,17 +127,19 @@ const USER_AGENT = "neo4j-javascript/" + VERSION; function driver(url, authToken, config = {}) { assertString(url, 'Bolt URL'); const scheme = parseScheme(url); - if (scheme === "bolt+routing://") { - return new RoutingDriver(parseUrl(url), USER_AGENT, authToken, config); - } else if (scheme === "bolt://") { + const routingContext = parseRoutingContext(url); + if (scheme === 'bolt+routing://') { + return new RoutingDriver(parseUrl(url), routingContext, USER_AGENT, authToken, config); + } else if (scheme === 'bolt://') { + if (!isEmptyObjectOrNull(routingContext)) { + throw new Error(`Routing parameters are not supported with scheme 'bolt'. Given URL: '${url}'`); + } return new Driver(parseUrl(url), USER_AGENT, authToken, config); } else { - throw new Error("Unknown scheme: " + scheme); - + throw new Error(`Unknown scheme: ${scheme}`); } } - const types ={ Node, Relationship, diff --git a/src/v1/internal/connection-providers.js b/src/v1/internal/connection-providers.js index a2e5e94e7..c3aff54bc 100644 --- a/src/v1/internal/connection-providers.js +++ b/src/v1/internal/connection-providers.js @@ -25,6 +25,7 @@ import RoutingTable from './routing-table'; import Rediscovery from './rediscovery'; import hasFeature from './features'; import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers'; +import RoutingUtil from './routing-util'; class ConnectionProvider { @@ -61,11 +62,11 @@ export class DirectConnectionProvider extends ConnectionProvider { export class LoadBalancer extends ConnectionProvider { - constructor(address, connectionPool, driverOnErrorCallback) { + constructor(address, routingContext, connectionPool, driverOnErrorCallback) { super(); this._seedRouter = address; this._routingTable = new RoutingTable(new RoundRobinArray([this._seedRouter])); - this._rediscovery = new Rediscovery(); + this._rediscovery = new Rediscovery(new RoutingUtil(routingContext)); this._connectionPool = connectionPool; this._driverOnErrorCallback = driverOnErrorCallback; this._hostNameResolver = LoadBalancer._createHostNameResolver(); @@ -171,8 +172,10 @@ export class LoadBalancer extends ConnectionProvider { _createSessionForRediscovery(routerAddress) { const connection = this._connectionPool.acquire(routerAddress); - const connectionPromise = Promise.resolve(connection); - const connectionProvider = new SingleConnectionProvider(connectionPromise); + // initialized connection is required for routing procedure call + // server version needs to be known to decide which routing procedure to use + const initializedConnectionPromise = connection.initializationCompleted(); + const connectionProvider = new SingleConnectionProvider(initializedConnectionPromise); return new Session(READ, connectionProvider); } diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 9d19de16e..bc6afbb21 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -16,6 +16,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + import WebSocketChannel from './ch-websocket'; import NodeChannel from './ch-node'; import {Chunker, Dechunker} from './chunking'; @@ -24,6 +25,8 @@ import {alloc} from './buf'; import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types'; import {newError} from './../error'; import ChannelConfig from './ch-config'; +import {parseHost, parsePort} from './util'; +import StreamObserver from './stream-observer'; let Channel; if( NodeChannel.available ) { @@ -59,29 +62,6 @@ PATH = 0x50, MAGIC_PREAMBLE = 0x6060B017, DEBUG = false; -let URLREGEX = new RegExp([ - "([^/]+//)?", // scheme - "(([^:/?#]*)", // hostname - "(?::([0-9]+))?)", // port (optional) - ".*"].join("")); // everything else - -function parseScheme( url ) { - let scheme = url.match(URLREGEX)[1] || ''; - return scheme.toLowerCase(); -} - -function parseUrl(url) { - return url.match( URLREGEX )[2]; -} - -function parseHost( url ) { - return url.match( URLREGEX )[3]; -} - -function parsePort( url ) { - return url.match( URLREGEX )[4]; -} - /** * Very rudimentary log handling, should probably be replaced by something proper at some point. * @param actor the part that sent the message, 'S' for server and 'C' for client @@ -204,6 +184,8 @@ class Connection { this._isHandlingFailure = false; this._currentFailure = null; + this._state = new ConnectionState(this); + // Set to true on fatal errors, to get this out of session pool. this._isBroken = false; @@ -351,7 +333,8 @@ class Connection { /** Queue an INIT-message to be sent to the database */ initialize( clientName, token, observer ) { log("C", "INIT", clientName, token); - this._queueObserver(observer); + const initObserver = this._state.wrap(observer); + this._queueObserver(initObserver); this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)], (err) => this._handleFatalError(err) ); this._chunker.messageBoundary(); @@ -437,6 +420,15 @@ class Connection { } } + /** + * Get promise resolved when connection initialization succeed or rejected when it fails. + * Connection is initialized using {@link initialize} function. + * @return {Promise} the result of connection initialization. + */ + initializationCompleted() { + return this._state.initializationCompleted(); + } + /** * Synchronize - flush all queued outgoing messages and route their responses * to their respective handlers. @@ -471,6 +463,78 @@ class Connection { } } +class ConnectionState { + + /** + * @constructor + * @param {Connection} connection the connection to track state for. + */ + constructor(connection) { + this._connection = connection; + + this._initialized = false; + this._initializationError = null; + + this._resolvePromise = null; + this._rejectPromise = null; + } + + /** + * Wrap the given observer to track connection's initialization state. + * @param {StreamObserver} observer the observer used for INIT message. + * @return {StreamObserver} updated observer. + */ + wrap(observer) { + return { + onNext: record => { + if (observer && observer.onNext) { + observer.onNext(record); + } + }, + onError: error => { + this._initializationError = error; + if (this._rejectPromise) { + this._rejectPromise(error); + this._rejectPromise = null; + } + if (observer && observer.onError) { + observer.onError(error); + } + }, + onCompleted: metaData => { + if (metaData && metaData.server) { + this._connection.setServerVersion(metaData.server); + } + this._initialized = true; + if (this._resolvePromise) { + this._resolvePromise(this._connection); + this._resolvePromise = null; + } + if (observer && observer.onCompleted) { + observer.onCompleted(metaData); + } + } + }; + } + + /** + * Get promise resolved when connection initialization succeed or rejected when it fails. + * @return {Promise} the result of connection initialization. + */ + initializationCompleted() { + if (this._initialized) { + return Promise.resolve(this._connection); + } else if (this._initializationError) { + return Promise.reject(this._initializationError); + } else { + return new Promise((resolve, reject) => { + this._resolvePromise = resolve; + this._rejectPromise = reject; + }); + } + } +} + /** * Crete new connection to the provided url. * @access private @@ -490,10 +554,6 @@ function connect(url, config = {}, connectionErrorCode = null) { } export { - connect, - parseScheme, - parseUrl, - parseHost, - parsePort, - Connection -} + connect, + Connection +}; diff --git a/src/v1/internal/host-name-resolvers.js b/src/v1/internal/host-name-resolvers.js index e3b328f65..50b8c9f8d 100644 --- a/src/v1/internal/host-name-resolvers.js +++ b/src/v1/internal/host-name-resolvers.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import {parseHost, parsePort} from './connector'; +import {parseHost, parsePort} from './util'; class HostNameResolver { diff --git a/src/v1/internal/rediscovery.js b/src/v1/internal/rediscovery.js index 08fb60bf0..47d1fe4c5 100644 --- a/src/v1/internal/rediscovery.js +++ b/src/v1/internal/rediscovery.js @@ -17,18 +17,27 @@ * limitations under the License. */ -import GetServersUtil from "./get-servers-util"; -import RoutingTable from "./routing-table"; -import {newError, PROTOCOL_ERROR} from "../error"; +import RoutingTable from './routing-table'; +import {newError, PROTOCOL_ERROR} from '../error'; export default class Rediscovery { - constructor(getServersUtil) { - this._getServersUtil = getServersUtil || new GetServersUtil(); + /** + * @constructor + * @param {RoutingUtil} routingUtil the util to use. + */ + constructor(routingUtil) { + this._routingUtil = routingUtil; } + /** + * Try to fetch new routing table from the given router. + * @param {Session} session the session to use. + * @param {string} routerAddress the URL of the router. + * @return {Promise} promise resolved with new routing table or null when connection error happened. + */ lookupRoutingTableOnRouter(session, routerAddress) { - return this._getServersUtil.callGetServers(session, routerAddress).then(records => { + return this._routingUtil.callRoutingProcedure(session, routerAddress).then(records => { if (records === null) { // connection error happened, unable to retrieve routing table from this router, next one should be queried return null; @@ -42,8 +51,8 @@ export default class Rediscovery { const record = records[0]; - const expirationTime = this._getServersUtil.parseTtl(record, routerAddress); - const {routers, readers, writers} = this._getServersUtil.parseServers(record, routerAddress); + const expirationTime = this._routingUtil.parseTtl(record, routerAddress); + const {routers, readers, writers} = this._routingUtil.parseServers(record, routerAddress); Rediscovery._assertNonEmpty(routers, 'routers', routerAddress); Rediscovery._assertNonEmpty(readers, 'readers', routerAddress); diff --git a/src/v1/internal/get-servers-util.js b/src/v1/internal/routing-util.js similarity index 69% rename from src/v1/internal/get-servers-util.js rename to src/v1/internal/routing-util.js index d94da81df..4ab0b747a 100644 --- a/src/v1/internal/get-servers-util.js +++ b/src/v1/internal/routing-util.js @@ -20,14 +20,28 @@ import RoundRobinArray from './round-robin-array'; import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from '../error'; import Integer, {int} from '../integer'; +import {ServerVersion, VERSION_3_2_0} from './server-version'; -const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers'; +const CALL_GET_SERVERS = 'CALL dbms.cluster.routing.getServers'; +const GET_ROUTING_TABLE_PARAM = 'context'; +const CALL_GET_ROUTING_TABLE = 'CALL dbms.cluster.routing.getRoutingTable({' + GET_ROUTING_TABLE_PARAM + '})'; const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'; -export default class GetServersUtil { +export default class RoutingUtil { - callGetServers(session, routerAddress) { - return session.run(PROCEDURE_CALL).then(result => { + constructor(routingContext) { + this._routingContext = routingContext; + } + + /** + * Invoke routing procedure using the given session. + * @param {Session} session the session to use. + * @param {string} routerAddress the URL of the router. + * @return {Promise} promise resolved with records returned by the procedure call or null if + * connection error happened. + */ + callRoutingProcedure(session, routerAddress) { + return this._callAvailableRoutingProcedure(session).then(result => { session.close(); return result.records; }).catch(error => { @@ -92,4 +106,18 @@ export default class GetServersUtil { PROTOCOL_ERROR); } } + + _callAvailableRoutingProcedure(session) { + return session._run(null, null, (connection, streamObserver) => { + const serverVersionString = connection.server.version; + const serverVersion = ServerVersion.fromString(serverVersionString); + + if (serverVersion.compareTo(VERSION_3_2_0) >= 0) { + const params = {[GET_ROUTING_TABLE_PARAM]: this._routingContext}; + connection.run(CALL_GET_ROUTING_TABLE, params, streamObserver); + } else { + connection.run(CALL_GET_SERVERS, {}, streamObserver); + } + }); + } } diff --git a/src/v1/internal/server-version.js b/src/v1/internal/server-version.js new file mode 100644 index 000000000..1c25d71fc --- /dev/null +++ b/src/v1/internal/server-version.js @@ -0,0 +1,103 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {assertString} from './util'; + +const SERVER_VERSION_REGEX = new RegExp('^(Neo4j/)?(\\d+)\\.(\\d+)(?:\\.)?(\\d*)(\\.|-|\\+)?([0-9A-Za-z-.]*)?$'); + +class ServerVersion { + + /** + * @constructor + * @param {number} major the major version number. + * @param {number} minor the minor version number. + * @param {number} patch the patch version number. + */ + constructor(major, minor, patch) { + this.major = major; + this.minor = minor; + this.patch = patch; + } + + /** + * Parse given string to a {@link ServerVersion} object. + * @param versionStr the string to parse. + * @return {ServerVersion} version for the given string. + * @throws Error if given string can't be parsed. + */ + static fromString(versionStr) { + if (!versionStr) { + return new ServerVersion(3, 0, 0); + } + + assertString(versionStr, 'Neo4j version string'); + + const version = versionStr.match(SERVER_VERSION_REGEX); + if (!version) { + throw new Error(`Unparsable Neo4j version: ${versionStr}`); + } + + const major = parseIntStrict(version[2]); + const minor = parseIntStrict(version[3]); + const patch = parseIntStrict(version[4] || 0); + + return new ServerVersion(major, minor, patch); + } + + /** + * Compare this version to the given one. + * @param {ServerVersion} other the version to compare with. + * @return {number} value 0 if this version is the same as the given one, value less then 0 when this version + * was released earlier than the given one and value greater then 0 when this version was released after + * than the given one. + */ + compareTo(other) { + let result = compareInts(this.major, other.major); + if (result === 0) { + result = compareInts(this.minor, other.minor); + if (result === 0) { + result = compareInts(this.patch, other.patch); + } + } + return result; + } +} + +function parseIntStrict(str, name) { + const value = parseInt(str); + if (!value && value !== 0) { + throw new Error(`Unparsable number ${name}: '${str}'`); + } + return value; +} + +function compareInts(x, y) { + return (x < y) ? -1 : ((x === y) ? 0 : 1); +} + +const VERSION_3_2_0 = new ServerVersion(3, 2, 0); + +export { + ServerVersion, + VERSION_3_2_0 +}; + + + + diff --git a/src/v1/internal/util.js b/src/v1/internal/util.js index 0e7efdc89..e7fd5c66e 100644 --- a/src/v1/internal/util.js +++ b/src/v1/internal/util.js @@ -20,8 +20,16 @@ const ENCRYPTION_ON = "ENCRYPTION_ON"; const ENCRYPTION_OFF = "ENCRYPTION_OFF"; +const URL_REGEX = new RegExp([ + '([^/]+//)?', // scheme + '(([^:/?#]*)', // hostname + '(?::([0-9]+))?)', // port (optional) + '([^?]*)?', // everything else + '(\\?(.+))?' // query +].join('')); + function isEmptyObjectOrNull(obj) { - if (isNull(obj)) { + if (obj === null) { return true; } @@ -38,10 +46,6 @@ function isEmptyObjectOrNull(obj) { return true; } -function isNull(obj) { - return obj === null; -} - function isObject(obj) { const type = typeof obj; return type === 'function' || type === 'object' && Boolean(obj); @@ -58,9 +62,66 @@ function isString(str) { return Object.prototype.toString.call(str) === '[object String]'; } +function parseScheme(url) { + assertString(url, 'URL'); + const scheme = url.match(URL_REGEX)[1] || ''; + return scheme.toLowerCase(); +} + +function parseUrl(url) { + assertString(url, 'URL'); + return url.match(URL_REGEX)[2]; +} + +function parseHost(url) { + assertString(url, 'URL'); + return url.match(URL_REGEX)[3]; +} + +function parsePort(url) { + assertString(url, 'URL'); + return url.match(URL_REGEX)[4]; +} + +function parseRoutingContext(url) { + const query = url.match(URL_REGEX)[7] || ''; + const context = {}; + if (query) { + query.split('&').forEach(pair => { + const keyValue = pair.split('='); + if (keyValue.length !== 2) { + throw new Error('Invalid parameters: \'' + keyValue + '\' in URL \'' + url + '\'.'); + } + + const key = trimAndVerify(keyValue[0], 'key', url); + const value = trimAndVerify(keyValue[1], 'value', url); + + if (context[key]) { + throw new Error(`Duplicated query parameters with key '${key}' in URL '${url}'`); + } + + context[key] = value; + }); + } + return context; +} + +function trimAndVerify(string, name, url) { + const result = string.trim(); + if (!result) { + throw new Error(`Illegal empty ${name} in URL query '${url}'`); + } + return result; +} + export { isEmptyObjectOrNull, assertString, + parseScheme, + parseUrl, + parseHost, + parsePort, + parseRoutingContext, ENCRYPTION_ON, ENCRYPTION_OFF } diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index adc4548b5..3e895cf90 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -27,12 +27,13 @@ import {LoadBalancer} from './internal/connection-providers'; */ class RoutingDriver extends Driver { - constructor(url, userAgent, token = {}, config = {}) { + constructor(url, routingContext, userAgent, token = {}, config = {}) { super(url, userAgent, token, RoutingDriver._validateConfig(config)); + this._routingContext = routingContext; } _createConnectionProvider(address, connectionPool, driverOnErrorCallback) { - return new LoadBalancer(address, connectionPool, driverOnErrorCallback); + return new LoadBalancer(address, this._routingContext, connectionPool, driverOnErrorCallback); } _createSession(mode, connectionProvider, bookmark, config) { diff --git a/src/v1/session.js b/src/v1/session.js index 1dccd907f..b87fdc1cc 100644 --- a/src/v1/session.js +++ b/src/v1/session.js @@ -59,28 +59,34 @@ class Session { * @return {Result} - New Result */ run(statement, parameters = {}) { - if(typeof statement === 'object' && statement.text) { + if (typeof statement === 'object' && statement.text) { parameters = statement.parameters || {}; statement = statement.text; } - assertString(statement, "Cypher statement"); + assertString(statement, 'Cypher statement'); + return this._run(statement, parameters, (connection, streamObserver) => + connection.run(statement, parameters, streamObserver) + ); + } + + _run(statement, parameters, statementRunner) { const streamObserver = new _RunObserver(this._onRunFailure()); const connectionHolder = this._connectionHolderWithMode(this._mode); if (!this._hasTx) { connectionHolder.initializeConnection(); connectionHolder.getConnection().then(connection => { streamObserver.resolveConnection(connection); - connection.run(statement, parameters, streamObserver); + statementRunner(connection, streamObserver); connection.pullAll(streamObserver); connection.sync(); }).catch(error => streamObserver.onError(error)); } else { - streamObserver.onError(newError("Statements cannot be run directly on a " - + "session with an open transaction; either run from within the " - + "transaction or use a different session.")); + streamObserver.onError(newError('Statements cannot be run directly on a ' + + 'session with an open transaction; either run from within the ' + + 'transaction or use a different session.')); } - return new Result( streamObserver, statement, parameters, () => streamObserver.meta(), connectionHolder ); + return new Result(streamObserver, statement, parameters, () => streamObserver.meta(), connectionHolder); } /** diff --git a/test/internal/connection-providers.test.js b/test/internal/connection-providers.test.js index 5a53812a8..c7b35d33b 100644 --- a/test/internal/connection-providers.test.js +++ b/test/internal/connection-providers.test.js @@ -135,7 +135,7 @@ describe('LoadBalancer', () => { }); it('initializes routing table with the given router', () => { - const loadBalancer = new LoadBalancer('server-ABC', newPool(), NO_OP_DRIVER_CALLBACK); + const loadBalancer = new LoadBalancer('server-ABC', {}, newPool(), NO_OP_DRIVER_CALLBACK); expectRoutingTable(loadBalancer, ['server-ABC'], @@ -1040,7 +1040,7 @@ function newLoadBalancerWithSeedRouter(seedRouter, seedRouterResolved, expirationTime = Integer.MAX_VALUE, routerToRoutingTable = {}, connectionPool = null) { - const loadBalancer = new LoadBalancer(seedRouter, connectionPool || newPool(), NO_OP_DRIVER_CALLBACK); + const loadBalancer = new LoadBalancer(seedRouter, {}, connectionPool || newPool(), NO_OP_DRIVER_CALLBACK); loadBalancer._routingTable = new RoutingTable( new RoundRobinArray(routers), new RoundRobinArray(readers), @@ -1102,6 +1102,10 @@ class FakeConnection { static create(address, release) { return new FakeConnection(address, release); } + + initializationCompleted() { + return Promise.resolve(this); + } } class FakeRediscovery { diff --git a/test/internal/connector.test.js b/test/internal/connector.test.js index 3610a0b56..ba29ec2c4 100644 --- a/test/internal/connector.test.js +++ b/test/internal/connector.test.js @@ -24,6 +24,7 @@ import {Chunker} from '../../src/v1/internal/chunking'; import {alloc} from '../../src/v1/internal/buf'; import {Neo4jError} from '../../src/v1/error'; import sharedNeo4j from '../internal/shared-neo4j'; +import {ServerVersion} from '../../src/v1/internal/server-version'; describe('connector', () => { @@ -117,6 +118,64 @@ describe('connector', () => { channel.onmessage(packedFailureMessage(errorCode, errorMessage)); }); + it('should notify when connection initialization completes', done => { + const connection = connect('bolt://localhost'); + + connection.initializationCompleted().then(initializedConnection => { + expect(initializedConnection).toBe(connection); + done(); + }); + + connection.initialize('mydriver/0.0.0', basicAuthToken()); + }); + + it('should notify when connection initialization fails', done => { + const connection = connect('bolt://localhost:7474'); // wrong port + + connection.initializationCompleted().then(() => { + console.log('THEN called: ', arguments) + }).catch(error => { + expect(error).toBeDefined(); + done(); + }); + + connection.initialize('mydriver/0.0.0', basicAuthToken()); + }); + + it('should notify provided observer when connection initialization completes', done => { + const connection = connect('bolt://localhost'); + + connection.initialize('mydriver/0.0.0', basicAuthToken(), { + onCompleted: metaData => { + expect(metaData).toBeDefined(); + done(); + }, + }); + }); + + it('should notify provided observer when connection initialization fails', done => { + const connection = connect('bolt://localhost:7474'); // wrong port + + connection.initialize('mydriver/0.0.0', basicAuthToken(), { + onError: error => { + expect(error).toBeDefined(); + done(); + }, + }); + }); + + it('should have server version after connection initialization completed', done => { + const connection = connect('bolt://localhost'); + + connection.initializationCompleted().then(initializedConnection => { + const serverVersion = ServerVersion.fromString(initializedConnection.server.version); + expect(serverVersion).toBeDefined(); + done(); + }); + + connection.initialize('mydriver/0.0.0', basicAuthToken()); + }); + function packedHandshakeMessage() { const result = alloc(4); result.putInt32(0, 1); diff --git a/test/internal/fake-connection.js b/test/internal/fake-connection.js index 60dd3c11e..c2b773d1d 100644 --- a/test/internal/fake-connection.js +++ b/test/internal/fake-connection.js @@ -31,9 +31,14 @@ export default class FakeConnection { this.resetAsyncInvoked = 0; this.syncInvoked = 0; this.releaseInvoked = 0; + this.seenStatements = []; + this.seenParameters = []; + this.server = {}; } - run() { + run(statement, parameters) { + this.seenStatements.push(statement); + this.seenParameters.push(parameters); } discardAll() { @@ -55,6 +60,10 @@ export default class FakeConnection { this.releaseInvoked++; } + initializationCompleted() { + return Promise.resolve(this); + } + isReleasedOnceOnSessionClose() { return this.isReleasedOnSessionCloseTimes(1); } @@ -80,4 +89,9 @@ export default class FakeConnection { this.syncInvoked === times && this.releaseInvoked === times; } + + withServerVersion(version) { + this.server.version = version; + return this; + } }; diff --git a/test/internal/host-name-resolvers.test.js b/test/internal/host-name-resolvers.test.js index 2ca2a8149..c798a6754 100644 --- a/test/internal/host-name-resolvers.test.js +++ b/test/internal/host-name-resolvers.test.js @@ -19,7 +19,7 @@ import {DnsHostNameResolver, DummyHostNameResolver} from '../../src/v1/internal/host-name-resolvers'; import hasFeature from '../../src/v1/internal/features'; -import {parseHost, parsePort, parseScheme} from '../../src/v1/internal/connector'; +import {parseHost, parsePort, parseScheme} from '../../src/v1/internal/util'; describe('DummyHostNameResolver', () => { diff --git a/test/internal/rediscovery.test.js b/test/internal/rediscovery.test.js index b23816207..cda4cec6d 100644 --- a/test/internal/rediscovery.test.js +++ b/test/internal/rediscovery.test.js @@ -18,7 +18,7 @@ */ import Rediscovery from "../../src/v1/internal/rediscovery"; -import GetServersUtil from "../../src/v1/internal/get-servers-util"; +import RoutingUtil from "../../src/v1/internal/routing-util"; import {newError, PROTOCOL_ERROR} from "../../src/v1/error"; import Record from "../../src/v1/record"; import {int} from "../../src/v1/integer"; @@ -30,8 +30,8 @@ const ROUTER_ADDRESS = 'bolt+routing://test.router.com'; describe('rediscovery', () => { it('should return null when connection error happens', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => null, + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => null, }); lookupRoutingTableOnRouter(util).then(routingTable => { @@ -41,8 +41,8 @@ describe('rediscovery', () => { }); it('should throw when no records are returned', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [], }); lookupRoutingTableOnRouter(util).catch(error => { @@ -52,8 +52,8 @@ describe('rediscovery', () => { }); it('should throw when multiple records are returned', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa']), new Record(['b'], ['bbb'])] + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa']), new Record(['b'], ['bbb'])] }); lookupRoutingTableOnRouter(util).catch(error => { @@ -63,8 +63,8 @@ describe('rediscovery', () => { }); it('should throw when ttl parsing throws', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa'])], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa'])], parseTtl: () => { throw newError('Unable to parse TTL', PROTOCOL_ERROR); } @@ -77,8 +77,8 @@ describe('rediscovery', () => { }); it('should throw when servers parsing throws', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa'])], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa'])], parseTtl: () => int(42), parseServers: () => { throw newError('Unable to parse servers', PROTOCOL_ERROR); @@ -92,8 +92,8 @@ describe('rediscovery', () => { }); it('should throw when no routers', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa'])], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa'])], parseTtl: () => int(42), parseServers: () => { return { @@ -111,8 +111,8 @@ describe('rediscovery', () => { }); it('should throw when no readers', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa'])], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa'])], parseTtl: () => int(42), parseServers: () => { return { @@ -130,8 +130,8 @@ describe('rediscovery', () => { }); it('should return routing table when no writers', done => { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa'])], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa'])], parseTtl: () => int(42), parseServers: () => { return { @@ -162,8 +162,8 @@ describe('rediscovery', () => { }); function testValidRoutingTable(routerAddresses, readerAddresses, writerAddresses, expires, done) { - const util = new FakeGetServersUtil({ - callGetServers: () => [new Record(['a'], ['aaa'])], + const util = new FakeRoutingUtil({ + callRoutingProcedure: () => [new Record(['a'], ['aaa'])], parseTtl: () => expires, parseServers: () => { return { @@ -188,8 +188,8 @@ describe('rediscovery', () => { }); } - function lookupRoutingTableOnRouter(getServersUtil) { - const rediscovery = new Rediscovery(getServersUtil); + function lookupRoutingTableOnRouter(routingUtil) { + const rediscovery = new Rediscovery(routingUtil); return rediscovery.lookupRoutingTableOnRouter(null, ROUTER_ADDRESS); } @@ -202,19 +202,19 @@ describe('rediscovery', () => { throw new Error('Should not be called'); } - class FakeGetServersUtil extends GetServersUtil { + class FakeRoutingUtil extends RoutingUtil { - constructor({callGetServers = shouldNotBeCalled, parseTtl = shouldNotBeCalled, parseServers = shouldNotBeCalled}) { + constructor({callRoutingProcedure = shouldNotBeCalled, parseTtl = shouldNotBeCalled, parseServers = shouldNotBeCalled}) { super(); - this._callGetServers = callGetServers; + this._callAvailableRoutingProcedure = callRoutingProcedure; this._parseTtl = parseTtl; this._parseServers = parseServers; } - callGetServers(session, routerAddress) { + callRoutingProcedure(session, routerAddress) { return new Promise((resolve, reject) => { try { - resolve(this._callGetServers()); + resolve(this._callAvailableRoutingProcedure()); } catch (error) { reject(error); } diff --git a/test/internal/get-servers-util.test.js b/test/internal/routing-util.test.js similarity index 67% rename from test/internal/get-servers-util.test.js rename to test/internal/routing-util.test.js index 95855cf5a..6229e10b5 100644 --- a/test/internal/get-servers-util.test.js +++ b/test/internal/routing-util.test.js @@ -17,11 +17,12 @@ * limitations under the License. */ -import GetServersUtil from "../../src/v1/internal/get-servers-util"; -import Record from "../../src/v1/record"; -import Integer, {int} from "../../src/v1/integer"; -import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from "../../src/v1/error"; -import lolex from "lolex"; +import RoutingUtil from '../../src/v1/internal/routing-util'; +import Record from '../../src/v1/record'; +import Integer, {int} from '../../src/v1/integer'; +import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../../src/v1/error'; +import lolex from 'lolex'; +import FakeConnection from './fake-connection'; const ROUTER_ADDRESS = 'bolt+routing://test.router.com'; @@ -40,7 +41,7 @@ describe('get-servers-util', () => { it('should return retrieved records when query succeeds', done => { const session = FakeSession.successful({records: ['foo', 'bar', 'baz']}); - callGetServers(session).then(records => { + callRoutingProcedure(session).then(records => { expect(records).toEqual(['foo', 'bar', 'baz']); done(); }).catch(console.log); @@ -49,7 +50,7 @@ describe('get-servers-util', () => { it('should close session when query succeeds', done => { const session = FakeSession.successful({records: ['foo', 'bar', 'baz']}); - callGetServers(session).then(() => { + callRoutingProcedure(session).then(() => { expect(session.isClosed()).toBeTruthy(); done(); }).catch(console.log); @@ -58,7 +59,7 @@ describe('get-servers-util', () => { it('should not close session when query fails', done => { const session = FakeSession.failed(newError('Oh no!', SESSION_EXPIRED)); - callGetServers(session).then(() => { + callRoutingProcedure(session).then(() => { expect(session.isClosed()).toBeFalsy(); done(); }).catch(console.log); @@ -67,7 +68,7 @@ describe('get-servers-util', () => { it('should return null on connection error', done => { const session = FakeSession.failed(newError('Oh no!', SESSION_EXPIRED)); - callGetServers(session).then(records => { + callRoutingProcedure(session).then(records => { expect(records).toBeNull(); done(); }).catch(console.log); @@ -76,7 +77,7 @@ describe('get-servers-util', () => { it('should fail when procedure not found', done => { const session = FakeSession.failed(newError('Oh no!', 'Neo.ClientError.Procedure.ProcedureNotFound')); - callGetServers(session).catch(error => { + callRoutingProcedure(session).catch(error => { expect(error.code).toBe(SERVICE_UNAVAILABLE); expect(error.message).toBe('Server ' + ROUTER_ADDRESS + ' could not perform routing. ' + 'Make sure you are connecting to a causal cluster'); @@ -84,6 +85,61 @@ describe('get-servers-util', () => { }); }); + it('should use getServers procedure when server version is older than 3.2.0', done => { + const connection = new FakeConnection().withServerVersion('Neo4j/3.1.9'); + const session = FakeSession.withFakeConnection(connection); + + callRoutingProcedure(session, {}).then(() => { + expect(connection.seenStatements).toEqual(['CALL dbms.cluster.routing.getServers']); + expect(connection.seenParameters).toEqual([{}]); + done(); + }); + }); + + it('should use getRoutingTable procedure with empty routing context when server version is 3.2.0', done => { + const connection = new FakeConnection().withServerVersion('Neo4j/3.2.0'); + const session = FakeSession.withFakeConnection(connection); + + callRoutingProcedure(session, {}).then(() => { + expect(connection.seenStatements).toEqual(['CALL dbms.cluster.routing.getRoutingTable({context})']); + expect(connection.seenParameters).toEqual([{context: {}}]); + done(); + }); + }); + + it('should use getRoutingTable procedure with routing context when server version is 3.2.0', done => { + const connection = new FakeConnection().withServerVersion('Neo4j/3.2.0'); + const session = FakeSession.withFakeConnection(connection); + + callRoutingProcedure(session, {key1: 'value1', key2: 'value2'}).then(() => { + expect(connection.seenStatements).toEqual(['CALL dbms.cluster.routing.getRoutingTable({context})']); + expect(connection.seenParameters).toEqual([{context: {key1: 'value1', key2: 'value2'}}]); + done(); + }); + }); + + it('should use getRoutingTable procedure with empty routing context when server version is newer than 3.2.0', done => { + const connection = new FakeConnection().withServerVersion('Neo4j/3.3.5'); + const session = FakeSession.withFakeConnection(connection); + + callRoutingProcedure(session, {}).then(() => { + expect(connection.seenStatements).toEqual(['CALL dbms.cluster.routing.getRoutingTable({context})']); + expect(connection.seenParameters).toEqual([{context: {}}]); + done(); + }); + }); + + it('should use getRoutingTable procedure with routing context when server version is newer than 3.2.0', done => { + const connection = new FakeConnection().withServerVersion('Neo4j/3.2.8'); + const session = FakeSession.withFakeConnection(connection); + + callRoutingProcedure(session, {key1: 'foo', key2: 'bar'}).then(() => { + expect(connection.seenStatements).toEqual(['CALL dbms.cluster.routing.getRoutingTable({context})']); + expect(connection.seenParameters).toEqual([{context: {key1: 'foo', key2: 'bar'}}]); + done(); + }); + }); + it('should parse valid ttl', () => { testValidTtlParsing(100, 5); testValidTtlParsing(Date.now(), 3600); // 1 hour @@ -197,18 +253,18 @@ describe('get-servers-util', () => { expect(writers.toArray()).toEqual(writerAddresses); } - function callGetServers(session) { - const util = new GetServersUtil(); - return util.callGetServers(session, ROUTER_ADDRESS); + function callRoutingProcedure(session, routingContext) { + const util = new RoutingUtil(routingContext || {}); + return util.callRoutingProcedure(session, ROUTER_ADDRESS); } function parseTtl(record) { - const util = new GetServersUtil(); + const util = new RoutingUtil(); return util.parseTtl(record, ROUTER_ADDRESS); } function parseServers(record) { - const util = new GetServersUtil(); + const util = new RoutingUtil(); return util.parseServers(record, ROUTER_ADDRESS); } @@ -245,21 +301,30 @@ describe('get-servers-util', () => { class FakeSession { - constructor(runResponse) { + constructor(runResponse, fakeConnection) { this._runResponse = runResponse; + this._fakeConnection = fakeConnection; this._closed = false; } static successful(result) { - return new FakeSession(Promise.resolve(result)); + return new FakeSession(Promise.resolve(result), null); } static failed(error) { - return new FakeSession(Promise.reject(error)); + return new FakeSession(Promise.reject(error), null); } - run() { - return this._runResponse; + static withFakeConnection(connection) { + return new FakeSession(null, connection); + } + + _run(ignoreStatement, ignoreParameters, statementRunner) { + if (this._runResponse) { + return this._runResponse; + } + statementRunner(this._fakeConnection); + return Promise.resolve(); } close() { diff --git a/test/internal/server-version.test.js b/test/internal/server-version.test.js new file mode 100644 index 000000000..72141e4f9 --- /dev/null +++ b/test/internal/server-version.test.js @@ -0,0 +1,118 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import {ServerVersion, VERSION_3_2_0} from '../../src/v1/internal/server-version'; + +describe('ServerVersion', () => { + + it('should construct with correct values', () => { + verifyVersion(new ServerVersion(2, 3, 10), 2, 3, 10); + verifyVersion(new ServerVersion(3, 2, 0), 3, 2, 0); + verifyVersion(new ServerVersion(1, 9, 12), 1, 9, 12); + }); + + it('should define correct 3.2.0 constant', () => { + verifyVersion(VERSION_3_2_0, 3, 2, 0); + }); + + it('should parse "undefined" strings to 3.0.0 for backwards compatibility reasons', () => { + verifyVersion(parse(null), 3, 0, 0); + verifyVersion(parse(undefined), 3, 0, 0); + verifyVersion(parse(''), 3, 0, 0); + }); + + it('should fail to parse object, array and function', () => { + expect(() => parse({})).toThrowError(TypeError); + expect(() => parse([])).toThrowError(TypeError); + expect(() => parse(() => { + })).toThrowError(TypeError); + }); + + it('should fail to parse illegal strings', () => { + expect(() => parse('Cat')).toThrow(); + expect(() => parse('Dog')).toThrow(); + expect(() => parse('Neo4j')).toThrow(); + expect(() => parse('Neo4j/')).toThrow(); + expect(() => parse('Not-Neo4j/3.1.0')).toThrow(); + expect(() => parse('Neo4j/5')).toThrow(); + expect(() => parse('Neo4j/3.')).toThrow(); + expect(() => parse('Neo4j/1.A')).toThrow(); + expect(() => parse('Neo4j/1.Hello')).toThrow(); + }); + + it('should parse valid version strings', () => { + verifyVersion(parse('Neo4j/3.2.1'), 3, 2, 1); + verifyVersion(parse('Neo4j/1.9.10'), 1, 9, 10); + verifyVersion(parse('Neo4j/7.77.777'), 7, 77, 777); + + verifyVersion(parse('Neo4j/3.10.0-GA'), 3, 10, 0); + verifyVersion(parse('Neo4j/2.5.5-RC01'), 2, 5, 5); + verifyVersion(parse('Neo4j/3.1.1-SNAPSHOT'), 3, 1, 1); + verifyVersion(parse('Neo4j/2.0.0-M09'), 2, 0, 0); + + verifyVersion(parse('Neo4j/3.2'), 3, 2, 0); + verifyVersion(parse('Neo4j/1.5'), 1, 5, 0); + verifyVersion(parse('Neo4j/42.42'), 42, 42, 0); + + verifyVersion(parse('Neo4j/2.2-GA'), 2, 2, 0); + verifyVersion(parse('Neo4j/3.0-RC01'), 3, 0, 0); + verifyVersion(parse('Neo4j/2.3-SNAPSHOT'), 2, 3, 0); + verifyVersion(parse('Neo4j/2.2-M09'), 2, 2, 0); + }); + + it('should compare equal versions', () => { + expect(new ServerVersion(3, 1, 0).compareTo(new ServerVersion(3, 1, 0))).toEqual(0); + expect(new ServerVersion(1, 9, 12).compareTo(new ServerVersion(1, 9, 12))).toEqual(0); + expect(new ServerVersion(2, 3, 8).compareTo(new ServerVersion(2, 3, 8))).toEqual(0); + }); + + it('should compare correctly by major', () => { + expect(new ServerVersion(3, 1, 0).compareTo(new ServerVersion(2, 1, 0))).toBeGreaterThan(0); + expect(new ServerVersion(2, 1, 0).compareTo(new ServerVersion(3, 1, 0))).toBeLessThan(0); + + expect(new ServerVersion(3, 1, 4).compareTo(new ServerVersion(1, 9, 10))).toBeGreaterThan(0); + expect(new ServerVersion(1, 5, 42).compareTo(new ServerVersion(10, 10, 43))).toBeLessThan(0); + }); + + it('should compare correctly by minor', () => { + expect(new ServerVersion(3, 3, 0).compareTo(new ServerVersion(3, 1, 0))).toBeGreaterThan(0); + expect(new ServerVersion(1, 3, 5).compareTo(new ServerVersion(1, 9, 5))).toBeLessThan(0); + + expect(new ServerVersion(3, 9, 5).compareTo(new ServerVersion(3, 1, 2))).toBeGreaterThan(0); + expect(new ServerVersion(1, 5, 42).compareTo(new ServerVersion(1, 10, 11))).toBeLessThan(0); + }); + + it('should compare correctly by patch', () => { + expect(new ServerVersion(3, 3, 6).compareTo(new ServerVersion(3, 3, 5))).toBeGreaterThan(0); + expect(new ServerVersion(1, 8, 2).compareTo(new ServerVersion(1, 8, 8))).toBeLessThan(0); + expect(new ServerVersion(9, 9, 9).compareTo(new ServerVersion(9, 9, 0))).toBeGreaterThan(0); + expect(new ServerVersion(3, 3, 3).compareTo(new ServerVersion(3, 3, 42))).toBeLessThan(0); + }); + +}); + +function verifyVersion(serverVersion, expectedMajor, expectedMinor, expectedPatch) { + expect(serverVersion.major).toEqual(expectedMajor); + expect(serverVersion.minor).toEqual(expectedMinor); + expect(serverVersion.patch).toEqual(expectedPatch); +} + +function parse(string) { + return ServerVersion.fromString(string); +} diff --git a/test/internal/util.test.js b/test/internal/util.test.js index bc557e107..907ad3421 100644 --- a/test/internal/util.test.js +++ b/test/internal/util.test.js @@ -17,7 +17,7 @@ * limitations under the License. */ -const util = require('../../lib/v1/internal/util.js'); +import * as util from '../../src/v1/internal/util'; describe('util', () => { @@ -55,6 +55,112 @@ describe('util', () => { verifyInvalidString(console.log); }); + it('should parse scheme', () => { + verifyScheme('bolt://', 'bolt://localhost'); + verifyScheme('bolt://', 'bolt://localhost:7687'); + verifyScheme('bolt://', 'bolt://neo4j.com'); + verifyScheme('bolt://', 'bolt://neo4j.com:80'); + + verifyScheme('bolt+routing://', 'bolt+routing://127.0.0.1'); + verifyScheme('bolt+routing://', 'bolt+routing://127.0.0.1:7687'); + verifyScheme('bolt+routing://', 'bolt+routing://neo4j.com'); + verifyScheme('bolt+routing://', 'bolt+routing://neo4j.com:80'); + + verifyScheme('wss://', 'wss://server.com'); + verifyScheme('wss://', 'wss://server.com:7687'); + verifyScheme('wss://', 'wss://1.1.1.1'); + verifyScheme('wss://', 'wss://8.8.8.8:80'); + + verifyScheme('', 'invalid url'); + verifyScheme('', 'localhost:7676'); + verifyScheme('', '127.0.0.1'); + }); + + it('should fail to parse scheme from non-string argument', () => { + expect(() => util.parseScheme({})).toThrowError(TypeError); + expect(() => util.parseScheme(['bolt://localhost:2020'])).toThrowError(TypeError); + expect(() => util.parseScheme(() => 'bolt://localhost:8888')).toThrowError(TypeError); + }); + + it('should parse url', () => { + verifyUrl('localhost', 'bolt://localhost'); + verifyUrl('localhost:9090', 'bolt://localhost:9090'); + verifyUrl('127.0.0.1', 'bolt://127.0.0.1'); + verifyUrl('127.0.0.1:7687', 'bolt://127.0.0.1:7687'); + verifyUrl('10.198.20.1', 'bolt+routing://10.198.20.1'); + verifyUrl('15.8.8.9:20004', 'wss://15.8.8.9:20004'); + }); + + it('should fail to parse url from non-string argument', () => { + expect(() => util.parseUrl({})).toThrowError(TypeError); + expect(() => util.parseUrl(['bolt://localhost:2020'])).toThrowError(TypeError); + expect(() => util.parseUrl(() => 'bolt://localhost:8888')).toThrowError(TypeError); + }); + + it('should parse host', () => { + verifyHost('localhost', 'bolt://localhost'); + verifyHost('neo4j.com', 'bolt+routing://neo4j.com'); + verifyHost('neo4j.com', 'bolt+routing://neo4j.com:8080'); + verifyHost('127.0.0.1', 'https://127.0.0.1'); + verifyHost('127.0.0.1', 'ws://127.0.0.1:2020'); + }); + + it('should fail to parse host from non-string argument', () => { + expect(() => util.parseHost({})).toThrowError(TypeError); + expect(() => util.parseHost(['bolt://localhost:2020'])).toThrowError(TypeError); + expect(() => util.parseHost(() => 'bolt://localhost:8888')).toThrowError(TypeError); + }); + + it('should parse port', () => { + verifyPort('7474', 'http://localhost:7474'); + verifyPort('8080', 'http://127.0.0.1:8080'); + verifyPort('20005', 'bolt+routing://neo4j.com:20005'); + verifyPort('4242', 'bolt+routing://1.1.1.1:4242'); + verifyPort('42', 'http://10.192.168.5:42'); + + verifyPort(undefined, 'https://localhost'); + verifyPort(undefined, 'ws://8.8.8.8'); + }); + + it('should fail to parse port from non-string argument', () => { + expect(() => util.parsePort({port: 1515})).toThrowError(TypeError); + expect(() => util.parsePort(['bolt://localhost:2020'])).toThrowError(TypeError); + expect(() => util.parsePort(() => 'bolt://localhost:8888')).toThrowError(TypeError); + }); + + it('should parse routing context', () => { + verifyRoutingContext({ + name: 'molly', + age: '1', + color: 'white' + }, 'bolt+routing://localhost:7687/cat?name=molly&age=1&color=white'); + + verifyRoutingContext({ + key1: 'value1', + key2: 'value2' + }, 'bolt+routing://localhost:7687/?key1=value1&key2=value2'); + + verifyRoutingContext({key: 'value'}, 'bolt+routing://10.198.12.2:9999?key=value'); + + verifyRoutingContext({}, 'bolt+routing://localhost:7687?'); + verifyRoutingContext({}, 'bolt+routing://localhost:7687/?'); + verifyRoutingContext({}, 'bolt+routing://localhost:7687/cat?'); + verifyRoutingContext({}, 'bolt+routing://localhost:7687/lala'); + }); + + it('should fail to parse routing context from non-string argument', () => { + expect(() => util.parseRoutingContext({key1: 'value1'})).toThrowError(TypeError); + expect(() => util.parseRoutingContext(['bolt://localhost:2020/?key=value'])).toThrowError(TypeError); + expect(() => util.parseRoutingContext(() => 'bolt://localhost?key1=value&key2=value2')).toThrowError(TypeError); + }); + + it('should fail to parse routing context from illegal parameters', () => { + expect(() => util.parseRoutingContext('bolt+routing://localhost:7687/?justKey')).toThrow(); + expect(() => util.parseRoutingContext('bolt+routing://localhost:7687/?=value1&key2=value2')).toThrow(); + expect(() => util.parseRoutingContext('bolt+routing://localhost:7687/key1?=value1&key2=')).toThrow(); + expect(() => util.parseRoutingContext('bolt+routing://localhost:7687/?key1=value1&key2=value2&key1=value2')).toThrow(); + }); + function verifyValidString(str) { expect(util.assertString(str, 'Test string')).toBe(str); } @@ -63,4 +169,24 @@ describe('util', () => { expect(() => util.assertString(str, 'Test string')).toThrowError(TypeError); } + function verifyScheme(expectedScheme, url) { + expect(util.parseScheme(url)).toEqual(expectedScheme); + } + + function verifyUrl(expectedUrl, url) { + expect(util.parseUrl(url)).toEqual(expectedUrl); + } + + function verifyHost(expectedHost, url) { + expect(util.parseHost(url)).toEqual(expectedHost); + } + + function verifyPort(expectedPort, url) { + expect(util.parsePort(url)).toEqual(expectedPort); + } + + function verifyRoutingContext(expectedRoutingContext, url) { + expect(util.parseRoutingContext(url)).toEqual(expectedRoutingContext); + } + }); diff --git a/test/resources/boltkit/get_routing_table.script b/test/resources/boltkit/get_routing_table.script new file mode 100644 index 000000000..2acb2013a --- /dev/null +++ b/test/resources/boltkit/get_routing_table.script @@ -0,0 +1,17 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +S: SUCCESS {"server": "Neo4j/3.2.2"} +C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"], "role": "READ"},{"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"], "role": "ROUTE"}]] + SUCCESS {} +C: RUN "MATCH (n) RETURN n.name AS name" {} + PULL_ALL +S: SUCCESS {"fields": ["name"]} + RECORD ["Alice"] + RECORD ["Bob"] + RECORD ["Eve"] + SUCCESS {} diff --git a/test/resources/boltkit/get_routing_table_with_context.script b/test/resources/boltkit/get_routing_table_with_context.script new file mode 100644 index 000000000..56c268fa5 --- /dev/null +++ b/test/resources/boltkit/get_routing_table_with_context.script @@ -0,0 +1,16 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +S: SUCCESS {"server": "Neo4j/3.2.3"} +C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {"policy": "my_policy", "region": "china"}} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"], "role": "READ"},{"addresses": ["127.0.0.1:9001", "127.0.0.1:9002"], "role": "ROUTE"}]] + SUCCESS {} +C: RUN "MATCH (n) RETURN n.name AS name" {} + PULL_ALL +S: SUCCESS {"fields": ["name"]} + RECORD ["Alice"] + RECORD ["Bob"] + SUCCESS {} diff --git a/test/resources/boltkit/rediscover_and_read_with_init.script b/test/resources/boltkit/rediscover_and_read_with_init.script new file mode 100644 index 000000000..c2ce54c6f --- /dev/null +++ b/test/resources/boltkit/rediscover_and_read_with_init.script @@ -0,0 +1,16 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +S: SUCCESS {"server": "Neo4j/3.1.0"} +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001","127.0.0.1:9002"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9001","127.0.0.1:9002"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002"], "role": "ROUTE"}]] + SUCCESS {} +C: RUN "MATCH (n) RETURN n.name" {} + PULL_ALL +S: SUCCESS {"fields": ["n.name"]} + RECORD ["Bob"] + RECORD ["Tina"] + SUCCESS {} diff --git a/test/v1/driver.test.js b/test/v1/driver.test.js index 6795af7cf..3ef9dc8f3 100644 --- a/test/v1/driver.test.js +++ b/test/v1/driver.test.js @@ -181,6 +181,10 @@ describe('driver', () => { expect(createRoutingDriverWithTOFU).toThrow(); }); + it('should fail when bolt:// scheme used with routing params', () => { + expect(() => neo4j.driver('bolt://localhost:7687/?policy=my_policy')).toThrow(); + }); + const exposedTypes = [ 'Node', 'Path', diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index a16892db8..bb627fd8a 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -1549,6 +1549,87 @@ describe('routing driver', () => { }); }); + it('should invoke procedure get routing table when server version permits', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/get_routing_table.script', 9001); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001'); + const session = driver.session(); + session.run('MATCH (n) RETURN n.name AS name').then(result => { + const names = result.records.map(record => record.get('name')); + expect(names).toEqual(['Alice', 'Bob', 'Eve']); + + session.close(() => { + driver.close(); + router.exit(code => { + expect(code).toEqual(0); + done(); + }); + }); + }); + }); + }); + + it('should send routing context to server', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/get_routing_table_with_context.script', 9001); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001/?policy=my_policy®ion=china'); + const session = driver.session(); + session.run('MATCH (n) RETURN n.name AS name').then(result => { + const names = result.records.map(record => record.get('name')); + expect(names).toEqual(['Alice', 'Bob']); + + session.close(() => { + driver.close(); + router.exit(code => { + expect(code).toEqual(0); + done(); + }); + }); + }); + }); + }); + + it('should ignore routing context when server does not support it', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const router = kit.start('./test/resources/boltkit/rediscover_and_read_with_init.script', 9001); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001/?policy=my_policy'); + const session = driver.session(); + session.run('MATCH (n) RETURN n.name').then(result => { + const names = result.records.map(record => record.get(0)); + expect(names).toEqual(['Bob', 'Tina']); + + session.close(() => { + driver.close(); + router.exit(code => { + expect(code).toEqual(0); + done(); + }); + }); + }); + }); + }); + function moveNextDateNow30SecondsForward() { const currentTime = Date.now(); hijackNextDateNowCall(currentTime + 30 * 1000 + 1); @@ -1700,19 +1781,23 @@ describe('routing driver', () => { } function setupFakeHostNameResolution(driver, seedRouter, resolvedAddresses) { - driver._connectionProvider._hostNameResolver = new FakeHostNameResolver(seedRouter, resolvedAddresses); + const connectionProvider = driver._getOrCreateConnectionProvider(); + connectionProvider._hostNameResolver = new FakeHostNameResolver(seedRouter, resolvedAddresses); } function getConnectionPool(driver) { - return driver._connectionProvider._connectionPool; + const connectionProvider = driver._getOrCreateConnectionProvider(); + return connectionProvider._connectionPool; } function getRoutingTable(driver) { - return driver._connectionProvider._routingTable; + const connectionProvider = driver._getOrCreateConnectionProvider(); + return connectionProvider._routingTable; } function setRoutingTable(driver, newRoutingTable) { - driver._connectionProvider._routingTable = newRoutingTable; + const connectionProvider = driver._getOrCreateConnectionProvider(); + connectionProvider._routingTable = newRoutingTable; } function joinStrings(array) {