From 2e9415dd7540f42f133a950330ae7d946ad2db72 Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 11 Aug 2017 16:49:23 +0200 Subject: [PATCH] Added ability to enforce max connection lifetime Driver keeps idle network connections in a connection pool. These connections can get invalidated while resting idle in the pool. They can be killed by network equipment like load balancer, proxy or firewall. It is also safer to refresh connections once in a while so that database can renew corresponding resources. This commit adds `maxConnectionLifetime` setting and makes driver close too old connections. Checking and closing happens during connection acquisition. Also removed manual `Date.now()` mocking in favour of existing library - Lolex. --- src/v1/driver.js | 37 +++++- src/v1/index.js | 13 ++- src/v1/internal/connector.js | 1 + test/internal/connector.test.js | 16 +++ test/internal/fake-connection.js | 17 +++ test/internal/routing-util.test.js | 15 ++- test/internal/timers-util.js | 18 --- test/internal/transaction-executor.test.js | 12 +- test/v1/driver.test.js | 127 ++++++++++++++++++++- test/v1/routing.driver.boltkit.it.js | 24 +++- types/v1/driver.d.ts | 1 + 11 files changed, 239 insertions(+), 42 deletions(-) diff --git a/src/v1/driver.js b/src/v1/driver.js index 165a8ee7a..6bbd527a4 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -47,6 +47,8 @@ class Driver { * @protected */ constructor(url, userAgent, token = {}, config = {}) { + sanitizeConfig(config); + this._url = url; this._userAgent = userAgent; this._openSessions = {}; @@ -56,7 +58,7 @@ class Driver { this._pool = new Pool( this._createConnection.bind(this), this._destroyConnection.bind(this), - Driver._validateConnection.bind(this), + this._validateConnection.bind(this), config.connectionPoolSize ); @@ -90,8 +92,20 @@ class Driver { * @return {boolean} true if the connection is open * @access private **/ - static _validateConnection(conn) { - return conn.isOpen(); + _validateConnection(conn) { + if (!conn.isOpen()) { + return false; + } + + const maxConnectionLifetime = this._config.maxConnectionLifetime; + if (maxConnectionLifetime) { + const lifetime = Date.now() - conn.creationTimestamp; + if (lifetime > maxConnectionLifetime) { + return false; + } + } + + return true; } /** @@ -213,7 +227,22 @@ class _ConnectionStreamObserver extends StreamObserver { } } - +/** + * @private + */ +function sanitizeConfig(config) { + const maxConnectionLifetime = config.maxConnectionLifetime; + if (maxConnectionLifetime) { + const sanitizedMaxConnectionLifetime = parseInt(maxConnectionLifetime, 10); + if (sanitizedMaxConnectionLifetime && sanitizedMaxConnectionLifetime > 0) { + config.maxConnectionLifetime = sanitizedMaxConnectionLifetime; + } else { + config.maxConnectionLifetime = null; + } + } else { + config.maxConnectionLifetime = null; + } +} export {Driver, READ, WRITE} diff --git a/src/v1/index.js b/src/v1/index.js index 25a36f55e..9a95c6dae 100644 --- a/src/v1/index.js +++ b/src/v1/index.js @@ -93,7 +93,7 @@ const USER_AGENT = "neo4j-javascript/" + VERSION; * // TRUST_SYSTEM_CA_SIGNED_CERTIFICATES meand that you trust whatever certificates * // are in the default certificate chain of th * trust: "TRUST_ALL_CERTIFICATES" | "TRUST_ON_FIRST_USE" | "TRUST_SIGNED_CERTIFICATES" | - * "TRUST_CUSTOM_CA_SIGNED_CERTIFICATES" | "TRUST_SYSTEM_CA_SIGNED_CERTIFICATES", + * "TRUST_CUSTOM_CA_SIGNED_CERTIFICATES" | "TRUST_SYSTEM_CA_SIGNED_CERTIFICATES", * * // List of one or more paths to trusted encryption certificates. This only * // works in the NodeJS bundle, and only matters if you use "TRUST_CUSTOM_CA_SIGNED_CERTIFICATES". @@ -112,11 +112,20 @@ const USER_AGENT = "neo4j-javascript/" + VERSION; * // Connection will be destroyed if this threshold is exceeded. * connectionPoolSize: 50, * + * // The maximum allowed lifetime for a pooled connection in milliseconds. Pooled connections older than this + * // threshold will be closed and removed from the pool. Such discarding happens during connection acquisition + * // so that new session is never backed by an old connection. Setting this option to a low value will cause + * // a high connection churn and might result in a performance hit. It is recommended to set maximum lifetime + * // to a slightly smaller value than the one configured in network equipment (load balancer, proxy, firewall, + * // etc. can also limit maximum connection lifetime). No maximum lifetime limit is imposed by default. Zero + * // and negative values result in lifetime not being checked. + * maxConnectionLifetime: 30 * 60 * 1000, // 30 minutes + * * // Specify the maximum time in milliseconds transactions are allowed to retry via * // {@link Session#readTransaction()} and {@link Session#writeTransaction()} functions. These functions * // will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient errors with * // exponential backoff using initial delay of 1 second. Default value is 30000 which is 30 seconds. - * maxTransactionRetryTime: 30000, + * maxTransactionRetryTime: 30000, // 30 seconds * * // Provide an alternative load balancing strategy for the routing driver to use. * // Driver uses "least_connected" by default. diff --git a/src/v1/internal/connector.js b/src/v1/internal/connector.js index 10755d9db..c4d528491 100644 --- a/src/v1/internal/connector.js +++ b/src/v1/internal/connector.js @@ -174,6 +174,7 @@ class Connection { */ this.url = url; this.server = {address: url}; + this.creationTimestamp = Date.now(); this._pendingObservers = []; this._currentObserver = undefined; this._ch = channel; diff --git a/test/internal/connector.test.js b/test/internal/connector.test.js index ba4889a9f..811331cbf 100644 --- a/test/internal/connector.test.js +++ b/test/internal/connector.test.js @@ -25,12 +25,19 @@ import {alloc} from '../../src/v1/internal/buf'; import {Neo4jError} from '../../src/v1/error'; import sharedNeo4j from '../internal/shared-neo4j'; import {ServerVersion} from '../../src/v1/internal/server-version'; +import lolex from 'lolex'; describe('connector', () => { + let clock; let connection; afterEach(done => { + if (clock) { + clock.uninstall(); + clock = null; + } + const usedConnection = connection; connection = null; if (usedConnection) { @@ -39,6 +46,15 @@ describe('connector', () => { done(); }); + it('should have correct creation timestamp', () => { + clock = lolex.install(); + clock.setSystemTime(424242); + + connection = connect('bolt://localhost'); + + expect(connection.creationTimestamp).toEqual(424242); + }); + it('should read/write basic messages', done => { // Given connection = connect("bolt://localhost"); diff --git a/test/internal/fake-connection.js b/test/internal/fake-connection.js index e8ccda737..695cc9918 100644 --- a/test/internal/fake-connection.js +++ b/test/internal/fake-connection.js @@ -27,6 +27,9 @@ export default class FakeConnection { constructor() { + this._open = true; + this.creationTimestamp = Date.now(); + this.resetInvoked = 0; this.resetAsyncInvoked = 0; this.syncInvoked = 0; @@ -68,6 +71,10 @@ export default class FakeConnection { return this._initializationPromise; } + isOpen() { + return this._open; + } + isReleasedOnceOnSessionClose() { return this.isReleasedOnSessionCloseTimes(1); } @@ -103,4 +110,14 @@ export default class FakeConnection { this._initializationPromise = Promise.reject(error); return this; } + + withCreationTimestamp(value) { + this.creationTimestamp = value; + return this; + } + + closed() { + this._open = false; + return this; + } }; diff --git a/test/internal/routing-util.test.js b/test/internal/routing-util.test.js index 46c745134..826e5164b 100644 --- a/test/internal/routing-util.test.js +++ b/test/internal/routing-util.test.js @@ -30,12 +30,11 @@ describe('RoutingUtil', () => { let clock; - beforeAll(() => { - clock = lolex.install(); - }); - - afterAll(() => { - clock.uninstall(); + afterEach(() => { + if (clock) { + clock.uninstall(); + clock = null; + } }); it('should return retrieved records when query succeeds', done => { @@ -141,6 +140,8 @@ describe('RoutingUtil', () => { }); it('should parse valid ttl', () => { + clock = lolex.install(); + testValidTtlParsing(100, 5); testValidTtlParsing(Date.now(), 3600); // 1 hour testValidTtlParsing(Date.now(), 86400); // 24 hours @@ -152,6 +153,7 @@ describe('RoutingUtil', () => { it('should not overflow parsing huge ttl', () => { const record = newRecord({ttl: Integer.MAX_VALUE}); + clock = lolex.install(); clock.setSystemTime(42); const expirationTime = parseTtl(record); @@ -161,6 +163,7 @@ describe('RoutingUtil', () => { it('should return valid value parsing negative ttl', () => { const record = newRecord({ttl: int(-42)}); + clock = lolex.install(); clock.setSystemTime(42); const expirationTime = parseTtl(record); diff --git a/test/internal/timers-util.js b/test/internal/timers-util.js index a3452363f..a7cebf4d1 100644 --- a/test/internal/timers-util.js +++ b/test/internal/timers-util.js @@ -66,21 +66,3 @@ class SetTimeoutMock { } export const setTimeoutMock = new SetTimeoutMock(); - -export function hijackNextDateNowCall(newValue) { - const originalDate = global.Date; - global.Date = new FakeDate(originalDate, newValue); -} - -class FakeDate { - - constructor(originalDate, nextNowValue) { - this._originalDate = originalDate; - this._nextNowValue = nextNowValue; - } - - now() { - global.Date = this._originalDate; - return this._nextNowValue; - } -} diff --git a/test/internal/transaction-executor.test.js b/test/internal/transaction-executor.test.js index 3d9aa2360..045734d5c 100644 --- a/test/internal/transaction-executor.test.js +++ b/test/internal/transaction-executor.test.js @@ -19,7 +19,8 @@ import TransactionExecutor from '../../src/v1/internal/transaction-executor'; import {newError, SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../../src/v1/error'; -import {hijackNextDateNowCall, setTimeoutMock} from './timers-util'; +import {setTimeoutMock} from './timers-util'; +import lolex from 'lolex'; const TRANSIENT_ERROR_1 = 'Neo.TransientError.Transaction.DeadlockDetected'; const TRANSIENT_ERROR_2 = 'Neo.TransientError.Network.CommunicationError'; @@ -30,6 +31,7 @@ const OOM_ERROR = 'Neo.DatabaseError.General.OutOfMemoryError'; describe('TransactionExecutor', () => { + let clock; let fakeSetTimeout; beforeEach(() => { @@ -37,6 +39,10 @@ describe('TransactionExecutor', () => { }); afterEach(() => { + if (clock) { + clock.uninstall(); + clock = null; + } fakeSetTimeout.uninstall(); }); @@ -81,7 +87,9 @@ describe('TransactionExecutor', () => { expect(tx).toBeDefined(); workInvocationCounter++; if (workInvocationCounter === 3) { - hijackNextDateNowCall(Date.now() + 30001); // move next `Date.now()` call forward by 30 seconds + const currentTime = Date.now(); + clock = lolex.install(); + clock.setSystemTime(currentTime + 30001); // move `Date.now()` call forward by 30 seconds } return realWork(); }); diff --git a/test/v1/driver.test.js b/test/v1/driver.test.js index 7c1120c3f..fbdf65d6b 100644 --- a/test/v1/driver.test.js +++ b/test/v1/driver.test.js @@ -19,18 +19,22 @@ import neo4j from '../../src/v1'; import sharedNeo4j from '../internal/shared-neo4j'; +import FakeConnection from '../internal/fake-connection'; +import lolex from 'lolex'; describe('driver', () => { + let clock; let driver; - beforeEach(() => { - driver = null; - }); - afterEach(() => { - if(driver) { + if (clock) { + clock.uninstall(); + clock = null; + } + if (driver) { driver.close(); + driver = null; } }); @@ -195,6 +199,106 @@ describe('driver', () => { expect(() => neo4j.driver('bolt://localhost:7687/?policy=my_policy')).toThrow(); }); + it('should sanitize maxConnectionLifetime in the config', () => { + validateMaxConnectionLifetime({}, null); + validateMaxConnectionLifetime({maxConnectionLifetime: 42}, 42); + validateMaxConnectionLifetime({maxConnectionLifetime: 0}, null); + validateMaxConnectionLifetime({maxConnectionLifetime: '42'}, 42); + validateMaxConnectionLifetime({maxConnectionLifetime: '042'}, 42); + validateMaxConnectionLifetime({maxConnectionLifetime: -42}, null); + }); + + it('should treat closed connections as invalid', () => { + driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken); + + const connectionValid = driver._validateConnection(new FakeConnection().closed()); + + expect(connectionValid).toBeFalsy(); + }); + + it('should treat not old open connections as valid', () => { + driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken, {maxConnectionLifetime: 10}); + + const connection = new FakeConnection().withCreationTimestamp(12); + clock = lolex.install(); + clock.setSystemTime(20); + const connectionValid = driver._validateConnection(connection); + + expect(connectionValid).toBeTruthy(); + }); + + it('should treat old open connections as invalid', () => { + driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken, {maxConnectionLifetime: 10}); + + const connection = new FakeConnection().withCreationTimestamp(5); + clock = lolex.install(); + clock.setSystemTime(20); + const connectionValid = driver._validateConnection(connection); + + expect(connectionValid).toBeFalsy(); + }); + + it('should discard closed connections', done => { + driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken); + + const session1 = driver.session(); + session1.run('CREATE () RETURN 42').then(() => { + session1.close(); + + // one connection should be established + const connections1 = openConnectionFrom(driver); + expect(connections1.length).toEqual(1); + + // close/break existing pooled connection + connections1.forEach(connection => connection.close()); + + const session2 = driver.session(); + session2.run('RETURN 1').then(() => { + session2.close(); + + // existing connection should be disposed and new one should be created + const connections2 = openConnectionFrom(driver); + expect(connections2.length).toEqual(1); + + expect(connections1[0]).not.toEqual(connections2[0]); + + done(); + }); + }); + }); + + it('should discard old connections', done => { + const maxLifetime = 100000; + driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken, {maxConnectionLifetime: maxLifetime}); + + const session1 = driver.session(); + session1.run('CREATE () RETURN 42').then(() => { + session1.close(); + + // one connection should be established + const connections1 = openConnectionFrom(driver); + expect(connections1.length).toEqual(1); + + // make existing connection look very old by advancing the `Date.now()` value + const currentTime = Date.now(); + clock = lolex.install(); + clock.setSystemTime(currentTime + maxLifetime * 2); + + const session2 = driver.session(); + session2.run('RETURN 1').then(() => { + session2.close(); + + // old connection should be disposed and new one should be created + const connections2 = openConnectionFrom(driver); + expect(connections2.length).toEqual(1); + + expect(connections1[0]).not.toEqual(connections2[0]); + + done(); + }); + }); + }); + const exposedTypes = [ 'Node', 'Path', @@ -225,4 +329,17 @@ describe('driver', () => { return neo4j.auth.basic('neo4j', 'who would use such a password'); } + function validateMaxConnectionLifetime(config, expectedValue) { + const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken, config); + try { + expect(driver._config.maxConnectionLifetime).toEqual(expectedValue); + } finally { + driver.close(); + } + } + + function openConnectionFrom(driver) { + return Array.from(Object.values(driver._openSessions)); + } + }); diff --git a/test/v1/routing.driver.boltkit.it.js b/test/v1/routing.driver.boltkit.it.js index ffb8647f0..088d667fd 100644 --- a/test/v1/routing.driver.boltkit.it.js +++ b/test/v1/routing.driver.boltkit.it.js @@ -22,10 +22,12 @@ import {READ, WRITE} from '../../src/v1/driver'; import boltkit from './boltkit'; import RoutingTable from '../../src/v1/internal/routing-table'; import {SESSION_EXPIRED} from '../../src/v1/error'; -import {hijackNextDateNowCall} from '../internal/timers-util'; +import lolex from 'lolex'; describe('routing driver', () => { + let originalTimeout; + let clock; beforeAll(() => { originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL; @@ -1294,12 +1296,14 @@ describe('routing driver', () => { invocations++; if (invocations === 2) { // make retries stop after two invocations - moveNextDateNow30SecondsForward(); + moveTime30SecondsForward(); } return tx.run('MATCH (n) RETURN n.name'); }); resultPromise.catch(error => { + removeTimeMocking(); // uninstall lolex mocking to make test complete, boltkit uses timers + expect(error.code).toEqual(SESSION_EXPIRED); expect(invocations).toEqual(2); @@ -1340,12 +1344,14 @@ describe('routing driver', () => { invocations++; if (invocations === 2) { // make retries stop after two invocations - moveNextDateNow30SecondsForward(); + moveTime30SecondsForward(); } return tx.run('CREATE (n {name:\'Bob\'})'); }); resultPromise.catch(error => { + removeTimeMocking(); // uninstall lolex mocking to make test complete, boltkit uses timers + expect(error.code).toEqual(SESSION_EXPIRED); expect(invocations).toEqual(2); @@ -1942,9 +1948,17 @@ describe('routing driver', () => { }); } - function moveNextDateNow30SecondsForward() { + function moveTime30SecondsForward() { const currentTime = Date.now(); - hijackNextDateNowCall(currentTime + 30 * 1000 + 1); + clock = lolex.install(); + clock.setSystemTime(currentTime + 30 * 1000 + 1); + } + + function removeTimeMocking() { + if (clock) { + clock.uninstall(); + clock = null; + } } function testWriteSessionWithAccessModeAndBookmark(accessMode, bookmark, done) { diff --git a/types/v1/driver.d.ts b/types/v1/driver.d.ts index 789c60cba..5d8ef9af6 100644 --- a/types/v1/driver.d.ts +++ b/types/v1/driver.d.ts @@ -47,6 +47,7 @@ declare interface Config { connectionPoolSize?: number; maxTransactionRetryTime?: number; loadBalancingStrategy?: LoadBalancingStrategy; + maxConnectionLifetime?: number; } declare type SessionMode = "READ" | "WRITE";