diff --git a/src/v1/driver.js b/src/v1/driver.js index 771120761..165a8ee7a 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -42,9 +42,9 @@ class Driver { * @constructor * @param {string} url * @param {string} userAgent - * @param {Object} token - * @param {Object} config - * @access private + * @param {object} token + * @param {object} config + * @protected */ constructor(url, userAgent, token = {}, config = {}) { this._url = url; diff --git a/src/v1/index.js b/src/v1/index.js index 948259443..25a36f55e 100644 --- a/src/v1/index.js +++ b/src/v1/index.js @@ -117,6 +117,12 @@ const USER_AGENT = "neo4j-javascript/" + VERSION; * // will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient errors with * // exponential backoff using initial delay of 1 second. Default value is 30000 which is 30 seconds. * maxTransactionRetryTime: 30000, + * + * // Provide an alternative load balancing strategy for the routing driver to use. + * // Driver uses "least_connected" by default. + * // Note: We are experimenting with different strategies. This could be removed in the next minor + * // version. + * loadBalancingStrategy: "least_connected" | "round_robin", * } * * @param {string} url The URL for the Neo4j database, for instance "bolt://localhost" diff --git a/src/v1/internal/connection-providers.js b/src/v1/internal/connection-providers.js index 3a57c0624..7f1c4e32a 100644 --- a/src/v1/internal/connection-providers.js +++ b/src/v1/internal/connection-providers.js @@ -25,7 +25,6 @@ import Rediscovery from './rediscovery'; import hasFeature from './features'; import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers'; import RoutingUtil from './routing-util'; -import RoundRobinLoadBalancingStrategy from './round-robin-load-balancing-strategy'; class ConnectionProvider { @@ -62,7 +61,7 @@ export class DirectConnectionProvider extends ConnectionProvider { export class LoadBalancer extends ConnectionProvider { - constructor(address, routingContext, connectionPool, driverOnErrorCallback) { + constructor(address, routingContext, connectionPool, loadBalancingStrategy, driverOnErrorCallback) { super(); this._seedRouter = address; this._routingTable = new RoutingTable([this._seedRouter]); @@ -70,7 +69,7 @@ export class LoadBalancer extends ConnectionProvider { this._connectionPool = connectionPool; this._driverOnErrorCallback = driverOnErrorCallback; this._hostNameResolver = LoadBalancer._createHostNameResolver(); - this._loadBalancingStrategy = new RoundRobinLoadBalancingStrategy(); + this._loadBalancingStrategy = loadBalancingStrategy; this._useSeedRouter = false; } diff --git a/src/v1/internal/least-connected-load-balancing-strategy.js b/src/v1/internal/least-connected-load-balancing-strategy.js new file mode 100644 index 000000000..ec6b84d28 --- /dev/null +++ b/src/v1/internal/least-connected-load-balancing-strategy.js @@ -0,0 +1,85 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import RoundRobinArrayIndex from './round-robin-array-index'; +import LoadBalancingStrategy from './load-balancing-strategy'; + +export const LEAST_CONNECTED_STRATEGY_NAME = 'least_connected'; + +export default class LeastConnectedLoadBalancingStrategy extends LoadBalancingStrategy { + + /** + * @constructor + * @param {Pool} connectionPool the connection pool of this driver. + */ + constructor(connectionPool) { + super(); + this._readersIndex = new RoundRobinArrayIndex(); + this._writersIndex = new RoundRobinArrayIndex(); + this._connectionPool = connectionPool; + } + + /** + * @inheritDoc + */ + selectReader(knownReaders) { + return this._select(knownReaders, this._readersIndex); + } + + /** + * @inheritDoc + */ + selectWriter(knownWriters) { + return this._select(knownWriters, this._writersIndex); + } + + _select(addresses, roundRobinIndex) { + const length = addresses.length; + if (length === 0) { + return null; + } + + // choose start index for iteration in round-rodin fashion + const startIndex = roundRobinIndex.next(length); + let index = startIndex; + + let leastConnectedAddress = null; + let leastActiveConnections = Number.MAX_SAFE_INTEGER; + + // iterate over the array to find least connected address + do { + const address = addresses[index]; + const activeConnections = this._connectionPool.activeResourceCount(address); + + if (activeConnections < leastActiveConnections) { + leastConnectedAddress = address; + leastActiveConnections = activeConnections; + } + + // loop over to the start of the array when end is reached + if (index === length - 1) { + index = 0; + } else { + index++; + } + } + while (index !== startIndex); + + return leastConnectedAddress; + } +} diff --git a/src/v1/internal/load-balancing-strategy.js b/src/v1/internal/load-balancing-strategy.js index a04f9ea17..6d5c7eb52 100644 --- a/src/v1/internal/load-balancing-strategy.js +++ b/src/v1/internal/load-balancing-strategy.js @@ -17,7 +17,6 @@ * limitations under the License. */ - /** * A facility to select most appropriate reader or writer among the given addresses for request processing. */ diff --git a/src/v1/internal/pool.js b/src/v1/internal/pool.js index 3d484e38b..9857cb78f 100644 --- a/src/v1/internal/pool.js +++ b/src/v1/internal/pool.js @@ -36,59 +36,117 @@ class Pool { this._validate = validate; this._maxIdle = maxIdle; this._pools = {}; + this._activeResourceCounts = {}; this._release = this._release.bind(this); } + /** + * Acquire and idle resource fom the pool or create a new one. + * @param {string} key the resource key. + * @return {object} resource that is ready to use. + */ acquire(key) { - let resource; let pool = this._pools[key]; if (!pool) { pool = []; this._pools[key] = pool; } while (pool.length) { - resource = pool.pop(); + const resource = pool.pop(); if (this._validate(resource)) { + // idle resource is valid and can be acquired + resourceAcquired(key, this._activeResourceCounts); return resource; } else { this._destroy(resource); } } + // there exist no idle valid resources, create a new one for acquisition + resourceAcquired(key, this._activeResourceCounts); return this._create(key, this._release); } + /** + * Destroy all idle resources for the given key. + * @param {string} key the resource key to purge. + */ purge(key) { - let resource; - let pool = this._pools[key] || []; + const pool = this._pools[key] || []; while (pool.length) { - resource = pool.pop(); + const resource = pool.pop(); this._destroy(resource) } delete this._pools[key] } + /** + * Destroy all idle resources in this pool. + */ purgeAll() { Object.keys(this._pools).forEach(key => this.purge(key)); } + /** + * Check if this pool contains resources for the given key. + * @param {string} key the resource key to check. + * @return {boolean} true when pool contains entries for the given key, false otherwise. + */ has(key) { return (key in this._pools); } + /** + * Get count of active (checked out of the pool) resources for the given key. + * @param {string} key the resource key to check. + * @return {number} count of resources acquired by clients. + */ + activeResourceCount(key) { + return this._activeResourceCounts[key] || 0; + } + _release(key, resource) { - let pool = this._pools[key]; - if (!pool) { + const pool = this._pools[key]; + + if (pool) { + // there exist idle connections for the given key + if (pool.length >= this._maxIdle || !this._validate(resource)) { + this._destroy(resource); + } else { + pool.push(resource); + } + } else { // key has been purged, don't put it back, just destroy the resource this._destroy(resource); - return; - } - if( pool.length >= this._maxIdle || !this._validate(resource) ) { - this._destroy(resource); - } else { - pool.push(resource); } + + resourceReleased(key, this._activeResourceCounts); + } +} + +/** + * Increment active (checked out of the pool) resource counter. + * @param {string} key the resource group identifier (server address for connections). + * @param {Object.} activeResourceCounts the object holding active counts per key. + */ +function resourceAcquired(key, activeResourceCounts) { + const currentCount = activeResourceCounts[key] || 0; + activeResourceCounts[key] = currentCount + 1; +} + +/** + * Decrement active (checked out of the pool) resource counter. + * @param {string} key the resource group identifier (server address for connections). + * @param {Object.} activeResourceCounts the object holding active counts per key. + */ +function resourceReleased(key, activeResourceCounts) { + const currentCount = activeResourceCounts[key] || 0; + const nextCount = currentCount - 1; + if (nextCount > 0) { + activeResourceCounts[key] = nextCount; + } else { + delete activeResourceCounts[key]; } } diff --git a/src/v1/internal/round-robin-load-balancing-strategy.js b/src/v1/internal/round-robin-load-balancing-strategy.js index 011a4f9b4..32ac27590 100644 --- a/src/v1/internal/round-robin-load-balancing-strategy.js +++ b/src/v1/internal/round-robin-load-balancing-strategy.js @@ -16,9 +16,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + import RoundRobinArrayIndex from './round-robin-array-index'; import LoadBalancingStrategy from './load-balancing-strategy'; +export const ROUND_ROBIN_STRATEGY_NAME = 'round_robin'; + export default class RoundRobinLoadBalancingStrategy extends LoadBalancingStrategy { constructor() { diff --git a/src/v1/routing-driver.js b/src/v1/routing-driver.js index 7e5f2ff23..859f6bea9 100644 --- a/src/v1/routing-driver.js +++ b/src/v1/routing-driver.js @@ -21,6 +21,8 @@ import Session from './session'; import {Driver} from './driver'; import {newError, SESSION_EXPIRED} from './error'; import {LoadBalancer} from './internal/connection-providers'; +import LeastConnectedLoadBalancingStrategy, {LEAST_CONNECTED_STRATEGY_NAME} from './internal/least-connected-load-balancing-strategy'; +import RoundRobinLoadBalancingStrategy, {ROUND_ROBIN_STRATEGY_NAME} from './internal/round-robin-load-balancing-strategy'; /** * A driver that supports routing in a core-edge cluster. @@ -34,7 +36,8 @@ class RoutingDriver extends Driver { } _createConnectionProvider(address, connectionPool, driverOnErrorCallback) { - return new LoadBalancer(address, this._routingContext, connectionPool, driverOnErrorCallback); + const loadBalancingStrategy = RoutingDriver._createLoadBalancingStrategy(this._config, connectionPool); + return new LoadBalancer(address, this._routingContext, connectionPool, loadBalancingStrategy, driverOnErrorCallback); } _createSession(mode, connectionProvider, bookmark, config) { @@ -80,6 +83,23 @@ class RoutingDriver extends Driver { return error.code === 'Neo.ClientError.Cluster.NotALeader' || error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase'; } + + /** + * Create new load balancing strategy based on the config. + * @param {object} config the user provided config. + * @param {Pool} connectionPool the connection pool for this driver. + * @return {LoadBalancingStrategy} new strategy. + */ + static _createLoadBalancingStrategy(config, connectionPool) { + const configuredValue = config.loadBalancingStrategy; + if (!configuredValue || configuredValue === LEAST_CONNECTED_STRATEGY_NAME) { + return new LeastConnectedLoadBalancingStrategy(connectionPool); + } else if (configuredValue === ROUND_ROBIN_STRATEGY_NAME) { + return new RoundRobinLoadBalancingStrategy(); + } else { + throw newError('Unknown load balancing strategy: ' + configuredValue); + } + } } class RoutingSession extends Session { diff --git a/test/internal/connection-providers.test.js b/test/internal/connection-providers.test.js index 8fe606c6b..51d45d43f 100644 --- a/test/internal/connection-providers.test.js +++ b/test/internal/connection-providers.test.js @@ -23,6 +23,7 @@ import {SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../../src/v1/error'; import RoutingTable from '../../src/v1/internal/routing-table'; import {DirectConnectionProvider, LoadBalancer} from '../../src/v1/internal/connection-providers'; import Pool from '../../src/v1/internal/pool'; +import LeastConnectedLoadBalancingStrategy from '../../src/v1/internal/least-connected-load-balancing-strategy'; const NO_OP_DRIVER_CALLBACK = () => { }; @@ -134,7 +135,9 @@ describe('LoadBalancer', () => { }); it('initializes routing table with the given router', () => { - const loadBalancer = new LoadBalancer('server-ABC', {}, newPool(), NO_OP_DRIVER_CALLBACK); + const connectionPool = newPool(); + const loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(connectionPool); + const loadBalancer = new LoadBalancer('server-ABC', {}, connectionPool, loadBalancingStrategy, NO_OP_DRIVER_CALLBACK); expectRoutingTable(loadBalancer, ['server-ABC'], @@ -1068,7 +1071,9 @@ function newLoadBalancerWithSeedRouter(seedRouter, seedRouterResolved, expirationTime = Integer.MAX_VALUE, routerToRoutingTable = {}, connectionPool = null) { - const loadBalancer = new LoadBalancer(seedRouter, {}, connectionPool || newPool(), NO_OP_DRIVER_CALLBACK); + const pool = connectionPool || newPool(); + const loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(pool); + const loadBalancer = new LoadBalancer(seedRouter, {}, pool, loadBalancingStrategy, NO_OP_DRIVER_CALLBACK); loadBalancer._routingTable = new RoutingTable(routers, readers, writers, expirationTime); loadBalancer._rediscovery = new FakeRediscovery(routerToRoutingTable); loadBalancer._hostNameResolver = new FakeDnsResolver(seedRouterResolved); diff --git a/test/internal/least-connected-load-balancing-strategy.test.js b/test/internal/least-connected-load-balancing-strategy.test.js new file mode 100644 index 000000000..17cf024a4 --- /dev/null +++ b/test/internal/least-connected-load-balancing-strategy.test.js @@ -0,0 +1,135 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import LeastConnectedLoadBalancingStrategy from '../../src/v1/internal/least-connected-load-balancing-strategy'; +import Pool from '../../src/v1/internal/pool'; + +describe('LeastConnectedLoadBalancingStrategy', () => { + + it('should return null when no readers', () => { + const knownReaders = []; + const strategy = new LeastConnectedLoadBalancingStrategy(new DummyPool({})); + + expect(strategy.selectReader(knownReaders)).toBeNull(); + }); + + it('should return null when no writers', () => { + const knownWriters = []; + const strategy = new LeastConnectedLoadBalancingStrategy(new DummyPool({})); + + expect(strategy.selectWriter(knownWriters)).toBeNull(); + }); + + it('should return same reader when it is the only one available and has no connections', () => { + const knownReaders = ['reader-1']; + const strategy = new LeastConnectedLoadBalancingStrategy(new DummyPool({'reader-1': 0})); + + expect(strategy.selectReader(knownReaders)).toEqual('reader-1'); + expect(strategy.selectReader(knownReaders)).toEqual('reader-1'); + expect(strategy.selectReader(knownReaders)).toEqual('reader-1'); + }); + + it('should return same writer when it is the only one available and has no connections', () => { + const knownWriters = ['writer-1']; + const strategy = new LeastConnectedLoadBalancingStrategy(new DummyPool({'writer-1': 0})); + + expect(strategy.selectWriter(knownWriters)).toEqual('writer-1'); + expect(strategy.selectWriter(knownWriters)).toEqual('writer-1'); + expect(strategy.selectWriter(knownWriters)).toEqual('writer-1'); + }); + + it('should return same reader when it is the only one available and has active connections', () => { + const knownReaders = ['reader-1']; + const strategy = new LeastConnectedLoadBalancingStrategy(new DummyPool({'reader-1': 14})); + + expect(strategy.selectReader(knownReaders)).toEqual('reader-1'); + expect(strategy.selectReader(knownReaders)).toEqual('reader-1'); + expect(strategy.selectReader(knownReaders)).toEqual('reader-1'); + }); + + it('should return same writer when it is the only one available and has active connections', () => { + const knownWriters = ['writer-1']; + const strategy = new LeastConnectedLoadBalancingStrategy(new DummyPool({'writer-1': 3})); + + expect(strategy.selectWriter(knownWriters)).toEqual('writer-1'); + expect(strategy.selectWriter(knownWriters)).toEqual('writer-1'); + expect(strategy.selectWriter(knownWriters)).toEqual('writer-1'); + }); + + it('should return readers in round robin order when no active connections', () => { + const knownReaders = ['reader-1', 'reader-2', 'reader-3']; + const pool = new DummyPool({'reader-1': 0, 'reader-2': 0, 'reader-3': 0}); + const strategy = new LeastConnectedLoadBalancingStrategy(pool); + + expect(strategy.selectReader(knownReaders)).toEqual('reader-1'); + expect(strategy.selectReader(knownReaders)).toEqual('reader-2'); + expect(strategy.selectReader(knownReaders)).toEqual('reader-3'); + expect(strategy.selectReader(knownReaders)).toEqual('reader-1'); + expect(strategy.selectReader(knownReaders)).toEqual('reader-2'); + expect(strategy.selectReader(knownReaders)).toEqual('reader-3'); + }); + + it('should return writers in round robin order when no active connections', () => { + const knownWriters = ['writer-1', 'writer-2', 'writer-3', 'writer-4']; + const pool = new DummyPool({'writer-1': 0, 'writer-2': 0, 'writer-3': 0, 'writer-4': 0}); + const strategy = new LeastConnectedLoadBalancingStrategy(pool); + + expect(strategy.selectWriter(knownWriters)).toEqual('writer-1'); + expect(strategy.selectWriter(knownWriters)).toEqual('writer-2'); + expect(strategy.selectWriter(knownWriters)).toEqual('writer-3'); + expect(strategy.selectWriter(knownWriters)).toEqual('writer-4'); + expect(strategy.selectWriter(knownWriters)).toEqual('writer-1'); + expect(strategy.selectWriter(knownWriters)).toEqual('writer-2'); + expect(strategy.selectWriter(knownWriters)).toEqual('writer-3'); + expect(strategy.selectWriter(knownWriters)).toEqual('writer-4'); + }); + + it('should return least connected reader', () => { + const knownReaders = ['reader-1', 'reader-2', 'reader-3']; + const pool = new DummyPool({'reader-1': 7, 'reader-2': 3, 'reader-3': 8}); + const strategy = new LeastConnectedLoadBalancingStrategy(pool); + + expect(strategy.selectReader(knownReaders)).toEqual('reader-2'); + expect(strategy.selectReader(knownReaders)).toEqual('reader-2'); + expect(strategy.selectReader(knownReaders)).toEqual('reader-2'); + }); + + it('should return least connected writer', () => { + const knownWriters = ['writer-1', 'writer-2', 'writer-3', 'writer-4']; + const pool = new DummyPool({'writer-1': 5, 'writer-2': 4, 'writer-3': 6, 'writer-4': 2}); + const strategy = new LeastConnectedLoadBalancingStrategy(pool); + + expect(strategy.selectWriter(knownWriters)).toEqual('writer-4'); + expect(strategy.selectWriter(knownWriters)).toEqual('writer-4'); + expect(strategy.selectWriter(knownWriters)).toEqual('writer-4'); + }); + +}); + +class DummyPool extends Pool { + + constructor(activeConnections) { + super(() => 42); + this._activeConnections = activeConnections; + } + + activeResourceCount(key) { + return this._activeConnections[key]; + } +} diff --git a/test/internal/pool.test.js b/test/internal/pool.test.js index 634c8d830..04e54e010 100644 --- a/test/internal/pool.test.js +++ b/test/internal/pool.test.js @@ -255,6 +255,86 @@ describe('Pool', () => { expect(pool.has(existingKey)).toBeTruthy(); expect(pool.has(absentKey)).toBeFalsy(); }); + + it('reports zero active resources when empty', () => { + const pool = new Pool((url, release) => new Resource(url, 42, release)); + + expect(pool.activeResourceCount('bolt://localhost:1')).toEqual(0); + expect(pool.activeResourceCount('bolt://localhost:2')).toEqual(0); + expect(pool.activeResourceCount('bolt://localhost:3')).toEqual(0); + }); + + it('reports active resources', () => { + const key = 'bolt://localhost:7687'; + const pool = new Pool((url, release) => new Resource(url, 42, release)); + + expect(pool.acquire(key)).toBeDefined(); + expect(pool.acquire(key)).toBeDefined(); + expect(pool.acquire(key)).toBeDefined(); + + expect(pool.activeResourceCount(key)).toEqual(3); + }); + + it('reports active resources when they are created', () => { + const key = 'bolt://localhost:7687'; + const pool = new Pool((url, release) => new Resource(url, 42, release)); + + // three new resources are created + expect(pool.acquire(key)).toBeDefined(); + expect(pool.acquire(key)).toBeDefined(); + expect(pool.acquire(key)).toBeDefined(); + + expect(pool.activeResourceCount(key)).toEqual(3); + }); + + it('reports active resources when they are acquired', () => { + const key = 'bolt://localhost:7687'; + const pool = new Pool((url, release) => new Resource(url, 42, release)); + + // three new resources are created and returned to the pool + const r0 = pool.acquire(key); + const r1 = pool.acquire(key); + const r2 = pool.acquire(key); + r0.close(); + r1.close(); + r2.close(); + + // three idle resources are acquired from the pool + const r3 = pool.acquire(key); + const r4 = pool.acquire(key); + const r5 = pool.acquire(key); + expect(r3).toBe(r2); + expect(r4).toBe(r1); + expect(r5).toBe(r0); + + expect(pool.activeResourceCount(key)).toEqual(3); + }); + + it('does not report resources that are returned to the pool', () => { + const key = 'bolt://localhost:7687'; + const pool = new Pool((url, release) => new Resource(url, 42, release)); + + const r0 = pool.acquire(key); + const r1 = pool.acquire(key); + const r2 = pool.acquire(key); + expect(pool.activeResourceCount(key)).toEqual(3); + + r0.close(); + expect(pool.activeResourceCount(key)).toEqual(2); + + r1.close(); + expect(pool.activeResourceCount(key)).toEqual(1); + + r2.close(); + expect(pool.activeResourceCount(key)).toEqual(0); + + const r3 = pool.acquire(key); + expect(pool.activeResourceCount(key)).toEqual(1); + + r3.close(); + expect(pool.activeResourceCount(key)).toEqual(0); + }); + }); class Resource { diff --git a/test/types/v1/driver.test.ts b/test/types/v1/driver.test.ts index 1a05f15b6..c4b379760 100644 --- a/test/types/v1/driver.test.ts +++ b/test/types/v1/driver.test.ts @@ -21,6 +21,7 @@ import Driver, { AuthToken, Config, EncryptionLevel, + LoadBalancingStrategy, READ, SessionMode, TrustStrategy, @@ -54,6 +55,8 @@ const trustedCertificates: undefined | string[] = config.trustedCertificates; const knownHosts: undefined | string = config.knownHosts; const connectionPoolSize: undefined | number = config.connectionPoolSize; const maxTransactionRetryTime: undefined | number = config.maxTransactionRetryTime; +const loadBalancingStrategy1: undefined | LoadBalancingStrategy = config.loadBalancingStrategy; +const loadBalancingStrategy2: undefined | string = config.loadBalancingStrategy; const sessionMode: SessionMode = dummy; const sessionModeStr: string = sessionMode; diff --git a/test/v1/routing-driver.test.js b/test/v1/routing-driver.test.js new file mode 100644 index 000000000..02ed07919 --- /dev/null +++ b/test/v1/routing-driver.test.js @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2002-2017 "Neo Technology,"," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import RoundRobinLoadBalancingStrategy from '../../src/v1/internal/round-robin-load-balancing-strategy'; +import LeastConnectedLoadBalancingStrategy from '../../src/v1/internal/least-connected-load-balancing-strategy'; +import RoutingDriver from '../../src/v1/routing-driver'; +import Pool from '../../src/v1/internal/pool'; + +describe('RoutingDriver', () => { + + it('should create least connected when nothing configured', () => { + const strategy = createStrategy({}); + expect(strategy instanceof LeastConnectedLoadBalancingStrategy).toBeTruthy(); + }); + + it('should create least connected when it is configured', () => { + const strategy = createStrategy({loadBalancingStrategy: 'least_connected'}); + expect(strategy instanceof LeastConnectedLoadBalancingStrategy).toBeTruthy(); + }); + + it('should create round robin when it is configured', () => { + const strategy = createStrategy({loadBalancingStrategy: 'round_robin'}); + expect(strategy instanceof RoundRobinLoadBalancingStrategy).toBeTruthy(); + }); + + it('should fail when unknown strategy is configured', () => { + expect(() => createStrategy({loadBalancingStrategy: 'wrong'})).toThrow(); + }); + +}); + +function createStrategy(config) { + return RoutingDriver._createLoadBalancingStrategy(config, new Pool()); +} diff --git a/test/v1/session.test.js b/test/v1/session.test.js index daf6a0dc2..af08ef05b 100644 --- a/test/v1/session.test.js +++ b/test/v1/session.test.js @@ -965,6 +965,70 @@ describe('session', () => { }); }); + it('should acquire connection for transaction', done => { + expect(session.beginTransaction()).toBeDefined(); + + const otherSession1 = driver.session(); + expect(otherSession1.beginTransaction()).toBeDefined(); + + const otherSession2 = driver.session(); + expect(otherSession2.beginTransaction()).toBeDefined(); + + const otherSession3 = driver.session(); + expect(otherSession3.beginTransaction()).toBeDefined(); + + expect(numberOfAcquiredConnectionsFromPool()).toEqual(4); + + session.close(() => { + otherSession1.close(() => { + otherSession2.close(() => { + otherSession3.close(() => { + done(); + }); + }); + }); + }); + }); + + it('should acquire connection for query execution', done => { + session.run('RETURN 42 AS answer').subscribe({ + onNext: record => { + expect(record.get('answer').toInt()).toEqual(42); + expect(numberOfAcquiredConnectionsFromPool()).toEqual(1); + }, + onCompleted: () => { + session.close(() => { + done(); + }); + }, + onError: error => { + console.log(error); + } + }); + }); + + it('should acquire separate connections for transaction and query execution in different sessions', done => { + const otherSession = driver.session(); + expect(otherSession.beginTransaction()).toBeDefined(); + + session.run('RETURN 42 AS answer').subscribe({ + onNext: record => { + expect(record.get('answer').toInt()).toEqual(42); + expect(numberOfAcquiredConnectionsFromPool()).toEqual(2); + }, + onCompleted: () => { + otherSession.close(() => { + session.close(() => { + done(); + }); + }); + }, + onError: error => { + console.log(error); + } + }); + }); + function serverIs31OrLater(done) { // lazy way of checking the version number // if server has been set we know it is at least 3.1 @@ -1043,4 +1107,9 @@ describe('session', () => { }); } + function numberOfAcquiredConnectionsFromPool() { + const pool = driver._pool; + return pool.activeResourceCount('localhost'); + } + }); diff --git a/test/v1/stress.test.js b/test/v1/stress.test.js index 993f5159f..29dddfc7e 100644 --- a/test/v1/stress.test.js +++ b/test/v1/stress.test.js @@ -21,6 +21,7 @@ import neo4j from '../../src/v1'; import {READ, WRITE} from '../../src/v1/driver'; import parallelLimit from 'async/parallelLimit'; import _ from 'lodash'; +import {ServerVersion, VERSION_3_2_0} from '../../src/v1/internal/server-version'; import sharedNeo4j from '../internal/shared-neo4j'; describe('stress tests', () => { @@ -78,8 +79,8 @@ describe('stress tests', () => { done.fail(error); } - verifyServers(context); - verifyNodeCount(context) + verifyServers(context) + .then(() => verifyNodeCount(context)) .then(() => done()) .catch(error => done.fail(error)); }); @@ -245,13 +246,98 @@ describe('stress tests', () => { function verifyServers(context) { const routing = DATABASE_URI.indexOf('bolt+routing') === 0; - const seenServers = context.seenServers(); - if (routing && seenServers.length <= 1) { - throw new Error(`Routing driver used too few servers: ${seenServers}`); - } else if (!routing && seenServers.length !== 1) { - throw new Error(`Direct driver used too many servers: ${seenServers}`); + if (routing) { + return verifyCausalClusterMembers(context); } + return verifySingleInstance(context); + } + + function verifySingleInstance(context) { + return new Promise(resolve => { + const readServerAddresses = context.readServerAddresses(); + const writeServerAddresses = context.writeServerAddresses(); + + expect(readServerAddresses.length).toEqual(1); + expect(writeServerAddresses.length).toEqual(1); + expect(readServerAddresses).toEqual(writeServerAddresses); + + const address = readServerAddresses[0]; + expect(context.readServersWithQueryCount[address]).toBeGreaterThan(1); + expect(context.writeServersWithQueryCount[address]).toBeGreaterThan(1); + + resolve(); + }); + } + + function verifyCausalClusterMembers(context) { + return fetchClusterAddresses(context).then(clusterAddresses => { + // before 3.2.0 only read replicas serve reads + const readsOnFollowersEnabled = context.serverVersion.compareTo(VERSION_3_2_0) >= 0; + + if (readsOnFollowersEnabled) { + // expect all followers to serve more than zero read queries + assertAllAddressesServedReadQueries(clusterAddresses.followers, context.readServersWithQueryCount); + } + + // expect all read replicas to serve more than zero read queries + assertAllAddressesServedReadQueries(clusterAddresses.readReplicas, context.readServersWithQueryCount); + + if (readsOnFollowersEnabled) { + // expect all followers to serve same order of magnitude read queries + assertAllAddressesServedSimilarAmountOfReadQueries(clusterAddresses.followers, context.readServersWithQueryCount); + } + + // expect all read replicas to serve same order of magnitude read queries + assertAllAddressesServedSimilarAmountOfReadQueries(clusterAddresses.readReplicas, + context.readServersWithQueryCount); + }); + } + + function fetchClusterAddresses(context) { + const session = context.driver.session(); + return session.run('CALL dbms.cluster.overview()').then(result => { + session.close(); + const records = result.records; + + const followers = addressesWithRole(records, 'FOLLOWER'); + const readReplicas = addressesWithRole(records, 'READ_REPLICA'); + + return new ClusterAddresses(followers, readReplicas); + }); + } + + function addressesWithRole(records, role) { + return _.uniq(records.filter(record => record.get('role') === role) + .map(record => record.get('addresses')[0].replace('bolt://', ''))); + } + + function assertAllAddressesServedReadQueries(addresses, readQueriesByServer) { + addresses.forEach(address => { + const queries = readQueriesByServer[address]; + expect(queries).toBeGreaterThan(0); + }); + } + + function assertAllAddressesServedSimilarAmountOfReadQueries(addresses, readQueriesByServer) { + const expectedOrderOfMagnitude = orderOfMagnitude(readQueriesByServer[addresses[0]]); + + addresses.forEach(address => { + const queries = readQueriesByServer[address]; + const currentOrderOfMagnitude = orderOfMagnitude(queries); + + expect(currentOrderOfMagnitude).not.toBeLessThan(expectedOrderOfMagnitude - 1); + expect(currentOrderOfMagnitude).not.toBeGreaterThan(expectedOrderOfMagnitude + 1); + }); + } + + function orderOfMagnitude(number) { + let result = 1; + while (number >= 10) { + number /= 10; + result++; + } + return result; } function randomParams() { @@ -310,25 +396,41 @@ describe('stress tests', () => { this.createdNodesCount = 0; this._commandIdCouter = 0; this._loggingEnabled = loggingEnabled; - this._seenServers = new Set(); + this.readServersWithQueryCount = {}; + this.writeServersWithQueryCount = {}; + this.serverVersion = null; } queryCompleted(result, accessMode, bookmark) { + const serverInfo = result.summary.server; + + if (!this.serverVersion) { + this.serverVersion = ServerVersion.fromString(serverInfo.version); + } + + const serverAddress = serverInfo.address; if (accessMode === WRITE) { this.createdNodesCount++; + this.writeServersWithQueryCount[serverAddress] = (this.writeServersWithQueryCount[serverAddress] || 0) + 1; + } else { + this.readServersWithQueryCount[serverAddress] = (this.readServersWithQueryCount[serverAddress] || 0) + 1; } + if (bookmark) { this.bookmark = bookmark; } - this._seenServers.add(result.summary.server.address); } nextCommandId() { return this._commandIdCouter++; } - seenServers() { - return Array.from(this._seenServers); + readServerAddresses() { + return Object.keys(this.readServersWithQueryCount); + } + + writeServerAddresses() { + return Object.keys(this.writeServersWithQueryCount); } log(commandId, message) { @@ -338,4 +440,12 @@ describe('stress tests', () => { } } + class ClusterAddresses { + + constructor(followers, readReplicas) { + this.followers = followers; + this.readReplicas = readReplicas; + } + } + }); diff --git a/types/v1/driver.d.ts b/types/v1/driver.d.ts index c669502f3..3bbc59fe4 100644 --- a/types/v1/driver.d.ts +++ b/types/v1/driver.d.ts @@ -36,6 +36,8 @@ declare type TrustStrategy = "TRUST_CUSTOM_CA_SIGNED_CERTIFICATES" | "TRUST_SYSTEM_CA_SIGNED_CERTIFICATES"; +declare type LoadBalancingStrategy = "least_connected" | "round_robin"; + declare interface Config { encrypted?: boolean | EncryptionLevel; trust?: TrustStrategy; @@ -43,6 +45,7 @@ declare interface Config { knownHosts?: string; connectionPoolSize?: number; maxTransactionRetryTime?: number; + loadBalancingStrategy?: LoadBalancingStrategy; } declare type SessionMode = "READ" | "WRITE"; @@ -56,6 +59,6 @@ declare interface Driver { close(): void; } -export {Driver, READ, WRITE, AuthToken, Config, EncryptionLevel, TrustStrategy, SessionMode} +export {Driver, READ, WRITE, AuthToken, Config, EncryptionLevel, TrustStrategy, LoadBalancingStrategy, SessionMode} export default Driver;