Skip to content

Commit 2e5b292

Browse files
authored
Merge pull request #268 from lutovich/1.5-check-lb-in-stress-test
Add load balancing assertion to the stress test
2 parents 963aeae + f8adad1 commit 2e5b292

16 files changed

+660
-35
lines changed

src/v1/driver.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ class Driver {
4242
* @constructor
4343
* @param {string} url
4444
* @param {string} userAgent
45-
* @param {Object} token
46-
* @param {Object} config
47-
* @access private
45+
* @param {object} token
46+
* @param {object} config
47+
* @protected
4848
*/
4949
constructor(url, userAgent, token = {}, config = {}) {
5050
this._url = url;

src/v1/index.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ const USER_AGENT = "neo4j-javascript/" + VERSION;
117117
* // will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient errors with
118118
* // exponential backoff using initial delay of 1 second. Default value is 30000 which is 30 seconds.
119119
* maxTransactionRetryTime: 30000,
120+
*
121+
* // Provide an alternative load balancing strategy for the routing driver to use.
122+
* // Driver uses "least_connected" by default.
123+
* // <b>Note:</b> We are experimenting with different strategies. This could be removed in the next minor
124+
* // version.
125+
* loadBalancingStrategy: "least_connected" | "round_robin",
120126
* }
121127
*
122128
* @param {string} url The URL for the Neo4j database, for instance "bolt://localhost"

src/v1/internal/connection-providers.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import Rediscovery from './rediscovery';
2525
import hasFeature from './features';
2626
import {DnsHostNameResolver, DummyHostNameResolver} from './host-name-resolvers';
2727
import RoutingUtil from './routing-util';
28-
import RoundRobinLoadBalancingStrategy from './round-robin-load-balancing-strategy';
2928

3029
class ConnectionProvider {
3130

@@ -62,15 +61,15 @@ export class DirectConnectionProvider extends ConnectionProvider {
6261

6362
export class LoadBalancer extends ConnectionProvider {
6463

65-
constructor(address, routingContext, connectionPool, driverOnErrorCallback) {
64+
constructor(address, routingContext, connectionPool, loadBalancingStrategy, driverOnErrorCallback) {
6665
super();
6766
this._seedRouter = address;
6867
this._routingTable = new RoutingTable([this._seedRouter]);
6968
this._rediscovery = new Rediscovery(new RoutingUtil(routingContext));
7069
this._connectionPool = connectionPool;
7170
this._driverOnErrorCallback = driverOnErrorCallback;
7271
this._hostNameResolver = LoadBalancer._createHostNameResolver();
73-
this._loadBalancingStrategy = new RoundRobinLoadBalancingStrategy();
72+
this._loadBalancingStrategy = loadBalancingStrategy;
7473
this._useSeedRouter = false;
7574
}
7675

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Copyright (c) 2002-2017 "Neo Technology,","
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
import RoundRobinArrayIndex from './round-robin-array-index';
20+
import LoadBalancingStrategy from './load-balancing-strategy';
21+
22+
export const LEAST_CONNECTED_STRATEGY_NAME = 'least_connected';
23+
24+
export default class LeastConnectedLoadBalancingStrategy extends LoadBalancingStrategy {
25+
26+
/**
27+
* @constructor
28+
* @param {Pool} connectionPool the connection pool of this driver.
29+
*/
30+
constructor(connectionPool) {
31+
super();
32+
this._readersIndex = new RoundRobinArrayIndex();
33+
this._writersIndex = new RoundRobinArrayIndex();
34+
this._connectionPool = connectionPool;
35+
}
36+
37+
/**
38+
* @inheritDoc
39+
*/
40+
selectReader(knownReaders) {
41+
return this._select(knownReaders, this._readersIndex);
42+
}
43+
44+
/**
45+
* @inheritDoc
46+
*/
47+
selectWriter(knownWriters) {
48+
return this._select(knownWriters, this._writersIndex);
49+
}
50+
51+
_select(addresses, roundRobinIndex) {
52+
const length = addresses.length;
53+
if (length === 0) {
54+
return null;
55+
}
56+
57+
// choose start index for iteration in round-rodin fashion
58+
const startIndex = roundRobinIndex.next(length);
59+
let index = startIndex;
60+
61+
let leastConnectedAddress = null;
62+
let leastActiveConnections = Number.MAX_SAFE_INTEGER;
63+
64+
// iterate over the array to find least connected address
65+
do {
66+
const address = addresses[index];
67+
const activeConnections = this._connectionPool.activeResourceCount(address);
68+
69+
if (activeConnections < leastActiveConnections) {
70+
leastConnectedAddress = address;
71+
leastActiveConnections = activeConnections;
72+
}
73+
74+
// loop over to the start of the array when end is reached
75+
if (index === length - 1) {
76+
index = 0;
77+
} else {
78+
index++;
79+
}
80+
}
81+
while (index !== startIndex);
82+
83+
return leastConnectedAddress;
84+
}
85+
}

src/v1/internal/load-balancing-strategy.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
* limitations under the License.
1818
*/
1919

20-
2120
/**
2221
* A facility to select most appropriate reader or writer among the given addresses for request processing.
2322
*/

src/v1/internal/pool.js

Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,59 +36,117 @@ class Pool {
3636
this._validate = validate;
3737
this._maxIdle = maxIdle;
3838
this._pools = {};
39+
this._activeResourceCounts = {};
3940
this._release = this._release.bind(this);
4041
}
4142

43+
/**
44+
* Acquire and idle resource fom the pool or create a new one.
45+
* @param {string} key the resource key.
46+
* @return {object} resource that is ready to use.
47+
*/
4248
acquire(key) {
43-
let resource;
4449
let pool = this._pools[key];
4550
if (!pool) {
4651
pool = [];
4752
this._pools[key] = pool;
4853
}
4954
while (pool.length) {
50-
resource = pool.pop();
55+
const resource = pool.pop();
5156

5257
if (this._validate(resource)) {
58+
// idle resource is valid and can be acquired
59+
resourceAcquired(key, this._activeResourceCounts);
5360
return resource;
5461
} else {
5562
this._destroy(resource);
5663
}
5764
}
5865

66+
// there exist no idle valid resources, create a new one for acquisition
67+
resourceAcquired(key, this._activeResourceCounts);
5968
return this._create(key, this._release);
6069
}
6170

71+
/**
72+
* Destroy all idle resources for the given key.
73+
* @param {string} key the resource key to purge.
74+
*/
6275
purge(key) {
63-
let resource;
64-
let pool = this._pools[key] || [];
76+
const pool = this._pools[key] || [];
6577
while (pool.length) {
66-
resource = pool.pop();
78+
const resource = pool.pop();
6779
this._destroy(resource)
6880
}
6981
delete this._pools[key]
7082
}
7183

84+
/**
85+
* Destroy all idle resources in this pool.
86+
*/
7287
purgeAll() {
7388
Object.keys(this._pools).forEach(key => this.purge(key));
7489
}
7590

91+
/**
92+
* Check if this pool contains resources for the given key.
93+
* @param {string} key the resource key to check.
94+
* @return {boolean} <code>true</code> when pool contains entries for the given key, <code>false</code> otherwise.
95+
*/
7696
has(key) {
7797
return (key in this._pools);
7898
}
7999

100+
/**
101+
* Get count of active (checked out of the pool) resources for the given key.
102+
* @param {string} key the resource key to check.
103+
* @return {number} count of resources acquired by clients.
104+
*/
105+
activeResourceCount(key) {
106+
return this._activeResourceCounts[key] || 0;
107+
}
108+
80109
_release(key, resource) {
81-
let pool = this._pools[key];
82-
if (!pool) {
110+
const pool = this._pools[key];
111+
112+
if (pool) {
113+
// there exist idle connections for the given key
114+
if (pool.length >= this._maxIdle || !this._validate(resource)) {
115+
this._destroy(resource);
116+
} else {
117+
pool.push(resource);
118+
}
119+
} else {
83120
// key has been purged, don't put it back, just destroy the resource
84121
this._destroy(resource);
85-
return;
86-
}
87-
if( pool.length >= this._maxIdle || !this._validate(resource) ) {
88-
this._destroy(resource);
89-
} else {
90-
pool.push(resource);
91122
}
123+
124+
resourceReleased(key, this._activeResourceCounts);
125+
}
126+
}
127+
128+
/**
129+
* Increment active (checked out of the pool) resource counter.
130+
* @param {string} key the resource group identifier (server address for connections).
131+
* @param {Object.<string, number>} activeResourceCounts the object holding active counts per key.
132+
*/
133+
function resourceAcquired(key, activeResourceCounts) {
134+
const currentCount = activeResourceCounts[key] || 0;
135+
activeResourceCounts[key] = currentCount + 1;
136+
}
137+
138+
/**
139+
* Decrement active (checked out of the pool) resource counter.
140+
* @param {string} key the resource group identifier (server address for connections).
141+
* @param {Object.<string, number>} activeResourceCounts the object holding active counts per key.
142+
*/
143+
function resourceReleased(key, activeResourceCounts) {
144+
const currentCount = activeResourceCounts[key] || 0;
145+
const nextCount = currentCount - 1;
146+
if (nextCount > 0) {
147+
activeResourceCounts[key] = nextCount;
148+
} else {
149+
delete activeResourceCounts[key];
92150
}
93151
}
94152

src/v1/internal/round-robin-load-balancing-strategy.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19+
1920
import RoundRobinArrayIndex from './round-robin-array-index';
2021
import LoadBalancingStrategy from './load-balancing-strategy';
2122

23+
export const ROUND_ROBIN_STRATEGY_NAME = 'round_robin';
24+
2225
export default class RoundRobinLoadBalancingStrategy extends LoadBalancingStrategy {
2326

2427
constructor() {

src/v1/routing-driver.js

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import Session from './session';
2121
import {Driver} from './driver';
2222
import {newError, SESSION_EXPIRED} from './error';
2323
import {LoadBalancer} from './internal/connection-providers';
24+
import LeastConnectedLoadBalancingStrategy, {LEAST_CONNECTED_STRATEGY_NAME} from './internal/least-connected-load-balancing-strategy';
25+
import RoundRobinLoadBalancingStrategy, {ROUND_ROBIN_STRATEGY_NAME} from './internal/round-robin-load-balancing-strategy';
2426

2527
/**
2628
* A driver that supports routing in a core-edge cluster.
@@ -34,7 +36,8 @@ class RoutingDriver extends Driver {
3436
}
3537

3638
_createConnectionProvider(address, connectionPool, driverOnErrorCallback) {
37-
return new LoadBalancer(address, this._routingContext, connectionPool, driverOnErrorCallback);
39+
const loadBalancingStrategy = RoutingDriver._createLoadBalancingStrategy(this._config, connectionPool);
40+
return new LoadBalancer(address, this._routingContext, connectionPool, loadBalancingStrategy, driverOnErrorCallback);
3841
}
3942

4043
_createSession(mode, connectionProvider, bookmark, config) {
@@ -80,6 +83,23 @@ class RoutingDriver extends Driver {
8083
return error.code === 'Neo.ClientError.Cluster.NotALeader' ||
8184
error.code === 'Neo.ClientError.General.ForbiddenOnReadOnlyDatabase';
8285
}
86+
87+
/**
88+
* Create new load balancing strategy based on the config.
89+
* @param {object} config the user provided config.
90+
* @param {Pool} connectionPool the connection pool for this driver.
91+
* @return {LoadBalancingStrategy} new strategy.
92+
*/
93+
static _createLoadBalancingStrategy(config, connectionPool) {
94+
const configuredValue = config.loadBalancingStrategy;
95+
if (!configuredValue || configuredValue === LEAST_CONNECTED_STRATEGY_NAME) {
96+
return new LeastConnectedLoadBalancingStrategy(connectionPool);
97+
} else if (configuredValue === ROUND_ROBIN_STRATEGY_NAME) {
98+
return new RoundRobinLoadBalancingStrategy();
99+
} else {
100+
throw newError('Unknown load balancing strategy: ' + configuredValue);
101+
}
102+
}
83103
}
84104

85105
class RoutingSession extends Session {

test/internal/connection-providers.test.js

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../../src/v1/error';
2323
import RoutingTable from '../../src/v1/internal/routing-table';
2424
import {DirectConnectionProvider, LoadBalancer} from '../../src/v1/internal/connection-providers';
2525
import Pool from '../../src/v1/internal/pool';
26+
import LeastConnectedLoadBalancingStrategy from '../../src/v1/internal/least-connected-load-balancing-strategy';
2627

2728
const NO_OP_DRIVER_CALLBACK = () => {
2829
};
@@ -134,7 +135,9 @@ describe('LoadBalancer', () => {
134135
});
135136

136137
it('initializes routing table with the given router', () => {
137-
const loadBalancer = new LoadBalancer('server-ABC', {}, newPool(), NO_OP_DRIVER_CALLBACK);
138+
const connectionPool = newPool();
139+
const loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(connectionPool);
140+
const loadBalancer = new LoadBalancer('server-ABC', {}, connectionPool, loadBalancingStrategy, NO_OP_DRIVER_CALLBACK);
138141

139142
expectRoutingTable(loadBalancer,
140143
['server-ABC'],
@@ -1068,7 +1071,9 @@ function newLoadBalancerWithSeedRouter(seedRouter, seedRouterResolved,
10681071
expirationTime = Integer.MAX_VALUE,
10691072
routerToRoutingTable = {},
10701073
connectionPool = null) {
1071-
const loadBalancer = new LoadBalancer(seedRouter, {}, connectionPool || newPool(), NO_OP_DRIVER_CALLBACK);
1074+
const pool = connectionPool || newPool();
1075+
const loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(pool);
1076+
const loadBalancer = new LoadBalancer(seedRouter, {}, pool, loadBalancingStrategy, NO_OP_DRIVER_CALLBACK);
10721077
loadBalancer._routingTable = new RoutingTable(routers, readers, writers, expirationTime);
10731078
loadBalancer._rediscovery = new FakeRediscovery(routerToRoutingTable);
10741079
loadBalancer._hostNameResolver = new FakeDnsResolver(seedRouterResolved);

0 commit comments

Comments
 (0)