From f19392558d197fad45f569b5b7d6f34a52ec8133 Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 27 Jan 2017 18:42:59 +0100 Subject: [PATCH 1/8] Simplify round robin array Removed unused methods, simplified remove operation and added couple tests. --- src/v1/internal/round-robin-array.js | 39 ++------- src/v1/routing-driver.js | 6 +- test/internal/round-robin-array.test.js | 105 ++++++++++++++++++------ 3 files changed, 89 insertions(+), 61 deletions(-) diff --git a/src/v1/internal/round-robin-array.js b/src/v1/internal/round-robin-array.js index 4e6cc41c7..a14cfccfb 100644 --- a/src/v1/internal/round-robin-array.js +++ b/src/v1/internal/round-robin-array.js @@ -23,36 +23,26 @@ class RoundRobinArray { constructor(items) { this._items = items || []; - this._index = 0; + this._offset = 0; } next() { - let elem = this._items[this._index]; - if (this._items.length === 0) { - this._index = 0; - } else { - this._index = (this._index + 1) % (this._items.length); + if (this.isEmpty()) { + return null; } - return elem; - } - - push(elem) { - this._items.push(elem); + const index = this._offset % this.size(); + this._offset++; + return this._items[index]; } pushAll(elems) { Array.prototype.push.apply(this._items, elems); } - empty() { + isEmpty() { return this._items.length === 0; } - clear() { - this._items = []; - this._index = 0; - } - size() { return this._items.length; } @@ -62,20 +52,7 @@ class RoundRobinArray { } remove(item) { - let index = this._items.indexOf(item); - while (index != -1) { - this._items.splice(index, 1); - if (index < this._index) { - this._index -= 1; - } - //make sure we are in range - if (this._items.length === 0) { - this._index = 0; - } else { - this._index %= this._items.length; - } - index = this._items.indexOf(item, index); - } + this._items = this._items.filter(element => element !== item); } } diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index a0ac2a063..b75ec4e5c 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -169,8 +169,8 @@ class ClusterView { needsUpdate() { return this._expires.lessThan(Date.now()) || this.routers.size() <= 1 || - this.readers.empty() || - this.writers.empty(); + this.readers.isEmpty() || + this.writers.isEmpty(); } all() { @@ -243,7 +243,7 @@ function newClusterView(session) { readers.pushAll(addresses); } } - if (routers.empty() || writers.empty()) { + if (routers.isEmpty() || writers.isEmpty()) { return Promise.reject(newError("Invalid routing response from server", SERVICE_UNAVAILABLE)) } return new ClusterView(routers, readers, writers, expires); diff --git a/test/internal/round-robin-array.test.js b/test/internal/round-robin-array.test.js index 5381f1f9b..528829076 100644 --- a/test/internal/round-robin-array.test.js +++ b/test/internal/round-robin-array.test.js @@ -16,12 +16,69 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +import RoundRobinArray from "../../lib/v1/internal/round-robin-array"; -var RoundRobinArray = require('../../lib/v1/internal/round-robin-array').default; +describe('round-robin-array', () => { -describe('round-robin-array', function() { - it('should step through array', function () { - var array = new RoundRobinArray([1,2,3,4,5]); + it('should behave correctly when empty', () => { + const array = new RoundRobinArray(); + + expect(array.isEmpty()).toBeTruthy(); + expect(array.size()).toEqual(0); + expect(array.next()).toBeNull(); + expect(array.toArray()).toEqual([]); + + array.remove(1); + + expect(array.isEmpty()).toBeTruthy(); + expect(array.size()).toEqual(0); + expect(array.next()).toBeNull(); + expect(array.toArray()).toEqual([]); + }); + + it('should behave correctly when contains single element', () => { + const array = new RoundRobinArray([5]); + + expect(array.isEmpty()).toBeFalsy(); + expect(array.size()).toEqual(1); + expect(array.next()).toEqual(5); + expect(array.toArray()).toEqual([5]); + + array.remove(1); + + expect(array.isEmpty()).toBeFalsy(); + expect(array.size()).toEqual(1); + expect(array.next()).toEqual(5); + expect(array.toArray()).toEqual([5]); + + array.remove(5); + + expect(array.isEmpty()).toBeTruthy(); + expect(array.size()).toEqual(0); + expect(array.next()).toBeNull(); + expect(array.toArray()).toEqual([]); + }); + + it('should push items', () => { + const array1 = new RoundRobinArray(); + array1.pushAll([]); + expect(array1.toArray()).toEqual([]); + + const array2 = new RoundRobinArray(); + array2.pushAll([1]); + expect(array2.toArray()).toEqual([1]); + + const array3 = new RoundRobinArray([1, 2, 3]); + array3.pushAll([4, 5]); + expect(array3.toArray()).toEqual([1, 2, 3, 4, 5]); + + const array4 = new RoundRobinArray([1, 2, 3]); + array4.pushAll([]); + expect(array4.toArray()).toEqual([1, 2, 3]); + }); + + it('should step through array', () => { + const array = new RoundRobinArray([1, 2, 3, 4, 5]); expect(array.next()).toEqual(1); expect(array.next()).toEqual(2); @@ -30,34 +87,32 @@ describe('round-robin-array', function() { expect(array.next()).toEqual(5); expect(array.next()).toEqual(1); expect(array.next()).toEqual(2); - //.... }); - it('should step through single element array', function () { - var array = new RoundRobinArray([5]); + it('should step through single element array', () => { + const array = new RoundRobinArray([5]); expect(array.next()).toEqual(5); expect(array.next()).toEqual(5); expect(array.next()).toEqual(5); - //.... }); - it('should handle deleting item before current ', function () { - var array = new RoundRobinArray([1,2,3,4,5]); + it('should handle deleting item before current ', () => { + const array = new RoundRobinArray([1, 2, 3, 4, 5]); expect(array.next()).toEqual(1); expect(array.next()).toEqual(2); array.remove(2); - expect(array.next()).toEqual(3); expect(array.next()).toEqual(4); expect(array.next()).toEqual(5); expect(array.next()).toEqual(1); expect(array.next()).toEqual(3); - //.... + expect(array.next()).toEqual(4); + expect(array.next()).toEqual(5); }); - it('should handle deleting item on current ', function () { - var array = new RoundRobinArray([1,2,3,4,5]); + it('should handle deleting item on current ', () => { + const array = new RoundRobinArray([1, 2, 3, 4, 5]); expect(array.next()).toEqual(1); expect(array.next()).toEqual(2); @@ -67,11 +122,11 @@ describe('round-robin-array', function() { expect(array.next()).toEqual(1); expect(array.next()).toEqual(2); expect(array.next()).toEqual(4); - //.... + expect(array.next()).toEqual(5); }); - it('should handle deleting item after current ', function () { - var array = new RoundRobinArray([1,2,3,4,5]); + it('should handle deleting item after current ', () => { + const array = new RoundRobinArray([1, 2, 3, 4, 5]); expect(array.next()).toEqual(1); expect(array.next()).toEqual(2); @@ -81,11 +136,10 @@ describe('round-robin-array', function() { expect(array.next()).toEqual(1); expect(array.next()).toEqual(2); expect(array.next()).toEqual(3); - //.... }); - it('should handle deleting last item ', function () { - var array = new RoundRobinArray([1,2,3,4,5]); + it('should handle deleting last item ', () => { + const array = new RoundRobinArray([1, 2, 3, 4, 5]); expect(array.next()).toEqual(1); expect(array.next()).toEqual(2); @@ -97,11 +151,10 @@ describe('round-robin-array', function() { expect(array.next()).toEqual(3); expect(array.next()).toEqual(4); expect(array.next()).toEqual(1); - //.... }); - it('should handle deleting first item ', function () { - var array = new RoundRobinArray([1,2,3,4,5]); + it('should handle deleting first item ', () => { + const array = new RoundRobinArray([1, 2, 3, 4, 5]); array.remove(1); expect(array.next()).toEqual(2); expect(array.next()).toEqual(3); @@ -111,17 +164,15 @@ describe('round-robin-array', function() { expect(array.next()).toEqual(3); expect(array.next()).toEqual(4); expect(array.next()).toEqual(5); - //.... }); - it('should handle deleting multiple items ', function () { - var array = new RoundRobinArray([1,2,3,1,1]); + it('should handle deleting multiple items ', () => { + const array = new RoundRobinArray([1, 2, 3, 1, 1]); array.remove(1); expect(array.next()).toEqual(2); expect(array.next()).toEqual(3); expect(array.next()).toEqual(2); expect(array.next()).toEqual(3); - //.... }); }); From 9a277b2d4208e13d521fbceba7de96c05f3a2b7e Mon Sep 17 00:00:00 2001 From: lutovich Date: Sat, 28 Jan 2017 00:06:31 +0100 Subject: [PATCH 2/8] Add routing table abstraction It contains 3 round robin arrays: routers, readers and writers. It also contains an expiration timestamp which corresponds to `ttl` received from `getServers` procedure call. --- src/v1/internal/routing-table.js | 58 ++++++++++ test/internal/routing-table.test.js | 160 ++++++++++++++++++++++++++++ 2 files changed, 218 insertions(+) create mode 100644 src/v1/internal/routing-table.js create mode 100644 test/internal/routing-table.test.js diff --git a/src/v1/internal/routing-table.js b/src/v1/internal/routing-table.js new file mode 100644 index 000000000..5de2ef8ee --- /dev/null +++ b/src/v1/internal/routing-table.js @@ -0,0 +1,58 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import {int} from "../integer"; + +const MIN_ROUTERS = 1; + +export default class RoutingTable { + + constructor(routers, readers, writers, expirationTime) { + this.routers = routers || new RoundRobinArray(); + this.readers = readers || new RoundRobinArray(); + this.writers = writers || new RoundRobinArray(); + + this._expirationTime = expirationTime || int(0); + } + + forget(address) { + // Don't remove it from the set of routers, since that might mean we lose our ability to re-discover, + // just remove it from the set of readers and writers, so that we don't use it for actual work without + // performing discovery first. + this.readers.remove(address); + this.writers.remove(address); + } + + serversDiff(otherRoutingTable) { + const oldServers = new Set(this._allServers()); + const newServers = otherRoutingTable._allServers(); + newServers.forEach(newServer => oldServers.delete(newServer)); + return Array.from(oldServers); + } + + isStale() { + return this._expirationTime.lessThan(Date.now()) || + this.routers.size() <= MIN_ROUTERS || + this.readers.isEmpty() || + this.writers.isEmpty(); + } + + _allServers() { + return [...this.routers.toArray(), ...this.readers.toArray(), ...this.writers.toArray()]; + } +} diff --git a/test/internal/routing-table.test.js b/test/internal/routing-table.test.js new file mode 100644 index 000000000..43840859a --- /dev/null +++ b/test/internal/routing-table.test.js @@ -0,0 +1,160 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import RoutingTable from "../../src/v1/internal/routing-table"; +import RoundRobinArray from "../../src/v1/internal/round-robin-array"; +import {int} from "../../src/v1/integer"; + +describe('routing-table', () => { + + it('should not be stale when has routers, readers, writers and future expiration date', () => { + const table = createTable([1, 2], [3, 4], [5, 6], notExpired()); + expect(table.isStale()).toBeFalsy(); + }); + + it('should be stale when expiration date in the past', () => { + const table = createTable([1, 2], [1, 2], [1, 2], expired()); + expect(table.isStale()).toBeTruthy(); + }); + + it('should be stale when has single router', () => { + const table = createTable([1], [2, 3], [4, 5], notExpired()); + expect(table.isStale()).toBeTruthy(); + }); + + it('should be stale when no readers', () => { + const table = createTable([1, 2], [], [3, 4], notExpired()); + expect(table.isStale()).toBeTruthy(); + }); + + it('should be stale when no writers', () => { + const table = createTable([1, 2], [3, 4], [], notExpired()); + expect(table.isStale()).toBeTruthy(); + }); + + it('should not be stale with single reader', () => { + const table = createTable([1, 2], [3], [4, 5], notExpired()); + expect(table.isStale()).toBeFalsy(); + }); + + it('should not be stale with single writer', () => { + const table = createTable([1, 2], [3, 4], [5], notExpired()); + expect(table.isStale()).toBeFalsy(); + }); + + it('should forget reader, writer but not router', () => { + const table = createTable([1, 2], [1, 2], [1, 2], notExpired()); + + table.forget(1); + + expect(table.routers.toArray()).toEqual([1, 2]); + expect(table.readers.toArray()).toEqual([2]); + expect(table.writers.toArray()).toEqual([2]); + }); + + it('should forget single reader', () => { + const table = createTable([1, 2], [42], [1, 2, 3], notExpired()); + + table.forget(42); + + expect(table.routers.toArray()).toEqual([1, 2]); + expect(table.readers.toArray()).toEqual([]); + expect(table.writers.toArray()).toEqual([1, 2, 3]); + }); + + it('should forget single writer', () => { + const table = createTable([1, 2], [3, 4, 5], [42], notExpired()); + + table.forget(42); + + expect(table.routers.toArray()).toEqual([1, 2]); + expect(table.readers.toArray()).toEqual([3, 4, 5]); + expect(table.writers.toArray()).toEqual([]); + }); + + it('should return all servers in diff when other table is empty', () => { + const oldTable = createTable([1, 2], [3, 4], [5, 6], notExpired()); + const newTable = createTable([], [], [], notExpired()); + + const servers = oldTable.serversDiff(newTable); + + expect(servers).toEqual([1, 2, 3, 4, 5, 6]); + }); + + it('should no servers in diff when this table is empty', () => { + const oldTable = createTable([], [], [], notExpired()); + const newTable = createTable([1, 2], [3, 4], [5, 6], notExpired()); + + const servers = oldTable.serversDiff(newTable); + + expect(servers).toEqual([]); + }); + + it('should include different routers in servers diff', () => { + const oldTable = createTable([1, 7, 2, 42], [3, 4], [5, 6], notExpired()); + const newTable = createTable([1, 2], [3, 4], [5, 6], notExpired()); + + const servers = oldTable.serversDiff(newTable); + + expect(servers).toEqual([7, 42]); + }); + + it('should include different readers in servers diff', () => { + const oldTable = createTable([1, 2], [3, 7, 4, 42], [5, 6], notExpired()); + const newTable = createTable([1, 2], [3, 4], [5, 6], notExpired()); + + const servers = oldTable.serversDiff(newTable); + + expect(servers).toEqual([7, 42]); + }); + + it('should include different writers in servers diff', () => { + const oldTable = createTable([1, 2], [3, 4], [5, 7, 6, 42], notExpired()); + const newTable = createTable([1, 2], [3, 4], [5, 6], notExpired()); + + const servers = oldTable.serversDiff(newTable); + + expect(servers).toEqual([7, 42]); + }); + + it('should include different servers in diff', () => { + const oldTable = createTable([1, 2, 11], [22, 3, 33, 4], [5, 44, 6], notExpired()); + const newTable = createTable([1], [2, 3, 4, 6], [5], notExpired()); + + const servers = oldTable.serversDiff(newTable); + + expect(servers).toEqual([11, 22, 33, 44]); + }); + + function expired() { + return Date.now() - 3600; // expired an hour ago + } + + function notExpired() { + return Date.now() + 3600; // will expire in an hour + } + + function createTable(routers, readers, writers, expirationTime) { + const routersArray = new RoundRobinArray(routers); + const readersArray = new RoundRobinArray(readers); + const writersArray = new RoundRobinArray(writers); + const expiration = int(expirationTime); + return new RoutingTable(routersArray, readersArray, writersArray, expiration); + } + +}); From 2e0fc1e62414e0af7c48602b454373d82fe4e03d Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 1 Feb 2017 16:05:02 +0100 Subject: [PATCH 3/8] Split routing logic into multiple components Previously it was mostly located in routing driver which owned the cluster view and all logic to call `getServers` and interpret it's output/failures. This introduces two new internal components: * `GetServersUtil` - responsible for calling `getServers` procedure and processing it's output * `Rediscovery` - responsible for fetching a routing table from the specified server Routing driver now owns a routing table and an instance of rediscovery. It uses the later one to update the routing table when needed. Added dev dependency to lolex(https://github.com/sinonjs/lolex) to be able to mock system time and test things better. --- package.json | 1 + src/v1/driver.js | 2 +- src/v1/error.js | 9 +- src/v1/internal/get-servers-util.js | 94 +++++++++ src/v1/internal/rediscovery.js | 65 +++++++ src/v1/internal/round-robin-array.js | 9 +- src/v1/internal/routing-table.js | 6 +- src/v1/routing-driver.js | 215 +++++++-------------- test/internal/get-servers-util.test.js | 246 ++++++++++++++++++++++++ test/internal/rediscovery.test.js | 231 ++++++++++++++++++++++ test/internal/round-robin-array.test.js | 39 +++- test/v1/driver.test.js | 8 +- 12 files changed, 753 insertions(+), 172 deletions(-) create mode 100644 src/v1/internal/get-servers-util.js create mode 100644 src/v1/internal/rediscovery.js create mode 100644 test/internal/get-servers-util.test.js create mode 100644 test/internal/rediscovery.test.js diff --git a/package.json b/package.json index c205d570e..36fa0b93d 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "gulp-util": "^3.0.6", "gulp-watch": "^4.3.5", "jasmine-reporters": "^2.0.7", + "lolex": "^1.5.2", "merge-stream": "^1.0.0", "minimist": "^1.2.0", "phantomjs-prebuilt": "^2.1.7 ", diff --git a/src/v1/driver.js b/src/v1/driver.js index 4ddb222d8..44dca767c 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -24,7 +24,7 @@ import {connect} from "./internal/connector"; import StreamObserver from './internal/stream-observer'; import {newError, SERVICE_UNAVAILABLE} from "./error"; -let READ = 'READ', WRITE = 'WRITE'; +const READ = 'READ', WRITE = 'WRITE'; /** * A driver maintains one or more {@link Session sessions} with a remote * Neo4j instance. Through the {@link Session sessions} you can send statements diff --git a/src/v1/error.js b/src/v1/error.js index 53b253ad5..10c5d2b38 100644 --- a/src/v1/error.js +++ b/src/v1/error.js @@ -20,8 +20,10 @@ // A common place for constructing error objects, to keep them // uniform across the driver surface. -let SERVICE_UNAVAILABLE = 'ServiceUnavailable'; -let SESSION_EXPIRED = 'SessionExpired'; +const SERVICE_UNAVAILABLE = 'ServiceUnavailable'; +const SESSION_EXPIRED = 'SessionExpired'; +const PROTOCOL_ERROR = 'ProtocolError'; + function newError(message, code="N/A") { // TODO: Idea is that we can check the code here and throw sub-classes // of Neo4jError as appropriate @@ -40,5 +42,6 @@ export { newError, Neo4jError, SERVICE_UNAVAILABLE, - SESSION_EXPIRED + SESSION_EXPIRED, + PROTOCOL_ERROR } diff --git a/src/v1/internal/get-servers-util.js b/src/v1/internal/get-servers-util.js new file mode 100644 index 000000000..5e030170f --- /dev/null +++ b/src/v1/internal/get-servers-util.js @@ -0,0 +1,94 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import RoundRobinArray from "./round-robin-array"; +import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE} from "../error"; +import Integer, {int} from "../integer"; + +const PROCEDURE_CALL = 'CALL dbms.cluster.routing.getServers'; +const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'; + +export default class GetServersUtil { + + callGetServers(session, routerAddress) { + return session.run(PROCEDURE_CALL).then(result => { + return result.records; + }).catch(error => { + if (error.code === PROCEDURE_NOT_FOUND_CODE) { + // throw when getServers procedure not found because this is clearly a configuration issue + throw newError('Server ' + routerAddress + ' could not perform routing. ' + + 'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE); + } + // return nothing when failed to connect because code higher in the callstack is still able to retry with a + // different session towards a different router + return null; + }); + } + + parseTtl(record, routerAddress) { + try { + const now = int(Date.now()); + const expires = record.get('ttl').multiply(1000).add(now); + // if the server uses a really big expire time like Long.MAX_VALUE this may have overflowed + if (expires.lessThan(now)) { + return Integer.MAX_VALUE; + } + return expires; + } catch (error) { + throw newError( + 'Unable to parse TTL entry from router ' + routerAddress + ' from record:\n' + JSON.stringify(record), + PROTOCOL_ERROR); + } + } + + parseServers(record, routerAddress) { + try { + const servers = record.get('servers'); + + const routers = new RoundRobinArray(); + const readers = new RoundRobinArray(); + const writers = new RoundRobinArray(); + + servers.forEach(server => { + const role = server['role']; + const addresses = server['addresses']; + + if (role === 'ROUTE') { + routers.pushAll(addresses); + } else if (role === 'WRITE') { + writers.pushAll(addresses); + } else if (role === 'READ') { + readers.pushAll(addresses); + } else { + throw newError('Unknown server role "' + role + '"', PROTOCOL_ERROR); + } + }); + + return { + routers: routers, + readers: readers, + writers: writers + } + } catch (ignore) { + throw newError( + 'Unable to parse servers entry from router ' + routerAddress + ' from record:\n' + JSON.stringify(record), + PROTOCOL_ERROR); + } + } +} diff --git a/src/v1/internal/rediscovery.js b/src/v1/internal/rediscovery.js new file mode 100644 index 000000000..b4fd92933 --- /dev/null +++ b/src/v1/internal/rediscovery.js @@ -0,0 +1,65 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GetServersUtil from "./get-servers-util"; +import RoutingTable from "./routing-table"; +import {newError, PROTOCOL_ERROR} from "../error"; + +export default class Rediscovery { + + constructor(getServersUtil) { + this._getServersUtil = getServersUtil || new GetServersUtil(); + } + + lookupRoutingTableOnRouter(session, routerAddress) { + return this._getServersUtil.callGetServers(session, routerAddress).then(records => { + if (records === null) { + // connection error happened, unable to retrieve routing table from this router, next one should be queried + return null; + } + + if (records.length !== 1) { + throw newError('Illegal response from router "' + routerAddress + '". ' + + 'Received ' + records.length + ' records but expected only one.\n' + JSON.stringify(records), + PROTOCOL_ERROR); + } + + const record = records[0]; + + const expirationTime = this._getServersUtil.parseTtl(record, routerAddress); + const {routers, readers, writers} = this._getServersUtil.parseServers(record, routerAddress); + + Rediscovery._assertNonEmpty(routers, 'routers', routerAddress); + Rediscovery._assertNonEmpty(readers, 'readers', routerAddress); + + if (writers.isEmpty()) { + // retrieved routing table has no writers, next router should be queried + return null; + } + + return new RoutingTable(routers, readers, writers, expirationTime); + }); + } + + static _assertNonEmpty(serversRoundRobinArray, serversName, routerAddress) { + if (serversRoundRobinArray.isEmpty()) { + throw newError('Received no ' + serversName + ' from router ' + routerAddress, PROTOCOL_ERROR); + } + } +} diff --git a/src/v1/internal/round-robin-array.js b/src/v1/internal/round-robin-array.js index a14cfccfb..d9ef40432 100644 --- a/src/v1/internal/round-robin-array.js +++ b/src/v1/internal/round-robin-array.js @@ -20,7 +20,8 @@ /** * An array that lets you hop through the elements endlessly. */ -class RoundRobinArray { +export default class RoundRobinArray { + constructor(items) { this._items = items || []; this._offset = 0; @@ -36,6 +37,10 @@ class RoundRobinArray { } pushAll(elems) { + if (!Array.isArray(elems)) { + throw new TypeError('Array expected but got: ' + elems); + } + Array.prototype.push.apply(this._items, elems); } @@ -55,5 +60,3 @@ class RoundRobinArray { this._items = this._items.filter(element => element !== item); } } - -export default RoundRobinArray diff --git a/src/v1/internal/routing-table.js b/src/v1/internal/routing-table.js index 5de2ef8ee..e834cdde2 100644 --- a/src/v1/internal/routing-table.js +++ b/src/v1/internal/routing-table.js @@ -17,6 +17,7 @@ * limitations under the License. */ import {int} from "../integer"; +import RoundRobinArray from "./round-robin-array"; const MIN_ROUTERS = 1; @@ -26,8 +27,7 @@ export default class RoutingTable { this.routers = routers || new RoundRobinArray(); this.readers = readers || new RoundRobinArray(); this.writers = writers || new RoundRobinArray(); - - this._expirationTime = expirationTime || int(0); + this.expirationTime = expirationTime || int(0); } forget(address) { @@ -46,7 +46,7 @@ export default class RoutingTable { } isStale() { - return this._expirationTime.lessThan(Date.now()) || + return this.expirationTime.lessThan(Date.now()) || this.routers.size() <= MIN_ROUTERS || this.readers.isEmpty() || this.writers.isEmpty(); diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index b75ec4e5c..2be24c1c7 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -17,12 +17,12 @@ * limitations under the License. */ -import Session from './session'; -import {Driver, READ, WRITE} from './driver'; +import Session from "./session"; +import {Driver, READ, WRITE} from "./driver"; import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from "./error"; -import RoundRobinArray from './internal/round-robin-array'; -import {int} from './integer' -import Integer from './integer' +import RoundRobinArray from "./internal/round-robin-array"; +import RoutingTable from "./internal/routing-table"; +import Rediscovery from "./internal/rediscovery"; /** * A driver that supports routing in a core-edge cluster. @@ -31,7 +31,8 @@ class RoutingDriver extends Driver { constructor(url, userAgent, token = {}, config = {}) { super(url, userAgent, token, RoutingDriver._validateConfig(config)); - this._clusterView = new ClusterView(new RoundRobinArray([url])); + this._routingTable = new RoutingTable(new RoundRobinArray([url])); + this._rediscovery = new Rediscovery(); } _createSession(connectionPromise, cb) { @@ -69,10 +70,10 @@ class RoutingDriver extends Driver { let url = 'UNKNOWN'; if (conn) { url = conn.url; - this._clusterView.writers.remove(conn.url); + this._routingTable.writers.remove(conn.url); } else { connectionPromise.then((conn) => { - this._clusterView.writers.remove(conn.url); + this._routingTable.writers.remove(conn.url); }).catch(() => {/*ignore*/}); } return newError("No longer possible to write to server at " + url, SESSION_EXPIRED); @@ -82,71 +83,74 @@ class RoutingDriver extends Driver { }); } - _updatedClusterView() { - if (!this._clusterView.needsUpdate()) { - return Promise.resolve(this._clusterView); - } else { - let call = () => { - let conn = this._pool.acquire(routers.next()); - let session = this._createSession(Promise.resolve(conn)); - return newClusterView(session).catch((err) => { - this._forget(conn); - return Promise.reject(err); - }); - }; - let routers = this._clusterView.routers; - //Build a promise chain that ends on the first successful call - //i.e. call().catch(call).catch(call).catch(call)... - //each call will try a different router - let acc = Promise.reject(); - for (let i = 0; i < routers.size(); i++) { - acc = acc.catch(call); - } - return acc; + _refreshedRoutingTable() { + const currentRoutingTable = this._routingTable; + + if (!currentRoutingTable.isStale()) { + return Promise.resolve(currentRoutingTable); } - } - _diff(oldView, updatedView) { - let oldSet = oldView.all(); - let newSet = updatedView.all(); - newSet.forEach((item) => { - oldSet.delete(item); + const knownRouters = currentRoutingTable.routers.toArray(); + + const refreshedTablePromise = knownRouters.reduce((refreshedTablePromise, currentRouter, currentIndex) => { + return refreshedTablePromise.then(newRoutingTable => { + if (newRoutingTable) { + // correct routing table was fetched, just return it + return newRoutingTable + } + + // returned routing table was undefined, this means a connection error happened and we need to forget the + // previous router and try the next one + const previousRouter = knownRouters[currentIndex - 1]; + if (previousRouter) { + this._forget(previousRouter); + } + + // todo: properly close this connection + const connection = this._pool.acquire(currentRouter); + const session = this._createSession(Promise.resolve(connection)); + + // try next router + return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter); + }) + }, Promise.resolve(null)); + + return refreshedTablePromise.then(newRoutingTable => { + if (newRoutingTable) { + // correct routing table was fetched, we need to clean all connections that are not in the new table and + // return the new table + const staleServers = currentRoutingTable.serversDiff(newRoutingTable); + staleServers.forEach(server => this._pool.purge); + return newRoutingTable + } + throw newError('Could not perform discovery. No routing servers available.', SERVICE_UNAVAILABLE); }); - return oldSet; } _acquireConnection(mode) { - let m = mode || WRITE; - //make sure we have enough servers - return this._updatedClusterView().then((view) => { - let toRemove = this._diff(this._clusterView, view); - let self = this; - toRemove.forEach((url) => { - self._pool.purge(url); - }); - //update our cached view - this._clusterView = view; - if (m === READ) { - let key = view.readers.next(); - if (!key) { - return Promise.reject(newError('No read servers available', SESSION_EXPIRED)); - } - return this._pool.acquire(key); - } else if (m === WRITE) { - let key = view.writers.next(); - if (!key) { - return Promise.reject(newError('No write servers available', SESSION_EXPIRED)); - } - return this._pool.acquire(key); + const connectionMode = mode || WRITE; + return this._refreshedRoutingTable().then(routingTable => { + if (connectionMode === READ) { + return this._acquireConnectionToServer(routingTable.readers, "read"); + } else if (connectionMode === WRITE) { + return this._acquireConnectionToServer(routingTable.writers, "write"); } else { - return Promise.reject(m + " is not a valid option"); + return Promise.reject(connectionMode + " is not a valid option"); // todo: better message } - }).catch((err) => {return Promise.reject(err)}); + }); + } + + _acquireConnectionToServer(serversRoundRobinArray, serverName) { + const address = serversRoundRobinArray.next(); + if (!address) { + return Promise.reject(newError('No ' + serverName + ' servers available', SESSION_EXPIRED)); + } + return this._pool.acquire(address); } _forget(url) { this._pool.purge(url); - this._clusterView.remove(url); + this._routingTable.remove(url); } static _validateConfig(config) { @@ -157,42 +161,6 @@ class RoutingDriver extends Driver { } } -class ClusterView { - constructor(routers, readers, writers, expires) { - this.routers = routers || new RoundRobinArray(); - this.readers = readers || new RoundRobinArray(); - this.writers = writers || new RoundRobinArray(); - this._expires = expires || int(-1); - - } - - needsUpdate() { - return this._expires.lessThan(Date.now()) || - this.routers.size() <= 1 || - this.readers.isEmpty() || - this.writers.isEmpty(); - } - - all() { - let seen = new Set(this.routers.toArray()); - let writers = this.writers.toArray(); - let readers = this.readers.toArray(); - for (let i = 0; i < writers.length; i++) { - seen.add(writers[i]); - } - for (let i = 0; i < readers.length; i++) { - seen.add(readers[i]); - } - return seen; - } - - remove(item) { - this.routers.remove(item); - this.readers.remove(item); - this.writers.remove(item); - } -} - class RoutingSession extends Session { constructor(connectionPromise, onClose, onFailedConnection) { super(connectionPromise, onClose); @@ -204,57 +172,4 @@ class RoutingSession extends Session { } } -let GET_SERVERS = "CALL dbms.cluster.routing.getServers"; - -/** - * Calls `getServers` and retrieves a new promise of a ClusterView. - * @param session - * @returns {Promise.} - */ -function newClusterView(session) { - return session.run(GET_SERVERS) - .then((res) => { - session.close(); - if (res.records.length != 1) { - return Promise.reject(newError("Invalid routing response from server", SERVICE_UNAVAILABLE)); - } - let record = res.records[0]; - let now = int(Date.now()); - let expires = record.get('ttl').multiply(1000).add(now); - //if the server uses a really big expire time like Long.MAX_VALUE - //this may have overflowed - if (expires.lessThan(now)) { - expires = Integer.MAX_VALUE; - } - let servers = record.get('servers'); - let routers = new RoundRobinArray(); - let readers = new RoundRobinArray(); - let writers = new RoundRobinArray(); - for (let i = 0; i < servers.length; i++) { - let server = servers[i]; - - let role = server['role']; - let addresses = server['addresses']; - if (role === 'ROUTE') { - routers.pushAll(addresses); - } else if (role === 'WRITE') { - writers.pushAll(addresses); - } else if (role === 'READ') { - readers.pushAll(addresses); - } - } - if (routers.isEmpty() || writers.isEmpty()) { - return Promise.reject(newError("Invalid routing response from server", SERVICE_UNAVAILABLE)) - } - return new ClusterView(routers, readers, writers, expires); - }) - .catch((e) => { - if (e.code === 'Neo.ClientError.Procedure.ProcedureNotFound') { - return Promise.reject(newError("Server could not perform routing, make sure you are connecting to a causal cluster", SERVICE_UNAVAILABLE)); - } else { - return Promise.reject(newError("No servers could be found at this instant.", SERVICE_UNAVAILABLE)); - } - }); -} - export default RoutingDriver diff --git a/test/internal/get-servers-util.test.js b/test/internal/get-servers-util.test.js new file mode 100644 index 000000000..7c7836a48 --- /dev/null +++ b/test/internal/get-servers-util.test.js @@ -0,0 +1,246 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import GetServersUtil from "../../src/v1/internal/get-servers-util"; +import Record from "../../src/v1/record"; +import Integer, {int} from "../../src/v1/integer"; +import {newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from "../../src/v1/error"; +import lolex from "lolex"; + +const ROUTER_ADDRESS = 'bolt+routing://test.router.com'; + +describe('get-servers-util', () => { + + let clock; + + beforeAll(() => { + clock = lolex.install(); + }); + + afterAll(() => { + clock.uninstall(); + }); + + it('should return retrieved records when query succeeds', done => { + const session = FakeSession.successful({records: ['foo', 'bar', 'baz']}); + + callGetServers(session, '').then(records => { + expect(records).toEqual(['foo', 'bar', 'baz']); + done(); + }).catch(console.log); + }); + + it('should return null on connection error', done => { + const session = FakeSession.failed(newError('Oh no!', SESSION_EXPIRED)); + + callGetServers(session).then(records => { + expect(records).toBeNull(); + done(); + }).catch(console.log); + }); + + it('should fail when procedure not found', done => { + const session = FakeSession.failed(newError('Oh no!', 'Neo.ClientError.Procedure.ProcedureNotFound')); + + callGetServers(session).catch(error => { + expect(error.code).toBe(SERVICE_UNAVAILABLE); + expect(error.message).toBe('Server ' + ROUTER_ADDRESS + ' could not perform routing. ' + + 'Make sure you are connecting to a causal cluster'); + done(); + }); + }); + + it('should parse valid ttl', () => { + testValidTtlParsing(100, 5); + testValidTtlParsing(Date.now(), 3600); // 1 hour + testValidTtlParsing(Date.now(), 86400); // 24 hours + testValidTtlParsing(Date.now(), 3628800); // 42 days + testValidTtlParsing(0, 1); + testValidTtlParsing(50, 0); + testValidTtlParsing(Date.now(), 0); + }); + + it('should not overflow parsing huge ttl', () => { + const record = newRecord({ttl: Integer.MAX_VALUE}); + clock.setSystemTime(42); + + const expirationTime = parseTtl(record); + + expect(expirationTime).toBe(Integer.MAX_VALUE); + }); + + it('should return valid value parsing negative ttl', () => { + const record = newRecord({ttl: int(-42)}); + clock.setSystemTime(42); + + const expirationTime = parseTtl(record); + + expect(expirationTime).toBe(Integer.MAX_VALUE); + }); + + it('should throw when record does not have a ttl entry', done => { + const record = new Record(["notTtl", "servers"], []); + expectProtocolError(() => parseTtl(record), done); + }); + + it('should parse servers', () => { + testValidServersParsing([], [], []); + + testValidServersParsing(['router1'], [], []); + testValidServersParsing([], ['reader1'], []); + testValidServersParsing([], [], ['writer1']); + + testValidServersParsing(['router1'], ['reader1'], []); + testValidServersParsing(['router1'], ['reader1'], ['writer1']); + testValidServersParsing([], ['reader1'], ['writer1']); + + testValidServersParsing(['router1'], ['reader1'], ['writer1']); + testValidServersParsing(['router1', 'router2'], ['reader1', 'reader2'], ['writer1']); + testValidServersParsing(['router1', 'router2'], ['reader1', 'reader2'], ['writer1', 'writer2']); + }); + + it('should fail to parse servers entry when record does not have servers', done => { + const record = new Record(['ttl', 'notServers'], [int(42), [{"role": "READ", "addresses": ['1', '2', '3']}]]); + expectProtocolError(() => parseServers(record), done); + }); + + it('should fail to parse servers entry without role', done => { + const record = new Record(['ttl', 'servers'], [int(42), [{"notRole": "READ", "addresses": ['1', '2', '3']}]]); + expectProtocolError(() => parseServers(record), done); + }); + + it('should fail to parse servers entry with illegal role', done => { + const record = new Record(['ttl', 'servers'], [int(42), [{"role": "ARBITER", "addresses": ['1', '2', '3']}]]); + expectProtocolError(() => parseServers(record), done); + }); + + it('should fail to parse servers entry with just ttl', done => { + const record = new Record(['ttl', 'servers'], [int(42), [{"role": "READ"}]]); + expectProtocolError(() => parseServers(record), done); + }); + + it('should fail to parse servers entry without addresses', done => { + const record = new Record(['ttl', 'servers'], [int(42), [{"role": "WRITE", "notAddresses": ['1', '2', '3']}]]); + expectProtocolError(() => parseServers(record), done); + }); + + it('should fail to parse servers entry with string addresses', done => { + const record = new Record(['ttl', 'servers'], [int(42), [{"role": "WRITE", "addresses": ''}]]); + expectProtocolError(() => parseServers(record), done); + }); + + it('should fail to parse servers entry with null addresses', done => { + const record = new Record(['ttl', 'servers'], [int(42), [{"role": "WRITE", "addresses": null}]]); + expectProtocolError(() => parseServers(record), done); + }); + + it('should fail to parse servers entry with integer addresses', done => { + const record = new Record(['ttl', 'servers'], [int(42), [{"role": "WRITE", "addresses": 12345}]]); + expectProtocolError(() => parseServers(record), done); + }); + + it('should fail to parse servers entry with object addresses', done => { + const record = new Record(['ttl', 'servers'], [int(42), [{"role": "WRITE", "addresses": {key: ['localhost']}}]]); + expectProtocolError(() => parseServers(record), done); + }); + + function testValidTtlParsing(currentTime, ttlSeconds) { + const record = newRecord({ttl: int(ttlSeconds)}); + clock.setSystemTime(currentTime); + + const expirationTime = parseTtl(record).toNumber(); + + expect(expirationTime).toEqual(currentTime + ttlSeconds * 1000); + } + + function testValidServersParsing(routerAddresses, readerAddresses, writerAddresses) { + const record = newRecord({routers: routerAddresses, readers: readerAddresses, writers: writerAddresses}); + + const {routers, readers, writers} = parseServers(record); + + expect(routers.toArray()).toEqual(routerAddresses); + expect(readers.toArray()).toEqual(readerAddresses); + expect(writers.toArray()).toEqual(writerAddresses); + } + + function callGetServers(session) { + const util = new GetServersUtil(); + return util.callGetServers(session, ROUTER_ADDRESS); + } + + function parseTtl(record) { + const util = new GetServersUtil(); + return util.parseTtl(record, ROUTER_ADDRESS); + } + + function parseServers(record) { + const util = new GetServersUtil(); + return util.parseServers(record, ROUTER_ADDRESS); + } + + function newRecord({ttl = int(42), routers = [], readers = [], writers = []}) { + const routersField = { + "role": "ROUTE", + "addresses": routers + }; + const readersField = { + "role": "READ", + "addresses": readers + }; + const writersField = { + "role": "WRITE", + "addresses": writers + }; + return new Record(["ttl", "servers"], [ttl, [routersField, readersField, writersField]]); + } + + function expectProtocolError(action, done) { + const promise = new Promise((resolve, reject) => { + try { + resolve(action()); + } catch (e) { + reject(e); + } + }); + + promise.catch(error => { + expect(error.code).toBe(PROTOCOL_ERROR); + done(); + }); + } + + class FakeSession { + + constructor(runResponse) { + this._runResponse = runResponse; + } + + static successful(result) { + return new FakeSession(Promise.resolve(result)); + } + + static failed(error) { + return new FakeSession(Promise.reject(error)); + } + + run() { + return this._runResponse; + } + } +}); diff --git a/test/internal/rediscovery.test.js b/test/internal/rediscovery.test.js new file mode 100644 index 000000000..32927e38a --- /dev/null +++ b/test/internal/rediscovery.test.js @@ -0,0 +1,231 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Rediscovery from "../../src/v1/internal/rediscovery"; +import GetServersUtil from "../../src/v1/internal/get-servers-util"; +import {newError, PROTOCOL_ERROR} from "../../src/v1/error"; +import Record from "../../src/v1/record"; +import {int} from "../../src/v1/integer"; +import RoundRobinArray from "../../src/v1/internal/round-robin-array"; +import RoutingTable from "../../src/v1/internal/routing-table"; + +const ROUTER_ADDRESS = 'bolt+routing://test.router.com'; + +describe('rediscovery', () => { + + it('should return null when connection error happens', done => { + const util = new FakeGetServersUtil({ + callGetServers: () => null, + }); + + lookupRoutingTableOnRouter(util).then(routingTable => { + expect(routingTable).toBeNull(); + done(); + }); + }); + + it('should throw when no records are returned', done => { + const util = new FakeGetServersUtil({ + callGetServers: () => [], + }); + + lookupRoutingTableOnRouter(util).catch(error => { + expectProtocolError(error, 'Illegal response from router'); + done(); + }); + }); + + it('should throw when multiple records are returned', done => { + const util = new FakeGetServersUtil({ + callGetServers: () => [new Record(['a'], ['aaa']), new Record(['b'], ['bbb'])] + }); + + lookupRoutingTableOnRouter(util).catch(error => { + expectProtocolError(error, 'Illegal response from router'); + done(); + }); + }); + + it('should throw when ttl parsing throws', done => { + const util = new FakeGetServersUtil({ + callGetServers: () => [new Record(['a'], ['aaa'])], + parseTtl: () => { + throw newError('Unable to parse TTL', PROTOCOL_ERROR); + } + }); + + lookupRoutingTableOnRouter(util).catch(error => { + expectProtocolError(error, 'Unable to parse TTL'); + done(); + }); + }); + + it('should throw when servers parsing throws', done => { + const util = new FakeGetServersUtil({ + callGetServers: () => [new Record(['a'], ['aaa'])], + parseTtl: () => int(42), + parseServers: () => { + throw newError('Unable to parse servers', PROTOCOL_ERROR); + } + }); + + lookupRoutingTableOnRouter(util).catch(error => { + expectProtocolError(error, 'Unable to parse servers'); + done(); + }); + }); + + it('should throw when no routers', done => { + const util = new FakeGetServersUtil({ + callGetServers: () => [new Record(['a'], ['aaa'])], + parseTtl: () => int(42), + parseServers: () => { + return { + routers: new RoundRobinArray(), + readers: new RoundRobinArray(['reader1']), + writers: new RoundRobinArray(['writer1']) + }; + } + }); + + lookupRoutingTableOnRouter(util).catch(error => { + expectProtocolError(error, 'Received no routers'); + done(); + }); + }); + + it('should throw when no readers', done => { + const util = new FakeGetServersUtil({ + callGetServers: () => [new Record(['a'], ['aaa'])], + parseTtl: () => int(42), + parseServers: () => { + return { + routers: new RoundRobinArray(['router1']), + readers: new RoundRobinArray(), + writers: new RoundRobinArray(['writer1']) + }; + } + }); + + lookupRoutingTableOnRouter(util).catch(error => { + expectProtocolError(error, 'Received no readers'); + done(); + }); + }); + + it('should return null when no writers', done => { + const util = new FakeGetServersUtil({ + callGetServers: () => [new Record(['a'], ['aaa'])], + parseTtl: () => int(42), + parseServers: () => { + return { + routers: new RoundRobinArray(['router1']), + readers: new RoundRobinArray(['reader1']), + writers: new RoundRobinArray() + }; + } + }); + + lookupRoutingTableOnRouter(util).then(routingTable => { + expect(routingTable).toBeNull(); + done(); + }); + }); + + it('should return valid routing table with 1 router, 1 reader and 1 writer', done => { + testValidRoutingTable(['router1'], ['reader1'], ['writer1'], int(42), done); + }); + + it('should return valid routing table with 2 routers, 2 readers and 2 writers', done => { + testValidRoutingTable(['router1', 'router2'], ['reader1', 'reader2'], ['writer1', 'writer2'], int(Date.now()), done); + }); + + it('should return valid routing table with 1 router, 3 readers and 1 writer', done => { + testValidRoutingTable(['router1'], ['reader1', 'reader2', 'reader3'], ['writer1'], int(12345), done); + }); + + function testValidRoutingTable(routerAddresses, readerAddresses, writerAddresses, expires, done) { + const util = new FakeGetServersUtil({ + callGetServers: () => [new Record(['a'], ['aaa'])], + parseTtl: () => expires, + parseServers: () => { + return { + routers: new RoundRobinArray(routerAddresses), + readers: new RoundRobinArray(readerAddresses), + writers: new RoundRobinArray(writerAddresses) + }; + } + }); + + lookupRoutingTableOnRouter(util).then(routingTable => { + expect(routingTable).toBeDefined(); + expect(routingTable).not.toBeNull(); + + expect(routingTable.expirationTime).toEqual(expires); + + const allServers = routingTable.serversDiff(new RoutingTable()).sort(); + const allExpectedServers = [...routerAddresses, ...readerAddresses, ...writerAddresses].sort(); + expect(allServers).toEqual(allExpectedServers); + + done(); + }); + } + + function lookupRoutingTableOnRouter(getServersUtil) { + const rediscovery = new Rediscovery(getServersUtil); + return rediscovery.lookupRoutingTableOnRouter(null, ROUTER_ADDRESS); + } + + function expectProtocolError(error, messagePrefix) { + expect(error.code).toEqual(PROTOCOL_ERROR); + expect(error.message.indexOf(messagePrefix)).toEqual(0); + } + + function shouldNotBeCalled() { + throw new Error('Should not be called'); + } + + class FakeGetServersUtil extends GetServersUtil { + + constructor({callGetServers = shouldNotBeCalled, parseTtl = shouldNotBeCalled, parseServers = shouldNotBeCalled}) { + super(); + this._callGetServers = callGetServers; + this._parseTtl = parseTtl; + this._parseServers = parseServers; + } + + callGetServers(session, routerAddress) { + return new Promise((resolve, reject) => { + try { + resolve(this._callGetServers()); + } catch (error) { + reject(error); + } + }); + } + + parseTtl(record, routerAddress) { + return this._parseTtl(); + } + + parseServers(record, routerAddress) { + return this._parseServers(); + } + } +}); diff --git a/test/internal/round-robin-array.test.js b/test/internal/round-robin-array.test.js index 528829076..1064b47d2 100644 --- a/test/internal/round-robin-array.test.js +++ b/test/internal/round-robin-array.test.js @@ -61,20 +61,43 @@ describe('round-robin-array', () => { it('should push items', () => { const array1 = new RoundRobinArray(); - array1.pushAll([]); - expect(array1.toArray()).toEqual([]); + array1.pushAll([1]); + expect(array1.toArray()).toEqual([1]); - const array2 = new RoundRobinArray(); - array2.pushAll([1]); - expect(array2.toArray()).toEqual([1]); + const array2 = new RoundRobinArray([]); + array2.pushAll([1, 2, 3]); + expect(array2.toArray()).toEqual([1, 2, 3]); const array3 = new RoundRobinArray([1, 2, 3]); array3.pushAll([4, 5]); expect(array3.toArray()).toEqual([1, 2, 3, 4, 5]); + }); + + it('should push empty array', () => { + const emptyArray = new RoundRobinArray(); + emptyArray.pushAll([]); + expect(emptyArray.isEmpty()).toBeTruthy(); + + const nonEmptyArray = new RoundRobinArray([1, 2, 3]); + nonEmptyArray.pushAll([]); + expect(nonEmptyArray.toArray()).toEqual([1, 2, 3]); + }); + + it('should throw when trying to push illegal items', () => { + const emptyArray = new RoundRobinArray(); + const nonEmptyArray = new RoundRobinArray([1, 2, 3]); + + expect(() => emptyArray.pushAll(undefined)).toThrow(); + expect(() => nonEmptyArray.pushAll(undefined)).toThrow(); + + expect(() => emptyArray.pushAll(null)).toThrow(); + expect(() => nonEmptyArray.pushAll(null)).toThrow(); + + expect(() => emptyArray.pushAll({})).toThrow(); + expect(() => nonEmptyArray.pushAll({})).toThrow(); - const array4 = new RoundRobinArray([1, 2, 3]); - array4.pushAll([]); - expect(array4.toArray()).toEqual([1, 2, 3]); + expect(() => emptyArray.pushAll({a: 1, b: 2})).toThrow(); + expect(() => nonEmptyArray.pushAll({a: 1, b: 2})).toThrow(); }); it('should step through array', () => { diff --git a/test/v1/driver.test.js b/test/v1/driver.test.js index a1167a61d..928ebd3b8 100644 --- a/test/v1/driver.test.js +++ b/test/v1/driver.test.js @@ -139,14 +139,14 @@ describe('driver', function() { driver.session(); }); - it('should fail nicely when connecting with routing to standalone server', function(done) { + it('should fail nicely when connecting with routing to standalone server', done => { // Given driver = neo4j.driver("bolt+routing://localhost", neo4j.auth.basic("neo4j", "neo4j")); // Expect - driver.onError = function (err) { - expect(err.message).toEqual('Server could not perform routing, make sure you are connecting to a causal cluster'); - expect(err.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); + driver.onError = error => { + expect(error.message).toEqual('Server localhost could not perform routing. Make sure you are connecting to a causal cluster'); + expect(error.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); done(); }; From 85d9a8ba82cb65c5364390c7a8bca67b06933ee9 Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 1 Feb 2017 23:00:01 +0100 Subject: [PATCH 4/8] Fix routing driver boltkit test And add early validation of the session mode parameter. --- src/v1/driver.js | 18 +++++++---- src/v1/index.js | 5 ++-- src/v1/routing-driver.js | 16 +++++----- test/v1/routing.driver.boltkit.it.js | 45 ++++++++++++++-------------- test/v1/session.test.js | 20 +++++++++++++ 5 files changed, 68 insertions(+), 36 deletions(-) diff --git a/src/v1/driver.js b/src/v1/driver.js index 44dca767c..858b71331 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -17,11 +17,10 @@ * limitations under the License. */ -import Session from './session'; -import Pool from './internal/pool'; -import Integer from './integer'; +import Session from "./session"; +import Pool from "./internal/pool"; import {connect} from "./internal/connector"; -import StreamObserver from './internal/stream-observer'; +import StreamObserver from "./internal/stream-observer"; import {newError, SERVICE_UNAVAILABLE} from "./error"; const READ = 'READ', WRITE = 'WRITE'; @@ -111,7 +110,8 @@ class Driver { * @return {Session} new session. */ session(mode) { - let connectionPromise = this._acquireConnection(mode); + const sessionMode = Driver._validateSessionMode(mode); + const connectionPromise = this._acquireConnection(sessionMode); connectionPromise.catch((err) => { if (this.onError && err.code === SERVICE_UNAVAILABLE) { this.onError(err); @@ -145,6 +145,14 @@ class Driver { }); } + static _validateSessionMode(rawMode) { + const mode = rawMode || WRITE; + if (mode !== READ && mode !== WRITE) { + throw newError('Illegal session mode ' + mode); + } + return mode; + } + //Extension point _acquireConnection(mode) { return Promise.resolve(this._pool.acquire(this._url)); diff --git a/src/v1/index.js b/src/v1/index.js index 07ba41bdd..bf5031e79 100644 --- a/src/v1/index.js +++ b/src/v1/index.js @@ -19,7 +19,7 @@ import {int, isInt, inSafeRange, toNumber, toString} from './integer'; import {Node, Relationship, UnboundRelationship, PathSegment, Path} from './graph-types' -import {Neo4jError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from './error'; +import {Neo4jError, SERVICE_UNAVAILABLE, SESSION_EXPIRED, PROTOCOL_ERROR} from './error'; import Result from './result'; import ResultSummary from './result-summary'; import Record from './record'; @@ -138,7 +138,8 @@ const session = { }; const error = { SERVICE_UNAVAILABLE, - SESSION_EXPIRED + SESSION_EXPIRED, + PROTOCOL_ERROR }; const integer = { toNumber, diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index 2be24c1c7..97bbfe927 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -117,10 +117,13 @@ class RoutingDriver extends Driver { return refreshedTablePromise.then(newRoutingTable => { if (newRoutingTable) { - // correct routing table was fetched, we need to clean all connections that are not in the new table and - // return the new table + // valid routing table fetched, close old connections to servers not present in the new routing table const staleServers = currentRoutingTable.serversDiff(newRoutingTable); staleServers.forEach(server => this._pool.purge); + + // make this driver instance aware of the new table + this._routingTable = newRoutingTable; + return newRoutingTable } throw newError('Could not perform discovery. No routing servers available.', SERVICE_UNAVAILABLE); @@ -128,14 +131,13 @@ class RoutingDriver extends Driver { } _acquireConnection(mode) { - const connectionMode = mode || WRITE; return this._refreshedRoutingTable().then(routingTable => { - if (connectionMode === READ) { + if (mode === READ) { return this._acquireConnectionToServer(routingTable.readers, "read"); - } else if (connectionMode === WRITE) { + } else if (mode === WRITE) { return this._acquireConnectionToServer(routingTable.writers, "write"); } else { - return Promise.reject(connectionMode + " is not a valid option"); // todo: better message + throw newError('Illegal session mode ' + mode); } }); } @@ -149,8 +151,8 @@ class RoutingDriver extends Driver { } _forget(url) { + this._routingTable.forget(url); this._pool.purge(url); - this._routingTable.remove(url); } static _validateConfig(config) { diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index 098abb4e4..538e6bdc9 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -19,7 +19,8 @@ var neo4j = require("../../lib/v1"); var boltkit = require('./boltkit'); -describe('routing driver ', function () { + +describe('routing driver', function () { var originalTimeout; beforeAll(function(){ @@ -49,9 +50,9 @@ describe('routing driver ', function () { session.close(); // Then expect(driver._pool.has('127.0.0.1:9001')).toBeTruthy(); - expect(driver._clusterView.routers.toArray()).toEqual(["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]); - expect(driver._clusterView.readers.toArray()).toEqual(["127.0.0.1:9002", "127.0.0.1:9003"]); - expect(driver._clusterView.writers.toArray()).toEqual(["127.0.0.1:9001"]); + expect(driver._routingTable.routers.toArray()).toEqual(["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]); + expect(driver._routingTable.readers.toArray()).toEqual(["127.0.0.1:9002", "127.0.0.1:9003"]); + expect(driver._routingTable.writers.toArray()).toEqual(["127.0.0.1:9001"]); driver.close(); server.exit(function (code) { @@ -78,9 +79,9 @@ describe('routing driver ', function () { session.run("MATCH (n) RETURN n.name").then(function () { // Then - expect(driver._clusterView.routers.toArray()).toEqual(["127.0.0.1:9004", "127.0.0.1:9002", "127.0.0.1:9003"]); - expect(driver._clusterView.readers.toArray()).toEqual(["127.0.0.1:9005", "127.0.0.1:9003"]); - expect(driver._clusterView.writers.toArray()).toEqual(["127.0.0.1:9001"]); + expect(driver._routingTable.routers.toArray()).toEqual(["127.0.0.1:9004", "127.0.0.1:9002", "127.0.0.1:9003"]); + expect(driver._routingTable.readers.toArray()).toEqual(["127.0.0.1:9005", "127.0.0.1:9003"]); + expect(driver._routingTable.writers.toArray()).toEqual(["127.0.0.1:9001"]); driver.close(); server.exit(function (code) { @@ -108,9 +109,9 @@ describe('routing driver ', function () { onCompleted: function () { // Then - expect(driver._clusterView.routers.toArray()).toEqual(["127.0.0.1:9004", "127.0.0.1:9002", "127.0.0.1:9003"]); - expect(driver._clusterView.readers.toArray()).toEqual(["127.0.0.1:9005", "127.0.0.1:9003"]); - expect(driver._clusterView.writers.toArray()).toEqual(["127.0.0.1:9001"]); + expect(driver._routingTable.routers.toArray()).toEqual(["127.0.0.1:9004", "127.0.0.1:9002", "127.0.0.1:9003"]); + expect(driver._routingTable.readers.toArray()).toEqual(["127.0.0.1:9005", "127.0.0.1:9003"]); + expect(driver._routingTable.writers.toArray()).toEqual(["127.0.0.1:9001"]); driver.close(); server.exit(function (code) { @@ -137,7 +138,7 @@ describe('routing driver ', function () { // When var session = driver.session(neo4j.READ); session.run("MATCH (n) RETURN n.name").catch(function (err) { - expect(err.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); + expect(err.code).toEqual(neo4j.error.PROTOCOL_ERROR); session.close(); driver.close(); @@ -418,9 +419,9 @@ describe('routing driver ', function () { session.run("MATCH (n) RETURN n.name").then(function () { // Then - expect(driver._clusterView.routers.toArray()).toEqual(['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']); - expect(driver._clusterView.readers.toArray()).toEqual(['127.0.0.1:9005', '127.0.0.1:9006']); - expect(driver._clusterView.writers.toArray()).toEqual(['127.0.0.1:9007', '127.0.0.1:9008']); + expect(driver._routingTable.routers.toArray()).toEqual(['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']); + expect(driver._routingTable.readers.toArray()).toEqual(['127.0.0.1:9005', '127.0.0.1:9006']); + expect(driver._routingTable.writers.toArray()).toEqual(['127.0.0.1:9007', '127.0.0.1:9008']); driver.close(); seedServer.exit(function (code1) { readServer.exit(function (code2) { @@ -452,9 +453,9 @@ describe('routing driver ', function () { // Then expect(driver._pool.has('127.0.0.1:9001')).toBeTruthy(); expect(driver._pool.has('127.0.0.1:9005')).toBeFalsy(); - expect(driver._clusterView.routers.toArray()).toEqual(['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']); - expect(driver._clusterView.readers.toArray()).toEqual(['127.0.0.1:9006']); - expect(driver._clusterView.writers.toArray()).toEqual(['127.0.0.1:9007', '127.0.0.1:9008']); + expect(driver._routingTable.routers.toArray()).toEqual(['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']); + expect(driver._routingTable.readers.toArray()).toEqual(['127.0.0.1:9006']); + expect(driver._routingTable.writers.toArray()).toEqual(['127.0.0.1:9007', '127.0.0.1:9008']); driver.close(); seedServer.exit(function (code1) { readServer.exit(function (code2) { @@ -485,9 +486,9 @@ describe('routing driver ', function () { // Then expect(driver._pool.has('127.0.0.1:9001')).toBeTruthy(); expect(driver._pool.has('127.0.0.1:9005')).toBeFalsy(); - expect(driver._clusterView.routers.toArray()).toEqual(['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']); - expect(driver._clusterView.readers.toArray()).toEqual(['127.0.0.1:9006']); - expect(driver._clusterView.writers.toArray()).toEqual(['127.0.0.1:9007', '127.0.0.1:9008']); + expect(driver._routingTable.routers.toArray()).toEqual(['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']); + expect(driver._routingTable.readers.toArray()).toEqual(['127.0.0.1:9006']); + expect(driver._routingTable.writers.toArray()).toEqual(['127.0.0.1:9007', '127.0.0.1:9008']); driver.close(); seedServer.exit(function (code) { expect(code).toEqual(0); @@ -568,7 +569,7 @@ describe('routing driver ', function () { var session = driver.session(); session.run("CREATE ()").catch(function (err) { //the server at 9007 should have been removed - expect(driver._clusterView.writers.toArray()).toEqual(['127.0.0.1:9008']); + expect(driver._routingTable.writers.toArray()).toEqual(['127.0.0.1:9008']); expect(err.code).toEqual(neo4j.error.SESSION_EXPIRED); session.close(); driver.close(); @@ -602,7 +603,7 @@ describe('routing driver ', function () { tx.commit().catch(function (err) { //the server at 9007 should have been removed - expect(driver._clusterView.writers.toArray()).toEqual(['127.0.0.1:9008']); + expect(driver._routingTable.writers.toArray()).toEqual(['127.0.0.1:9008']); expect(err.code).toEqual(neo4j.error.SESSION_EXPIRED); session.close(); driver.close(); diff --git a/test/v1/session.test.js b/test/v1/session.test.js index bb194882d..94aae44c0 100644 --- a/test/v1/session.test.js +++ b/test/v1/session.test.js @@ -375,6 +375,26 @@ describe('session', function () { expect(() => session.beginTransaction([])).toThrowError(TypeError); expect(() => session.beginTransaction(['bookmark'])).toThrowError(TypeError); }); + + it('should allow creation of a ' + neo4j.session.READ + ' session', done => { + const readSession = driver.session(neo4j.session.READ); + readSession.run('RETURN 1').then(() => { + readSession.close(); + done(); + }); + }); + + it('should allow creation of a ' + neo4j.session.WRITE + ' session', done => { + const writeSession = driver.session(neo4j.session.WRITE); + writeSession.run('CREATE ()').then(() => { + writeSession.close(); + done(); + }); + }); + + it('should fail for illegal session mode', () => { + expect(() => driver.session('ILLEGAL_MODE')).toThrow(); + }); }); From f8ebe9726887c2f0dda3dceab46f8ce0597d2035 Mon Sep 17 00:00:00 2001 From: lutovich Date: Wed, 1 Feb 2017 23:41:00 +0100 Subject: [PATCH 5/8] Extract method that updates the routing table --- src/v1/internal/get-servers-util.js | 1 + src/v1/routing-driver.js | 64 ++++++++++++++++------------- 2 files changed, 37 insertions(+), 28 deletions(-) diff --git a/src/v1/internal/get-servers-util.js b/src/v1/internal/get-servers-util.js index 5e030170f..2be8b4d9a 100644 --- a/src/v1/internal/get-servers-util.js +++ b/src/v1/internal/get-servers-util.js @@ -27,6 +27,7 @@ const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'; export default class GetServersUtil { callGetServers(session, routerAddress) { + // todo: session should be closed here along with the underlying connection return session.run(PROCEDURE_CALL).then(result => { return result.records; }).catch(error => { diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index 97bbfe927..76432efa1 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -83,13 +83,36 @@ class RoutingDriver extends Driver { }); } - _refreshedRoutingTable() { + _acquireConnection(mode) { + return this._freshRoutingTable().then(routingTable => { + if (mode === READ) { + return this._acquireConnectionToServer(routingTable.readers, "read"); + } else if (mode === WRITE) { + return this._acquireConnectionToServer(routingTable.writers, "write"); + } else { + throw newError('Illegal session mode ' + mode); + } + }); + } + + _acquireConnectionToServer(serversRoundRobinArray, serverName) { + const address = serversRoundRobinArray.next(); + if (!address) { + return Promise.reject(newError('No ' + serverName + ' servers available', SESSION_EXPIRED)); + } + return this._pool.acquire(address); + } + + _freshRoutingTable() { const currentRoutingTable = this._routingTable; if (!currentRoutingTable.isStale()) { return Promise.resolve(currentRoutingTable); } + return this._refreshRoutingTable(currentRoutingTable); + } + _refreshRoutingTable(currentRoutingTable) { const knownRouters = currentRoutingTable.routers.toArray(); const refreshedTablePromise = knownRouters.reduce((refreshedTablePromise, currentRouter, currentIndex) => { @@ -117,44 +140,29 @@ class RoutingDriver extends Driver { return refreshedTablePromise.then(newRoutingTable => { if (newRoutingTable) { - // valid routing table fetched, close old connections to servers not present in the new routing table - const staleServers = currentRoutingTable.serversDiff(newRoutingTable); - staleServers.forEach(server => this._pool.purge); - - // make this driver instance aware of the new table - this._routingTable = newRoutingTable; - + this._updateRoutingTable(newRoutingTable); return newRoutingTable } throw newError('Could not perform discovery. No routing servers available.', SERVICE_UNAVAILABLE); }); } - _acquireConnection(mode) { - return this._refreshedRoutingTable().then(routingTable => { - if (mode === READ) { - return this._acquireConnectionToServer(routingTable.readers, "read"); - } else if (mode === WRITE) { - return this._acquireConnectionToServer(routingTable.writers, "write"); - } else { - throw newError('Illegal session mode ' + mode); - } - }); - } - - _acquireConnectionToServer(serversRoundRobinArray, serverName) { - const address = serversRoundRobinArray.next(); - if (!address) { - return Promise.reject(newError('No ' + serverName + ' servers available', SESSION_EXPIRED)); - } - return this._pool.acquire(address); - } - _forget(url) { this._routingTable.forget(url); this._pool.purge(url); } + _updateRoutingTable(newRoutingTable) { + const currentRoutingTable = this._routingTable; + + // close old connections to servers not present in the new routing table + const staleServers = currentRoutingTable.serversDiff(newRoutingTable); + staleServers.forEach(server => this._pool.purge); + + // make this driver instance aware of the new table + this._routingTable = newRoutingTable; + } + static _validateConfig(config) { if(config.trust === 'TRUST_ON_FIRST_USE') { throw newError('The chosen trust mode is not compatible with a routing driver'); From c2cfedf2f39824487840b70782435ea06f044074 Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 3 Feb 2017 00:27:55 +0100 Subject: [PATCH 6/8] Close session and connection used for rediscovery Previously session and the underlying connection were not returned to the pool after the rediscovery procedure. This happened because rediscovery session was not initalized with an appropriate callback. This commit fixes the problem by extracting the release callback code into a function that gets called for both "normal" and "rediscovery" sessions. Probably a cleaner way to fix this would be to move functionality into `Session#close()` and make connection release itself to the pool. However this resulted into lots of non-trivial changes and thus should go into a separate PR. --- src/v1/driver.js | 35 ++++++++------- src/v1/internal/get-servers-util.js | 2 +- src/v1/routing-driver.js | 4 +- test/internal/get-servers-util.test.js | 29 ++++++++++++- test/v1/routing.driver.boltkit.it.js | 60 ++++++++++++++++++++++++++ 5 files changed, 110 insertions(+), 20 deletions(-) diff --git a/src/v1/driver.js b/src/v1/driver.js index 858b71331..70f1b2cc3 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -119,30 +119,33 @@ class Driver { //we don't need to tell the driver about this error } }); - return this._createSession(connectionPromise, (cb) => { - // This gets called on Session#close(), and is where we return - // the pooled 'connection' instance. - - // We don't pool Session instances, to avoid users using the Session - // after they've called close. The `Session` object is just a thin - // wrapper around Connection anyway, so it makes little difference. - - // Queue up a 'reset', to ensure the next user gets a clean - // session to work with. + return this._createSession(connectionPromise, this._releaseConnection(connectionPromise)); + } - connectionPromise.then( (conn) => { + /** + * The returned function gets called on Session#close(), and is where we return the pooled 'connection' instance. + * We don't pool Session instances, to avoid users using the Session after they've called close. + * The `Session` object is just a thin wrapper around Connection anyway, so it makes little difference. + * @param {Promise} connectionPromise - promise resolved with the connection. + * @return {function(*=)} - function that releases the connection and then executes an optional callback. + * @protected + */ + _releaseConnection(connectionPromise) { + return userDefinedCallback => { + connectionPromise.then(conn => { + // Queue up a 'reset', to ensure the next user gets a clean session to work with. conn.reset(); conn.sync(); // Return connection to the pool conn._release(); - }).catch( () => {/*ignore errors here*/}); + }).catch(ignoredError => { + }); - // Call user callback - if (cb) { - cb(); + if (userDefinedCallback) { + userDefinedCallback(); } - }); + }; } static _validateSessionMode(rawMode) { diff --git a/src/v1/internal/get-servers-util.js b/src/v1/internal/get-servers-util.js index 2be8b4d9a..0591f8b29 100644 --- a/src/v1/internal/get-servers-util.js +++ b/src/v1/internal/get-servers-util.js @@ -27,8 +27,8 @@ const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'; export default class GetServersUtil { callGetServers(session, routerAddress) { - // todo: session should be closed here along with the underlying connection return session.run(PROCEDURE_CALL).then(result => { + session.close(); return result.records; }).catch(error => { if (error.code === PROCEDURE_NOT_FOUND_CODE) { diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index 76432efa1..1cb761f0c 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -129,9 +129,9 @@ class RoutingDriver extends Driver { this._forget(previousRouter); } - // todo: properly close this connection const connection = this._pool.acquire(currentRouter); - const session = this._createSession(Promise.resolve(connection)); + const connectionPromise = Promise.resolve(connection); + const session = this._createSession(connectionPromise, this._releaseConnection(connectionPromise)); // try next router return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter); diff --git a/test/internal/get-servers-util.test.js b/test/internal/get-servers-util.test.js index 7c7836a48..95855cf5a 100644 --- a/test/internal/get-servers-util.test.js +++ b/test/internal/get-servers-util.test.js @@ -40,12 +40,30 @@ describe('get-servers-util', () => { it('should return retrieved records when query succeeds', done => { const session = FakeSession.successful({records: ['foo', 'bar', 'baz']}); - callGetServers(session, '').then(records => { + callGetServers(session).then(records => { expect(records).toEqual(['foo', 'bar', 'baz']); done(); }).catch(console.log); }); + it('should close session when query succeeds', done => { + const session = FakeSession.successful({records: ['foo', 'bar', 'baz']}); + + callGetServers(session).then(() => { + expect(session.isClosed()).toBeTruthy(); + done(); + }).catch(console.log); + }); + + it('should not close session when query fails', done => { + const session = FakeSession.failed(newError('Oh no!', SESSION_EXPIRED)); + + callGetServers(session).then(() => { + expect(session.isClosed()).toBeFalsy(); + done(); + }).catch(console.log); + }); + it('should return null on connection error', done => { const session = FakeSession.failed(newError('Oh no!', SESSION_EXPIRED)); @@ -229,6 +247,7 @@ describe('get-servers-util', () => { constructor(runResponse) { this._runResponse = runResponse; + this._closed = false; } static successful(result) { @@ -242,5 +261,13 @@ describe('get-servers-util', () => { run() { return this._runResponse; } + + close() { + this._closed = true; + } + + isClosed() { + return this._closed; + } } }); diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index 538e6bdc9..b638449b2 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -783,6 +783,66 @@ describe('routing driver', function () { }); }); + it('should close connection used for routing table refreshing', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + // server is both router and writer + const server = kit.start('./test/resources/boltkit/discover_new_servers.script', 9001); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001'); + + const acquiredConnections = []; + const releasedConnections = []; + setUpPoolToMemorizeAllAcquiredAndReleasedConnections(driver, acquiredConnections, releasedConnections); + + const session = driver.session(); + session.run('MATCH (n) RETURN n.name').then(() => { + session.close(() => { + driver.close(); + server.exit(code => { + expect(code).toEqual(0); + + // two connections should have been acquired: one for rediscovery and one for the query + expect(acquiredConnections.length).toEqual(2); + // same two connections should have been released + expect(releasedConnections.length).toEqual(2); + + // verify that acquired connections are those that we released + for (let i = 0; i < acquiredConnections.length; i++) { + expect(acquiredConnections[i]).toBe(releasedConnections[i]); + } + + done(); + }); + }); + }); + }); + }); + + function setUpPoolToMemorizeAllAcquiredAndReleasedConnections(driver, acquiredConnections, releasedConnections) { + // make connection pool remember all acquired connections + const originalAcquire = driver._pool.acquire.bind(driver._pool); + const memorizingAcquire = (...args) => { + const connection = originalAcquire(...args); + acquiredConnections.push(connection); + return connection; + }; + driver._pool.acquire = memorizingAcquire; + + // make connection pool remember all released connections + const originalRelease = driver._pool._release; + const rememberingRelease = (key, resource) => { + originalRelease(key, resource); + releasedConnections.push(resource); + }; + driver._pool._release = rememberingRelease; + } + function newDriver(url) { // BoltKit currently does not support encryption, create driver with encryption turned off return neo4j.driver(url, neo4j.auth.basic("neo4j", "neo4j"), { From f368693668727d224dd17815da9ffe72be419479 Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 6 Feb 2017 19:00:53 +0100 Subject: [PATCH 7/8] More fixes and boltkit tests for routing --- package.json | 2 + src/v1/internal/get-servers-util.js | 14 +- src/v1/internal/rediscovery.js | 7 +- src/v1/internal/routing-table.js | 8 + src/v1/routing-driver.js | 40 +- test/internal/rediscovery.test.js | 5 +- test/internal/routing-table.test.js | 20 + ...ript => empty_get_servers_response.script} | 0 .../multiple_records_get_servers.script | 10 + .../boltkit/no_readers_get_servers.script | 9 + .../boltkit/no_routers_get_servers.script | 9 + .../no_servers_entry_get_servers.script | 9 + .../boltkit/no_ttl_entry_get_servers.script | 9 + .../boltkit/one_of_each_template.script.mst | 13 + .../routing_table_with_zero_ttl.script | 22 ++ .../unparseable_servers_get_servers.script | 9 + .../unparseable_ttl_get_servers.script | 9 + test/v1/boltkit.js | 13 +- test/v1/routing.driver.boltkit.it.js | 374 ++++++++++++++++-- 19 files changed, 520 insertions(+), 62 deletions(-) rename test/resources/boltkit/{handle_empty_get_servers_response.script => empty_get_servers_response.script} (100%) create mode 100644 test/resources/boltkit/multiple_records_get_servers.script create mode 100644 test/resources/boltkit/no_readers_get_servers.script create mode 100644 test/resources/boltkit/no_routers_get_servers.script create mode 100644 test/resources/boltkit/no_servers_entry_get_servers.script create mode 100644 test/resources/boltkit/no_ttl_entry_get_servers.script create mode 100644 test/resources/boltkit/one_of_each_template.script.mst create mode 100644 test/resources/boltkit/routing_table_with_zero_ttl.script create mode 100644 test/resources/boltkit/unparseable_servers_get_servers.script create mode 100644 test/resources/boltkit/unparseable_ttl_get_servers.script diff --git a/package.json b/package.json index 36fa0b93d..f16ff30ac 100644 --- a/package.json +++ b/package.json @@ -52,10 +52,12 @@ "lolex": "^1.5.2", "merge-stream": "^1.0.0", "minimist": "^1.2.0", + "mustache": "^2.3.0", "phantomjs-prebuilt": "^2.1.7 ", "run-sequence": "^1.1.4", "semver": "^5.3.0", "through2": "~2.0.0", + "tmp": "0.0.31", "vinyl-buffer": "^1.0.0", "vinyl-source-stream": "^1.1.0" }, diff --git a/src/v1/internal/get-servers-util.js b/src/v1/internal/get-servers-util.js index 0591f8b29..70e4ed9ed 100644 --- a/src/v1/internal/get-servers-util.js +++ b/src/v1/internal/get-servers-util.js @@ -31,7 +31,7 @@ export default class GetServersUtil { session.close(); return result.records; }).catch(error => { - if (error.code === PROCEDURE_NOT_FOUND_CODE) { + if (this._isProcedureNotFoundError(error)) { // throw when getServers procedure not found because this is clearly a configuration issue throw newError('Server ' + routerAddress + ' could not perform routing. ' + 'Make sure you are connecting to a causal cluster', SERVICE_UNAVAILABLE); @@ -92,4 +92,16 @@ export default class GetServersUtil { PROTOCOL_ERROR); } } + + _isProcedureNotFoundError(error) { + let errorCode = error.code; + if (!errorCode) { + try { + errorCode = error.fields[0].code; + } catch (e) { + errorCode = 'UNKNOWN'; + } + } + return errorCode === PROCEDURE_NOT_FOUND_CODE; + } } diff --git a/src/v1/internal/rediscovery.js b/src/v1/internal/rediscovery.js index b4fd92933..08fb60bf0 100644 --- a/src/v1/internal/rediscovery.js +++ b/src/v1/internal/rediscovery.js @@ -47,11 +47,8 @@ export default class Rediscovery { Rediscovery._assertNonEmpty(routers, 'routers', routerAddress); Rediscovery._assertNonEmpty(readers, 'readers', routerAddress); - - if (writers.isEmpty()) { - // retrieved routing table has no writers, next router should be queried - return null; - } + // case with no writers is processed higher in the promise chain because only RoutingDriver knows + // how to deal with such table and how to treat router that returned such table return new RoutingTable(routers, readers, writers, expirationTime); }); diff --git a/src/v1/internal/routing-table.js b/src/v1/internal/routing-table.js index e834cdde2..af398a797 100644 --- a/src/v1/internal/routing-table.js +++ b/src/v1/internal/routing-table.js @@ -38,6 +38,14 @@ export default class RoutingTable { this.writers.remove(address); } + forgetRouter(address) { + this.routers.remove(address); + } + + forgetWriter(address) { + this.writers.remove(address); + } + serversDiff(otherRoutingTable) { const oldServers = new Set(this._allServers()); const newServers = otherRoutingTable._allServers(); diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index 1cb761f0c..62431a220 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -70,10 +70,10 @@ class RoutingDriver extends Driver { let url = 'UNKNOWN'; if (conn) { url = conn.url; - this._routingTable.writers.remove(conn.url); + this._routingTable.forgetWriter(conn.url); } else { connectionPromise.then((conn) => { - this._routingTable.writers.remove(conn.url); + this._routingTable.forgetWriter(conn.url); }).catch(() => {/*ignore*/}); } return newError("No longer possible to write to server at " + url, SESSION_EXPIRED); @@ -118,28 +118,27 @@ class RoutingDriver extends Driver { const refreshedTablePromise = knownRouters.reduce((refreshedTablePromise, currentRouter, currentIndex) => { return refreshedTablePromise.then(newRoutingTable => { if (newRoutingTable) { - // correct routing table was fetched, just return it - return newRoutingTable - } - - // returned routing table was undefined, this means a connection error happened and we need to forget the - // previous router and try the next one - const previousRouter = knownRouters[currentIndex - 1]; - if (previousRouter) { - this._forget(previousRouter); + if (!newRoutingTable.writers.isEmpty()) { + // valid routing table was fetched - just return it, try next router otherwise + return newRoutingTable; + } + } else { + // returned routing table was undefined, this means a connection error happened and we need to forget the + // previous router and try the next one + const previousRouter = knownRouters[currentIndex - 1]; + if (previousRouter) { + currentRoutingTable.forgetRouter(previousRouter); + } } - const connection = this._pool.acquire(currentRouter); - const connectionPromise = Promise.resolve(connection); - const session = this._createSession(connectionPromise, this._releaseConnection(connectionPromise)); - // try next router + const session = this._createSessionForRediscovery(currentRouter); return this._rediscovery.lookupRoutingTableOnRouter(session, currentRouter); }) }, Promise.resolve(null)); return refreshedTablePromise.then(newRoutingTable => { - if (newRoutingTable) { + if (newRoutingTable && !newRoutingTable.writers.isEmpty()) { this._updateRoutingTable(newRoutingTable); return newRoutingTable } @@ -147,6 +146,15 @@ class RoutingDriver extends Driver { }); } + _createSessionForRediscovery(routerAddress) { + const connection = this._pool.acquire(routerAddress); + const connectionPromise = Promise.resolve(connection); + // error transformer here is a no-op unlike the one in a regular session, this is so because errors are + // handled in the rediscovery promise chain and we do not need to do this in the error transformer + const errorTransformer = error => error; + return new RoutingSession(connectionPromise, this._releaseConnection(connectionPromise), errorTransformer); + } + _forget(url) { this._routingTable.forget(url); this._pool.purge(url); diff --git a/test/internal/rediscovery.test.js b/test/internal/rediscovery.test.js index 32927e38a..b23816207 100644 --- a/test/internal/rediscovery.test.js +++ b/test/internal/rediscovery.test.js @@ -129,7 +129,7 @@ describe('rediscovery', () => { }); }); - it('should return null when no writers', done => { + it('should return routing table when no writers', done => { const util = new FakeGetServersUtil({ callGetServers: () => [new Record(['a'], ['aaa'])], parseTtl: () => int(42), @@ -143,7 +143,8 @@ describe('rediscovery', () => { }); lookupRoutingTableOnRouter(util).then(routingTable => { - expect(routingTable).toBeNull(); + expect(routingTable).toBeDefined(); + expect(routingTable).not.toBeNull(); done(); }); }); diff --git a/test/internal/routing-table.test.js b/test/internal/routing-table.test.js index 43840859a..d306d69ff 100644 --- a/test/internal/routing-table.test.js +++ b/test/internal/routing-table.test.js @@ -87,6 +87,26 @@ describe('routing-table', () => { expect(table.writers.toArray()).toEqual([]); }); + it('should forget router', () => { + const table = createTable([1, 2], [1, 3], [4, 1], notExpired()); + + table.forgetRouter(1); + + expect(table.routers.toArray()).toEqual([2]); + expect(table.readers.toArray()).toEqual([1, 3]); + expect(table.writers.toArray()).toEqual([4, 1]); + }); + + it('should forget writer', () => { + const table = createTable([1, 2, 3], [2, 1, 5], [5, 1], notExpired()); + + table.forgetWriter(1); + + expect(table.routers.toArray()).toEqual([1, 2, 3]); + expect(table.readers.toArray()).toEqual([2, 1, 5]); + expect(table.writers.toArray()).toEqual([5]); + }); + it('should return all servers in diff when other table is empty', () => { const oldTable = createTable([1, 2], [3, 4], [5, 6], notExpired()); const newTable = createTable([], [], [], notExpired()); diff --git a/test/resources/boltkit/handle_empty_get_servers_response.script b/test/resources/boltkit/empty_get_servers_response.script similarity index 100% rename from test/resources/boltkit/handle_empty_get_servers_response.script rename to test/resources/boltkit/empty_get_servers_response.script diff --git a/test/resources/boltkit/multiple_records_get_servers.script b/test/resources/boltkit/multiple_records_get_servers.script new file mode 100644 index 000000000..2f8964ae1 --- /dev/null +++ b/test/resources/boltkit/multiple_records_get_servers.script @@ -0,0 +1,10 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002"], "role": "ROUTE"}]] + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9005"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]] + SUCCESS {} diff --git a/test/resources/boltkit/no_readers_get_servers.script b/test/resources/boltkit/no_readers_get_servers.script new file mode 100644 index 000000000..083caa23f --- /dev/null +++ b/test/resources/boltkit/no_readers_get_servers.script @@ -0,0 +1,9 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] + SUCCESS {} diff --git a/test/resources/boltkit/no_routers_get_servers.script b/test/resources/boltkit/no_routers_get_servers.script new file mode 100644 index 000000000..528a57381 --- /dev/null +++ b/test/resources/boltkit/no_routers_get_servers.script @@ -0,0 +1,9 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"},{"addresses": ["127.0.0.1:9001"], "role": "READ"}]] + SUCCESS {} diff --git a/test/resources/boltkit/no_servers_entry_get_servers.script b/test/resources/boltkit/no_servers_entry_get_servers.script new file mode 100644 index 000000000..e2c23c13d --- /dev/null +++ b/test/resources/boltkit/no_servers_entry_get_servers.script @@ -0,0 +1,9 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "notServers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] + SUCCESS {} diff --git a/test/resources/boltkit/no_ttl_entry_get_servers.script b/test/resources/boltkit/no_ttl_entry_get_servers.script new file mode 100644 index 000000000..cca85afab --- /dev/null +++ b/test/resources/boltkit/no_ttl_entry_get_servers.script @@ -0,0 +1,9 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["notTtl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] + SUCCESS {} diff --git a/test/resources/boltkit/one_of_each_template.script.mst b/test/resources/boltkit/one_of_each_template.script.mst new file mode 100644 index 000000000..7cb26e547 --- /dev/null +++ b/test/resources/boltkit/one_of_each_template.script.mst @@ -0,0 +1,13 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": {{{writers}}},"role": "WRITE"}, {"addresses": {{{readers}}}, "role": "READ"},{"addresses": {{{routers}}}, "role": "ROUTE"}]] + SUCCESS {} +C: RUN "MATCH (n) RETURN n.name" {} + PULL_ALL +S: SUCCESS {"fields": ["n.name"]} + SUCCESS {} diff --git a/test/resources/boltkit/routing_table_with_zero_ttl.script b/test/resources/boltkit/routing_table_with_zero_ttl.script new file mode 100644 index 000000000..1a4f73db2 --- /dev/null +++ b/test/resources/boltkit/routing_table_with_zero_ttl.script @@ -0,0 +1,22 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [0, [{"addresses": ["127.0.0.1:9090","127.0.0.1:9091","127.0.0.1:9092","127.0.0.1:9000"],"role": "ROUTE"}, {"addresses": ["127.0.0.1:9000"], "role": "READ"},{"addresses": ["127.0.0.1:9000"], "role": "WRITE"}]] + SUCCESS {} +C: RUN "MATCH (n) RETURN n" {} + PULL_ALL +S: SUCCESS {"fields": ["n"]} + SUCCESS {} +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [0, [{"addresses": ["127.0.0.1:9000"],"role": "ROUTE"}, {"addresses": ["127.0.0.1:9000"], "role": "READ"},{"addresses": ["127.0.0.1:9000"], "role": "WRITE"}]] + SUCCESS {} +C: RUN "MATCH (n) RETURN n" {} + PULL_ALL +S: SUCCESS {"fields": ["n"]} + SUCCESS {} diff --git a/test/resources/boltkit/unparseable_servers_get_servers.script b/test/resources/boltkit/unparseable_servers_get_servers.script new file mode 100644 index 000000000..3bd2fa1a0 --- /dev/null +++ b/test/resources/boltkit/unparseable_servers_get_servers.script @@ -0,0 +1,9 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"notAddresses": ["127.0.0.1:9001"],"memberRole": "WRITER"}, {"notAddresses": ["127.0.0.1:9002","127.0.0.1:9003"], "memberRole": "READER"},{"notAddresses": ["127.0.0.1:9001","127.0.0.1:9002"], "memberRole": "ROUTER"}]] + SUCCESS {} diff --git a/test/resources/boltkit/unparseable_ttl_get_servers.script b/test/resources/boltkit/unparseable_ttl_get_servers.script new file mode 100644 index 000000000..53a2d5a2e --- /dev/null +++ b/test/resources/boltkit/unparseable_ttl_get_servers.script @@ -0,0 +1,9 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [{ttl: 9223372036854775807}, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002"], "role": "ROUTE"}]] + SUCCESS {} diff --git a/test/v1/boltkit.js b/test/v1/boltkit.js index 164cc0526..26f4a4dbe 100644 --- a/test/v1/boltkit.js +++ b/test/v1/boltkit.js @@ -17,12 +17,23 @@ * limitations under the License. */ -var childProcess = require("child_process"); +var childProcess = require('child_process'); +var Mustache = require('Mustache'); +var fs = require('fs'); +var tmp = require('tmp'); var BoltKit = function (verbose) { this.verbose = verbose || false; }; +BoltKit.prototype.startWithTemplate = function (scriptTemplate, parameters, port) { + var template = fs.readFileSync(scriptTemplate, 'utf-8'); + var scriptContents = Mustache.render(template, parameters); + var script = tmp.fileSync().name; + fs.writeFileSync(script, scriptContents, 'utf-8'); + return this.start(script, port); +}; + BoltKit.prototype.start = function(script, port) { var spawn = childProcess.spawn, server, code = -1; diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index b638449b2..44a3e6bb8 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -17,18 +17,19 @@ * limitations under the License. */ -var neo4j = require("../../lib/v1"); -var boltkit = require('./boltkit'); +import neo4j from "../../src/v1"; +import boltkit from "./boltkit"; +import RoutingTable from "../../src/v1/internal/routing-table"; describe('routing driver', function () { - var originalTimeout; + var originalTimeout; - beforeAll(function(){ + beforeAll(function () { originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL; jasmine.DEFAULT_TIMEOUT_INTERVAL = 10000; }); - afterAll(function(){ + afterAll(function () { jasmine.DEFAULT_TIMEOUT_INTERVAL = originalTimeout; }); @@ -50,9 +51,9 @@ describe('routing driver', function () { session.close(); // Then expect(driver._pool.has('127.0.0.1:9001')).toBeTruthy(); - expect(driver._routingTable.routers.toArray()).toEqual(["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]); - expect(driver._routingTable.readers.toArray()).toEqual(["127.0.0.1:9002", "127.0.0.1:9003"]); - expect(driver._routingTable.writers.toArray()).toEqual(["127.0.0.1:9001"]); + assertHasRouters(driver, ["127.0.0.1:9001", "127.0.0.1:9002", "127.0.0.1:9003"]); + assertHasReaders(driver, ["127.0.0.1:9002", "127.0.0.1:9003"]); + assertHasWriters(driver, ["127.0.0.1:9001"]); driver.close(); server.exit(function (code) { @@ -79,9 +80,9 @@ describe('routing driver', function () { session.run("MATCH (n) RETURN n.name").then(function () { // Then - expect(driver._routingTable.routers.toArray()).toEqual(["127.0.0.1:9004", "127.0.0.1:9002", "127.0.0.1:9003"]); - expect(driver._routingTable.readers.toArray()).toEqual(["127.0.0.1:9005", "127.0.0.1:9003"]); - expect(driver._routingTable.writers.toArray()).toEqual(["127.0.0.1:9001"]); + assertHasRouters(driver, ["127.0.0.1:9004", "127.0.0.1:9002", "127.0.0.1:9003"]); + assertHasReaders(driver, ["127.0.0.1:9005", "127.0.0.1:9003"]); + assertHasWriters(driver, ["127.0.0.1:9001"]); driver.close(); server.exit(function (code) { @@ -109,9 +110,9 @@ describe('routing driver', function () { onCompleted: function () { // Then - expect(driver._routingTable.routers.toArray()).toEqual(["127.0.0.1:9004", "127.0.0.1:9002", "127.0.0.1:9003"]); - expect(driver._routingTable.readers.toArray()).toEqual(["127.0.0.1:9005", "127.0.0.1:9003"]); - expect(driver._routingTable.writers.toArray()).toEqual(["127.0.0.1:9001"]); + assertHasRouters(driver, ["127.0.0.1:9004", "127.0.0.1:9002", "127.0.0.1:9003"]); + assertHasReaders(driver, ["127.0.0.1:9005", "127.0.0.1:9003"]); + assertHasWriters(driver, ["127.0.0.1:9001"]); driver.close(); server.exit(function (code) { @@ -130,7 +131,7 @@ describe('routing driver', function () { } // Given var kit = new boltkit.BoltKit(); - var server = kit.start('./test/resources/boltkit/handle_empty_get_servers_response.script', 9001); + var server = kit.start('./test/resources/boltkit/empty_get_servers_response.script', 9001); kit.run(function () { var driver = newDriver("bolt+routing://127.0.0.1:9001"); @@ -419,9 +420,9 @@ describe('routing driver', function () { session.run("MATCH (n) RETURN n.name").then(function () { // Then - expect(driver._routingTable.routers.toArray()).toEqual(['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']); - expect(driver._routingTable.readers.toArray()).toEqual(['127.0.0.1:9005', '127.0.0.1:9006']); - expect(driver._routingTable.writers.toArray()).toEqual(['127.0.0.1:9007', '127.0.0.1:9008']); + assertHasRouters(driver, ['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']); + assertHasReaders(driver, ['127.0.0.1:9005', '127.0.0.1:9006']); + assertHasWriters(driver, ['127.0.0.1:9007', '127.0.0.1:9008']); driver.close(); seedServer.exit(function (code1) { readServer.exit(function (code2) { @@ -453,9 +454,9 @@ describe('routing driver', function () { // Then expect(driver._pool.has('127.0.0.1:9001')).toBeTruthy(); expect(driver._pool.has('127.0.0.1:9005')).toBeFalsy(); - expect(driver._routingTable.routers.toArray()).toEqual(['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']); - expect(driver._routingTable.readers.toArray()).toEqual(['127.0.0.1:9006']); - expect(driver._routingTable.writers.toArray()).toEqual(['127.0.0.1:9007', '127.0.0.1:9008']); + assertHasRouters(driver, ['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']); + assertHasReaders(driver, ['127.0.0.1:9006']); + assertHasWriters(driver, ['127.0.0.1:9007', '127.0.0.1:9008']); driver.close(); seedServer.exit(function (code1) { readServer.exit(function (code2) { @@ -486,9 +487,9 @@ describe('routing driver', function () { // Then expect(driver._pool.has('127.0.0.1:9001')).toBeTruthy(); expect(driver._pool.has('127.0.0.1:9005')).toBeFalsy(); - expect(driver._routingTable.routers.toArray()).toEqual(['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']); - expect(driver._routingTable.readers.toArray()).toEqual(['127.0.0.1:9006']); - expect(driver._routingTable.writers.toArray()).toEqual(['127.0.0.1:9007', '127.0.0.1:9008']); + assertHasRouters(driver, ['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']); + assertHasReaders(driver, ['127.0.0.1:9006']); + assertHasWriters(driver, ['127.0.0.1:9007', '127.0.0.1:9008']); driver.close(); seedServer.exit(function (code) { expect(code).toEqual(0); @@ -528,24 +529,27 @@ describe('routing driver', function () { }); }); - it('should handle server not able to do routing', function (done) { + it('should handle server not able to do routing', done => { if (!boltkit.BoltKitSupport) { done(); return; } + // Given - var kit = new boltkit.BoltKit(); - var server = kit.start('./test/resources/boltkit/non_discovery.script', 9001); + const kit = new boltkit.BoltKit(); + const server = kit.start('./test/resources/boltkit/non_discovery.script', 9001); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(); - session.run("MATCH (n) RETURN n.name").catch(function (err) { + const session = driver.session(); + session.run("MATCH (n) RETURN n.name").catch(err => { expect(err.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); + expect(err.message.indexOf('could not perform routing') > 0).toBeTruthy(); + assertHasRouters(driver, ['127.0.0.1:9001']); session.close(); driver.close(); - server.exit(function (code) { + server.exit(code => { expect(code).toEqual(0); done(); }); @@ -569,7 +573,7 @@ describe('routing driver', function () { var session = driver.session(); session.run("CREATE ()").catch(function (err) { //the server at 9007 should have been removed - expect(driver._routingTable.writers.toArray()).toEqual(['127.0.0.1:9008']); + assertHasWriters(driver, ['127.0.0.1:9008']); expect(err.code).toEqual(neo4j.error.SESSION_EXPIRED); session.close(); driver.close(); @@ -603,7 +607,7 @@ describe('routing driver', function () { tx.commit().catch(function (err) { //the server at 9007 should have been removed - expect(driver._routingTable.writers.toArray()).toEqual(['127.0.0.1:9008']); + assertHasWriters(driver, ['127.0.0.1:9008']); expect(err.code).toEqual(neo4j.error.SESSION_EXPIRED); session.close(); driver.close(); @@ -635,11 +639,61 @@ describe('routing driver', function () { expect(err.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); driver.close(); seedServer.exit(function (code) { - expect(code).toEqual(0); - done(); + expect(code).toEqual(0); + done(); + }); + }); + }); + }); + + it('should try next router when no writers', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const server1 = kit.start('./test/resources/boltkit/routing_table_with_zero_ttl.script', 9000); + const server2 = kit.start('./test/resources/boltkit/no_writers.script', 9090); + const server3 = kit.start('./test/resources/boltkit/no_writers.script', 9091); + const server4 = kit.start('./test/resources/boltkit/no_writers.script', 9092); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9000'); + + const session1 = driver.session(); + session1.run('MATCH (n) RETURN n').then(result1 => { + expect(result1.summary.server.address).toEqual('127.0.0.1:9000'); + session1.close(); + + assertHasRouters(driver, ['127.0.0.1:9090', '127.0.0.1:9091', '127.0.0.1:9092', '127.0.0.1:9000']); + const memorizingRoutingTable = setUpMemorizingRoutingTable(driver); + + const session2 = driver.session(); + session2.run('MATCH (n) RETURN n').then(result2 => { + expect(result2.summary.server.address).toEqual('127.0.0.1:9000'); + session2.close(); + + memorizingRoutingTable.assertForgotRouters([]); + assertHasRouters(driver, ['127.0.0.1:9000']); + driver.close(); + + server1.exit(code1 => { + server2.exit(code2 => { + server3.exit(code3 => { + server4.exit(code4 => { + expect(code1).toEqual(0); + expect(code2).toEqual(0); + expect(code3).toEqual(0); + expect(code4).toEqual(0); + done(); + }); + }); + }); }); }); }); + }); }); it('should re-use connections', function (done) { @@ -657,7 +711,7 @@ describe('routing driver', function () { // When var session = driver.session(neo4j.session.WRITE); session.run("CREATE (n {name:'Bob'})").then(function () { - session.close(function() { + session.close(function () { var connections = Object.keys(driver._openSessions).length session = driver.session(neo4j.session.WRITE); session.run("CREATE ()").then(function () { @@ -783,6 +837,44 @@ describe('routing driver', function () { }); }); + it('should forget routers when fails to connect', done => { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const server = kit.start('./test/resources/boltkit/routing_table_with_zero_ttl.script', 9000); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9000'); + + const session1 = driver.session(); + session1.run('MATCH (n) RETURN n').then(result1 => { + expect(result1.summary.server.address).toEqual('127.0.0.1:9000'); + session1.close(); + + assertHasRouters(driver, ['127.0.0.1:9090', '127.0.0.1:9091', '127.0.0.1:9092', '127.0.0.1:9000']); + const memorizingRoutingTable = setUpMemorizingRoutingTable(driver); + + const session2 = driver.session(); + session2.run('MATCH (n) RETURN n').then(result2 => { + expect(result2.summary.server.address).toEqual('127.0.0.1:9000'); + session2.close(); + + memorizingRoutingTable.assertForgotRouters(['127.0.0.1:9090', '127.0.0.1:9091', '127.0.0.1:9092']); + assertHasRouters(driver, ['127.0.0.1:9000']); + driver.close(); + + server.exit(code1 => { + expect(code1).toEqual(0); + done(); + }); + }); + }); + }); + }); + it('should close connection used for routing table refreshing', done => { if (!boltkit.BoltKitSupport) { done(); @@ -824,6 +916,175 @@ describe('routing driver', function () { }); }); + it('should throw protocol error when no records', done => { + testForProtocolError('./test/resources/boltkit/empty_get_servers_response.script', done); + }); + + it('should throw protocol error when no TTL entry', done => { + testForProtocolError('./test/resources/boltkit/no_ttl_entry_get_servers.script', done); + }); + + it('should throw protocol error when no servers entry', done => { + testForProtocolError('./test/resources/boltkit/no_servers_entry_get_servers.script', done); + }); + + it('should throw protocol error when multiple records', done => { + testForProtocolError('./test/resources/boltkit/unparseable_ttl_get_servers.script', done); + }); + + it('should throw protocol error on unparsable record', done => { + testForProtocolError('./test/resources/boltkit/unparseable_servers_get_servers.script', done); + }); + + it('should throw protocol error when no routers', done => { + testForProtocolError('./test/resources/boltkit/no_routers_get_servers.script', done); + }); + + it('should throw protocol error when no readers', done => { + testForProtocolError('./test/resources/boltkit/no_readers_get_servers.script', done); + }); + + it('should accept routing table with 1 router, 1 reader and 1 writer', done => { + testRoutingTableAcceptance( + { + routers: ['127.0.0.1:9090'], + readers: ['127.0.0.1:9091'], + writers: ['127.0.0.1:9000'] + }, + 9000, done); + }); + + it('should accept routing table with 2 routers, 1 reader and 1 writer', done => { + testRoutingTableAcceptance( + { + routers: ['127.0.0.1:9090', '127.0.0.1:9091'], + readers: ['127.0.0.1:9091'], + writers: ['127.0.0.1:9000'] + }, + 9000, done); + }); + + it('should accept routing table with 1 router, 2 readers and 1 writer', done => { + testRoutingTableAcceptance( + { + routers: ['127.0.0.1:9090'], + readers: ['127.0.0.1:9091', '127.0.0.1:9092'], + writers: ['127.0.0.1:9000'] + }, + 9000, done); + }); + + it('should accept routing table with 2 routers, 2 readers and 1 writer', done => { + testRoutingTableAcceptance( + { + routers: ['127.0.0.1:9090', '127.0.0.1:9091'], + readers: ['127.0.0.1:9092', '127.0.0.1:9093'], + writers: ['127.0.0.1:9000'] + }, + 9000, done); + }); + + it('should accept routing table with 1 router, 1 reader and 2 writers', done => { + testRoutingTableAcceptance( + { + routers: ['127.0.0.1:9090'], + readers: ['127.0.0.1:9091'], + writers: ['127.0.0.1:9000', '127.0.0.1:9092'] + }, + 9000, done); + }); + + it('should accept routing table with 2 routers, 1 reader and 2 writers', done => { + testRoutingTableAcceptance( + { + routers: ['127.0.0.1:9090', '127.0.0.1:9091'], + readers: ['127.0.0.1:9092'], + writers: ['127.0.0.1:9000', '127.0.0.1:9093'] + }, + 9000, done); + }); + + it('should accept routing table with 1 router, 2 readers and 2 writers', done => { + testRoutingTableAcceptance( + { + routers: ['127.0.0.1:9090'], + readers: ['127.0.0.1:9091', '127.0.0.1:9092'], + writers: ['127.0.0.1:9000', '127.0.0.1:9093'] + }, + 9000, done); + }); + + it('should accept routing table with 2 routers, 2 readers and 2 writers', done => { + testRoutingTableAcceptance( + { + routers: ['127.0.0.1:9090', '127.0.0.1:9091'], + readers: ['127.0.0.1:9092', '127.0.0.1:9093'], + writers: ['127.0.0.1:9000', '127.0.0.1:9094'] + }, + 9000, done); + }); + + function testForProtocolError(scriptFile, done) { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const kit = new boltkit.BoltKit(); + const server = kit.start(scriptFile, 9001); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:9001'); + + const session = driver.session(); + session.run('MATCH (n) RETURN n.name').catch(error => { + expect(error.code).toEqual(neo4j.error.PROTOCOL_ERROR); + + session.close(); + driver.close(); + + server.exit(code => { + expect(code).toEqual(0); + done(); + }) + }); + }); + } + + function testRoutingTableAcceptance(clusterMembers, port, done) { + if (!boltkit.BoltKitSupport) { + done(); + return; + } + + const {routers, readers, writers} = clusterMembers; + const params = { + routers: joinStrings(routers), + readers: joinStrings(readers), + writers: joinStrings(writers) + }; + const kit = new boltkit.BoltKit(); + const server = kit.startWithTemplate('./test/resources/boltkit/one_of_each_template.script.mst', params, port); + + kit.run(() => { + const driver = newDriver('bolt+routing://127.0.0.1:' + port); + + const session = driver.session(); + session.run('MATCH (n) RETURN n.name').then(result => { + + expect(result.summary.server.address).toEqual('127.0.0.1:' + port); + + session.close(); + driver.close(); + + server.exit(code => { + expect(code).toEqual(0); + done(); + }) + }); + }); + } + function setUpPoolToMemorizeAllAcquiredAndReleasedConnections(driver, acquiredConnections, releasedConnections) { // make connection pool remember all acquired connections const originalAcquire = driver._pool.acquire.bind(driver._pool); @@ -850,4 +1111,43 @@ describe('routing driver', function () { }); } + function assertHasRouters(driver, expectedRouters) { + expect(driver._routingTable.routers.toArray()).toEqual(expectedRouters); + } + + function assertHasReaders(driver, expectedReaders) { + expect(driver._routingTable.readers.toArray()).toEqual(expectedReaders); + } + + function assertHasWriters(driver, expectedWriters) { + expect(driver._routingTable.writers.toArray()).toEqual(expectedWriters); + } + + function setUpMemorizingRoutingTable(driver) { + const memorizingRoutingTable = new MemorizingRoutingTable(driver._routingTable); + driver._routingTable = memorizingRoutingTable; + return memorizingRoutingTable; + } + + function joinStrings(array) { + return '[' + array.map(s => '"' + s + '"').join(',') + ']'; + } + + class MemorizingRoutingTable extends RoutingTable { + + constructor(initialTable) { + super(initialTable.routers, initialTable.readers, initialTable.writers, initialTable.expirationTime); + this._forgottenRouters = []; + } + + forgetRouter(address) { + super.forgetRouter(address); + this._forgottenRouters.push(address); + } + + assertForgotRouters(expectedRouters) { + expect(this._forgottenRouters).toEqual(expectedRouters); + } + } + }); From 79c54f45cfa8c443c1a36a798ed67d467ceca626 Mon Sep 17 00:00:00 2001 From: lutovich Date: Mon, 6 Feb 2017 21:20:28 +0100 Subject: [PATCH 8/8] Move routing driver boltkit tests to ES6 --- test/v1/routing.driver.boltkit.it.js | 434 +++++++++++++-------------- 1 file changed, 217 insertions(+), 217 deletions(-) diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index 44a3e6bb8..771cb322d 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -21,32 +21,32 @@ import neo4j from "../../src/v1"; import boltkit from "./boltkit"; import RoutingTable from "../../src/v1/internal/routing-table"; -describe('routing driver', function () { - var originalTimeout; +describe('routing driver', () => { + let originalTimeout; - beforeAll(function () { + beforeAll(() => { originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL; jasmine.DEFAULT_TIMEOUT_INTERVAL = 10000; }); - afterAll(function () { + afterAll(() => { jasmine.DEFAULT_TIMEOUT_INTERVAL = originalTimeout; }); - it('should discover server', function (done) { + it('should discover server', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var server = kit.start('./test/resources/boltkit/discover_servers.script', 9001); + const kit = new boltkit.BoltKit(); + const server = kit.start('./test/resources/boltkit/discover_servers.script', 9001); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(); - session.run("MATCH (n) RETURN n.name").then(function () { + const session = driver.session(); + session.run("MATCH (n) RETURN n.name").then(() => { session.close(); // Then @@ -56,7 +56,7 @@ describe('routing driver', function () { assertHasWriters(driver, ["127.0.0.1:9001"]); driver.close(); - server.exit(function (code) { + server.exit(code => { expect(code).toEqual(0); done(); }); @@ -64,20 +64,20 @@ describe('routing driver', function () { }); }); - it('should discover new servers', function (done) { + it('should discover new servers', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var server = kit.start('./test/resources/boltkit/discover_new_servers.script', 9001); + const kit = new boltkit.BoltKit(); + const server = kit.start('./test/resources/boltkit/discover_new_servers.script', 9001); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(); - session.run("MATCH (n) RETURN n.name").then(function () { + const session = driver.session(); + session.run("MATCH (n) RETURN n.name").then(() => { // Then assertHasRouters(driver, ["127.0.0.1:9004", "127.0.0.1:9002", "127.0.0.1:9003"]); @@ -85,7 +85,7 @@ describe('routing driver', function () { assertHasWriters(driver, ["127.0.0.1:9001"]); driver.close(); - server.exit(function (code) { + server.exit(code => { expect(code).toEqual(0); done(); }); @@ -93,21 +93,21 @@ describe('routing driver', function () { }); }); - it('should discover new servers using subscribe', function (done) { + it('should discover new servers using subscribe', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var server = kit.start('./test/resources/boltkit/discover_new_servers.script', 9001); + const kit = new boltkit.BoltKit(); + const server = kit.start('./test/resources/boltkit/discover_new_servers.script', 9001); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(); + const session = driver.session(); session.run("MATCH (n) RETURN n.name").subscribe({ - onCompleted: function () { + onCompleted: () => { // Then assertHasRouters(driver, ["127.0.0.1:9004", "127.0.0.1:9002", "127.0.0.1:9003"]); @@ -115,7 +115,7 @@ describe('routing driver', function () { assertHasWriters(driver, ["127.0.0.1:9001"]); driver.close(); - server.exit(function (code) { + server.exit(code => { expect(code).toEqual(0); done(); }); @@ -124,50 +124,50 @@ describe('routing driver', function () { }); }); - it('should handle empty response from server', function (done) { + it('should handle empty response from server', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var server = kit.start('./test/resources/boltkit/empty_get_servers_response.script', 9001); + const kit = new boltkit.BoltKit(); + const server = kit.start('./test/resources/boltkit/empty_get_servers_response.script', 9001); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(neo4j.READ); - session.run("MATCH (n) RETURN n.name").catch(function (err) { + const session = driver.session(neo4j.READ); + session.run("MATCH (n) RETURN n.name").catch(err => { expect(err.code).toEqual(neo4j.error.PROTOCOL_ERROR); session.close(); driver.close(); - server.exit(function (code) { + server.exit(code => { expect(code).toEqual(0); done(); }); - }).catch(function (err) { + }).catch(err => { console.log(err) }); }); }); - it('should acquire read server', function (done) { + it('should acquire read server', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - var readServer = kit.start('./test/resources/boltkit/read_server.script', 9005); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const readServer = kit.start('./test/resources/boltkit/read_server.script', 9005); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(neo4j.session.READ); - session.run("MATCH (n) RETURN n.name").then(function (res) { + const session = driver.session(neo4j.session.READ); + session.run("MATCH (n) RETURN n.name").then(res => { session.close(); @@ -178,8 +178,8 @@ describe('routing driver', function () { expect(res.records[1].get('n.name')).toEqual('Alice'); expect(res.records[2].get('n.name')).toEqual('Tina'); driver.close(); - seedServer.exit(function (code1) { - readServer.exit(function (code2) { + seedServer.exit(code1 => { + readServer.exit(code2 => { expect(code1).toEqual(0); expect(code2).toEqual(0); done(); @@ -189,41 +189,41 @@ describe('routing driver', function () { }); }); - it('should pick first available route-server', function (done) { + it('should pick first available route-server', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/short_ttl.script', 9000); - var nextRouter = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9003); - var readServer1 = kit.start('./test/resources/boltkit/read_server.script', 9004); - var readServer2 = kit.start('./test/resources/boltkit/read_server.script', 9005); - - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9000"); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/short_ttl.script', 9000); + const nextRouter = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9003); + const readServer1 = kit.start('./test/resources/boltkit/read_server.script', 9004); + const readServer2 = kit.start('./test/resources/boltkit/read_server.script', 9005); + + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9000"); // When - var session = driver.session(neo4j.session.READ); - session.run("MATCH (n) RETURN n.name").then(function (res) { + const session1 = driver.session(neo4j.session.READ); + session1.run("MATCH (n) RETURN n.name").then(res => { // Then expect(res.records[0].get('n.name')).toEqual('Bob'); expect(res.records[1].get('n.name')).toEqual('Alice'); expect(res.records[2].get('n.name')).toEqual('Tina'); - session.close(); + session1.close(); - session = driver.session(neo4j.session.READ); - session.run("MATCH (n) RETURN n.name").then(function (res) { + const session2 = driver.session(neo4j.session.READ); + session2.run("MATCH (n) RETURN n.name").then(res => { // Then expect(res.records[0].get('n.name')).toEqual('Bob'); expect(res.records[1].get('n.name')).toEqual('Alice'); expect(res.records[2].get('n.name')).toEqual('Tina'); - session.close(); + session2.close(); driver.close(); - seedServer.exit(function (code1) { - nextRouter.exit(function (code2) { - readServer1.exit(function (code3) { - readServer2.exit(function (code4) { + seedServer.exit(code1 => { + nextRouter.exit(code2 => { + readServer1.exit(code3 => { + readServer2.exit(code4 => { expect(code1).toEqual(0); expect(code2).toEqual(0); expect(code3).toEqual(0); @@ -238,39 +238,39 @@ describe('routing driver', function () { }); }); - it('should round-robin among read servers', function (done) { + it('should round-robin among read servers', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - var readServer1 = kit.start('./test/resources/boltkit/read_server.script', 9005); - var readServer2 = kit.start('./test/resources/boltkit/read_server.script', 9006); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const readServer1 = kit.start('./test/resources/boltkit/read_server.script', 9005); + const readServer2 = kit.start('./test/resources/boltkit/read_server.script', 9006); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(neo4j.session.READ); - session.run("MATCH (n) RETURN n.name").then(function (res) { + const session1 = driver.session(neo4j.session.READ); + session1.run("MATCH (n) RETURN n.name").then(res => { // Then expect(res.records[0].get('n.name')).toEqual('Bob'); expect(res.records[1].get('n.name')).toEqual('Alice'); expect(res.records[2].get('n.name')).toEqual('Tina'); - session.close(); - session = driver.session(neo4j.session.READ); - session.run("MATCH (n) RETURN n.name").then(function (res) { + session1.close(); + const session2 = driver.session(neo4j.session.READ); + session2.run("MATCH (n) RETURN n.name").then(res => { // Then expect(res.records[0].get('n.name')).toEqual('Bob'); expect(res.records[1].get('n.name')).toEqual('Alice'); expect(res.records[2].get('n.name')).toEqual('Tina'); - session.close(); + session2.close(); driver.close(); - seedServer.exit(function (code1) { - readServer1.exit(function (code2) { - readServer2.exit(function (code3) { + seedServer.exit(code1 => { + readServer1.exit(code2 => { + readServer2.exit(code3 => { expect(code1).toEqual(0); expect(code2).toEqual(0); expect(code3).toEqual(0); @@ -283,25 +283,25 @@ describe('routing driver', function () { }); }); - it('should handle missing read server', function (done) { + it('should handle missing read server', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - var readServer = kit.start('./test/resources/boltkit/dead_server.script', 9005); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const readServer = kit.start('./test/resources/boltkit/dead_server.script', 9005); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(neo4j.session.READ); - session.run("MATCH (n) RETURN n.name").catch(function (err) { + const session = driver.session(neo4j.session.READ); + session.run("MATCH (n) RETURN n.name").catch(err => { expect(err.code).toEqual(neo4j.error.SESSION_EXPIRED); driver.close(); - seedServer.exit(function (code1) { - readServer.exit(function (code2) { + seedServer.exit(code1 => { + readServer.exit(code2 => { expect(code1).toEqual(0); expect(code2).toEqual(0); done(); @@ -311,26 +311,26 @@ describe('routing driver', function () { }); }); - it('should acquire write server', function (done) { + it('should acquire write server', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - var writeServer = kit.start('./test/resources/boltkit/write_server.script', 9007); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const writeServer = kit.start('./test/resources/boltkit/write_server.script', 9007); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(neo4j.session.WRITE); - session.run("CREATE (n {name:'Bob'})").then(function () { + const session = driver.session(neo4j.session.WRITE); + session.run("CREATE (n {name:'Bob'})").then(() => { // Then driver.close(); - seedServer.exit(function (code1) { - writeServer.exit(function (code2) { + seedServer.exit(code1 => { + writeServer.exit(code2 => { expect(code1).toEqual(0); expect(code2).toEqual(0); done(); @@ -340,29 +340,29 @@ describe('routing driver', function () { }); }); - it('should round-robin among write servers', function (done) { + it('should round-robin among write servers', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - var readServer1 = kit.start('./test/resources/boltkit/write_server.script', 9007); - var readServer2 = kit.start('./test/resources/boltkit/write_server.script', 9008); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const readServer1 = kit.start('./test/resources/boltkit/write_server.script', 9007); + const readServer2 = kit.start('./test/resources/boltkit/write_server.script', 9008); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(neo4j.session.WRITE); - session.run("CREATE (n {name:'Bob'})").then(function () { - session = driver.session(neo4j.session.WRITE); - session.run("CREATE (n {name:'Bob'})").then(function () { + const session1 = driver.session(neo4j.session.WRITE); + session1.run("CREATE (n {name:'Bob'})").then(() => { + const session2 = driver.session(neo4j.session.WRITE); + session2.run("CREATE (n {name:'Bob'})").then(() => { // Then driver.close(); - seedServer.exit(function (code1) { - readServer1.exit(function (code2) { - readServer2.exit(function (code3) { + seedServer.exit(code1 => { + readServer1.exit(code2 => { + readServer2.exit(code3 => { expect(code1).toEqual(0); expect(code2).toEqual(0); expect(code3).toEqual(0); @@ -375,25 +375,25 @@ describe('routing driver', function () { }); }); - it('should handle missing write server', function (done) { + it('should handle missing write server', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - var readServer = kit.start('./test/resources/boltkit/dead_server.script', 9007); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const readServer = kit.start('./test/resources/boltkit/dead_server.script', 9007); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(neo4j.session.WRITE); - session.run("MATCH (n) RETURN n.name").catch(function (err) { + const session = driver.session(neo4j.session.WRITE); + session.run("MATCH (n) RETURN n.name").catch(err => { expect(err.code).toEqual(neo4j.error.SESSION_EXPIRED); driver.close(); - seedServer.exit(function (code1) { - readServer.exit(function (code2) { + seedServer.exit(code1 => { + readServer.exit(code2 => { expect(code1).toEqual(0); expect(code2).toEqual(0); done(); @@ -403,29 +403,29 @@ describe('routing driver', function () { }); }); - it('should remember endpoints', function (done) { + it('should remember endpoints', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - var readServer = kit.start('./test/resources/boltkit/read_server.script', 9005); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const readServer = kit.start('./test/resources/boltkit/read_server.script', 9005); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(neo4j.session.READ); - session.run("MATCH (n) RETURN n.name").then(function () { + const session = driver.session(neo4j.session.READ); + session.run("MATCH (n) RETURN n.name").then(() => { // Then assertHasRouters(driver, ['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']); assertHasReaders(driver, ['127.0.0.1:9005', '127.0.0.1:9006']); assertHasWriters(driver, ['127.0.0.1:9007', '127.0.0.1:9008']); driver.close(); - seedServer.exit(function (code1) { - readServer.exit(function (code2) { + seedServer.exit(code1 => { + readServer.exit(code2 => { expect(code1).toEqual(0); expect(code2).toEqual(0); done(); @@ -435,21 +435,21 @@ describe('routing driver', function () { }); }); - it('should forget endpoints on failure', function (done) { + it('should forget endpoints on failure', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - var readServer = kit.start('./test/resources/boltkit/dead_server.script', 9005); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const readServer = kit.start('./test/resources/boltkit/dead_server.script', 9005); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(neo4j.session.READ); - session.run("MATCH (n) RETURN n.name").catch(function () { + const session = driver.session(neo4j.session.READ); + session.run("MATCH (n) RETURN n.name").catch(() => { session.close(); // Then expect(driver._pool.has('127.0.0.1:9001')).toBeTruthy(); @@ -458,8 +458,8 @@ describe('routing driver', function () { assertHasReaders(driver, ['127.0.0.1:9006']); assertHasWriters(driver, ['127.0.0.1:9007', '127.0.0.1:9008']); driver.close(); - seedServer.exit(function (code1) { - readServer.exit(function (code2) { + seedServer.exit(code1 => { + readServer.exit(code2 => { expect(code1).toEqual(0); expect(code2).toEqual(0); done(); @@ -469,20 +469,20 @@ describe('routing driver', function () { }); }); - it('should forget endpoints on session acquisition failure', function (done) { + it('should forget endpoints on session acquisition failure', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(neo4j.session.READ); - session.run("MATCH (n) RETURN n.name").catch(function (err) { + const session = driver.session(neo4j.session.READ); + session.run("MATCH (n) RETURN n.name").catch(() => { session.close(); // Then expect(driver._pool.has('127.0.0.1:9001')).toBeTruthy(); @@ -491,7 +491,7 @@ describe('routing driver', function () { assertHasReaders(driver, ['127.0.0.1:9006']); assertHasWriters(driver, ['127.0.0.1:9007', '127.0.0.1:9008']); driver.close(); - seedServer.exit(function (code) { + seedServer.exit(code => { expect(code).toEqual(0); done(); }); @@ -499,26 +499,26 @@ describe('routing driver', function () { }); }); - it('should rediscover if necessary', function (done) { + it('should rediscover if necessary', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/rediscover.script', 9001); - var readServer = kit.start('./test/resources/boltkit/read_server.script', 9005); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/rediscover.script', 9001); + const readServer = kit.start('./test/resources/boltkit/read_server.script', 9005); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(neo4j.session.READ); - session.run("MATCH (n) RETURN n.name").catch(function (err) { - session = driver.session(neo4j.session.READ); - session.run("MATCH (n) RETURN n.name").then(function (res) { + const session1 = driver.session(neo4j.session.READ); + session1.run("MATCH (n) RETURN n.name").catch(() => { + const session2 = driver.session(neo4j.session.READ); + session2.run("MATCH (n) RETURN n.name").then(() => { driver.close(); - seedServer.exit(function (code1) { - readServer.exit(function (code2) { + seedServer.exit(code1 => { + readServer.exit(code2 => { expect(code1).toEqual(0); expect(code2).toEqual(0); done(); @@ -557,28 +557,28 @@ describe('routing driver', function () { }); }); - it('should handle leader switch while writing', function (done) { + it('should handle leader switch while writing', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - var readServer = kit.start('./test/resources/boltkit/not_able_to_write.script', 9007); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const readServer = kit.start('./test/resources/boltkit/not_able_to_write.script', 9007); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(); - session.run("CREATE ()").catch(function (err) { + const session = driver.session(); + session.run("CREATE ()").catch(err => { //the server at 9007 should have been removed assertHasWriters(driver, ['127.0.0.1:9008']); expect(err.code).toEqual(neo4j.error.SESSION_EXPIRED); session.close(); driver.close(); - seedServer.exit(function (code1) { - readServer.exit(function (code2) { + seedServer.exit(code1 => { + readServer.exit(code2 => { expect(code1).toEqual(0); expect(code2).toEqual(0); done(); @@ -588,31 +588,31 @@ describe('routing driver', function () { }); }); - it('should handle leader switch while writing on transaction', function (done) { + it('should handle leader switch while writing on transaction', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); - var readServer = kit.start('./test/resources/boltkit/not_able_to_write_in_transaction.script', 9007); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/acquire_endpoints.script', 9001); + const readServer = kit.start('./test/resources/boltkit/not_able_to_write_in_transaction.script', 9007); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(); - var tx = session.beginTransaction(); + const session = driver.session(); + const tx = session.beginTransaction(); tx.run("CREATE ()"); - tx.commit().catch(function (err) { + tx.commit().catch(err => { //the server at 9007 should have been removed assertHasWriters(driver, ['127.0.0.1:9008']); expect(err.code).toEqual(neo4j.error.SESSION_EXPIRED); session.close(); driver.close(); - seedServer.exit(function (code1) { - readServer.exit(function (code2) { + seedServer.exit(code1 => { + readServer.exit(code2 => { expect(code1).toEqual(0); expect(code2).toEqual(0); done(); @@ -622,23 +622,23 @@ describe('routing driver', function () { }); }); - it('should fail if missing write server', function (done) { + it('should fail if missing write server', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/no_writers.script', 9001); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/no_writers.script', 9001); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9001"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When - var session = driver.session(neo4j.session.WRITE); - session.run("MATCH (n) RETURN n.name").catch(function (err) { + const session = driver.session(neo4j.session.WRITE); + session.run("MATCH (n) RETURN n.name").catch(err => { expect(err.code).toEqual(neo4j.error.SERVICE_UNAVAILABLE); driver.close(); - seedServer.exit(function (code) { + seedServer.exit(code => { expect(code).toEqual(0); done(); }); @@ -696,29 +696,29 @@ describe('routing driver', function () { }); }); - it('should re-use connections', function (done) { + it('should re-use connections', done => { if (!boltkit.BoltKitSupport) { done(); return; } // Given - var kit = new boltkit.BoltKit(); - var seedServer = kit.start('./test/resources/boltkit/single_write_server.script', 9002); - var writeServer = kit.start('./test/resources/boltkit/two_write_responses_server.script', 9001); + const kit = new boltkit.BoltKit(); + const seedServer = kit.start('./test/resources/boltkit/single_write_server.script', 9002); + const writeServer = kit.start('./test/resources/boltkit/two_write_responses_server.script', 9001); - kit.run(function () { - var driver = newDriver("bolt+routing://127.0.0.1:9002"); + kit.run(() => { + const driver = newDriver("bolt+routing://127.0.0.1:9002"); // When - var session = driver.session(neo4j.session.WRITE); - session.run("CREATE (n {name:'Bob'})").then(function () { - session.close(function () { - var connections = Object.keys(driver._openSessions).length - session = driver.session(neo4j.session.WRITE); - session.run("CREATE ()").then(function () { + const session1 = driver.session(neo4j.session.WRITE); + session1.run("CREATE (n {name:'Bob'})").then(() => { + session1.close(() => { + const connections = Object.keys(driver._openSessions).length; + const session2 = driver.session(neo4j.session.WRITE); + session2.run("CREATE ()").then(() => { driver.close(); - seedServer.exit(function (code1) { - writeServer.exit(function (code2) { - expect(connections).toEqual(Object.keys(driver._openSessions).length) + seedServer.exit(code1 => { + writeServer.exit(code2 => { + expect(connections).toEqual(Object.keys(driver._openSessions).length); expect(code1).toEqual(0); expect(code2).toEqual(0); done(); @@ -730,7 +730,7 @@ describe('routing driver', function () { }); }); - it('should expose server info in cluster', function (done) { + it('should expose server info in cluster', done => { if (!boltkit.BoltKitSupport) { done(); return; @@ -742,7 +742,7 @@ describe('routing driver', function () { const writeServer = kit.start('./test/resources/boltkit/write_server_with_version.script', 9007); const readServer = kit.start('./test/resources/boltkit/read_server_with_version.script', 9005); - kit.run(function () { + kit.run(() => { const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When const readSession = driver.session(neo4j.session.READ); @@ -779,7 +779,7 @@ describe('routing driver', function () { }); }); - it('should expose server info in cluster using observer', function (done) { + it('should expose server info in cluster using observer', done => { if (!boltkit.BoltKitSupport) { done(); return; @@ -791,7 +791,7 @@ describe('routing driver', function () { const writeServer = kit.start('./test/resources/boltkit/write_server_with_version.script', 9007); const readServer = kit.start('./test/resources/boltkit/read_server_with_version.script', 9005); - kit.run(function () { + kit.run(() => { const driver = newDriver("bolt+routing://127.0.0.1:9001"); // When const readSession = driver.session(neo4j.session.READ); @@ -812,9 +812,9 @@ describe('routing driver', function () { writeSession.close(); driver.close(); - routingServer.exit(function (routingServerExitCode) { - writeServer.exit(function (writeServerExitCode) { - readServer.exit(function (readServerExitCode) { + routingServer.exit(routingServerExitCode => { + writeServer.exit(writeServerExitCode => { + readServer.exit(readServerExitCode => { expect(readSummary.server.address).toBe('127.0.0.1:9005'); expect(readSummary.server.version).toBe('TheReadServerV1');