From 98f3c45c5dba7a36463150ba69fbc10bf5747dbb Mon Sep 17 00:00:00 2001 From: Ali Ince Date: Mon, 30 Sep 2019 14:32:20 +0100 Subject: [PATCH 1/3] Purge old routing table entries on inactivity --- src/internal/connection-provider-routing.js | 28 +++--- src/internal/routing-table.js | 15 ++- .../connection-provider-routing.test.js | 96 ++++++++++++++++++- test/internal/routing-table.test.js | 25 ++++- 4 files changed, 147 insertions(+), 17 deletions(-) diff --git a/src/internal/connection-provider-routing.js b/src/internal/connection-provider-routing.js index 0a985fb7c..4a4c4d196 100644 --- a/src/internal/connection-provider-routing.js +++ b/src/internal/connection-provider-routing.js @@ -31,12 +31,15 @@ import ConnectionErrorHandler from './connection-error-handler' import DelegateConnection from './connection-delegate' import LeastConnectedLoadBalancingStrategy from './least-connected-load-balancing-strategy' import Bookmark from './bookmark' +import { int } from '../integer' +import { ConsoleReporter } from 'jasmine' const UNAUTHORIZED_ERROR_CODE = 'Neo.ClientError.Security.Unauthorized' const DATABASE_NOT_FOUND_ERROR_CODE = 'Neo.ClientError.Database.DatabaseNotFound' const SYSTEM_DB_NAME = 'system' const DEFAULT_DB_NAME = '' +const DEFAULT_ROUTING_TABLE_PURGE_DELAY = int(30000) export default class RoutingConnectionProvider extends PooledConnectionProvider { constructor ({ @@ -47,7 +50,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider config, log, userAgent, - authToken + authToken, + routingTablePurgeDelay }) { super({ id, config, log, userAgent, authToken }) @@ -61,6 +65,9 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._dnsResolver = new HostNameResolver() this._log = log this._useSeedRouter = true + this._routingTablePurgeDelay = routingTablePurgeDelay + ? int(routingTablePurgeDelay) + : DEFAULT_ROUTING_TABLE_PURGE_DELAY } _createConnectionErrorHandler () { @@ -71,11 +78,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider _handleUnavailability (error, address, database) { this._log.warn( - `Routing driver ${ - this._id - } will forget ${address} for database '${database}' because of an error ${ - error.code - } '${error.message}'` + `Routing driver ${this._id} will forget ${address} for database '${database}' because of an error ${error.code} '${error.message}'` ) this.forget(address, database || '') return error @@ -83,11 +86,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider _handleWriteFailure (error, address, database) { this._log.warn( - `Routing driver ${ - this._id - } will forget writer ${address} for database '${database}' because of an error ${ - error.code - } '${error.message}'` + `Routing driver ${this._id} will forget writer ${address} for database '${database}' because of an error ${error.code} '${error.message}'` ) this.forgetWriter(address, database || '') return newError( @@ -427,6 +426,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider // close old connections to servers not present in the new routing table await this._connectionPool.keepAll(newRoutingTable.allServers()) + // filter out expired to purge (expired for a pre-configured amount of time) routing table entries + Object.values(this._routingTables).forEach(value => { + if (value.isExpiredFor(this._routingTablePurgeDelay)) { + delete this._routingTables[value.database] + } + }) + // make this driver instance aware of the new table this._routingTables[newRoutingTable.database] = newRoutingTable this._log.info(`Updated routing table ${newRoutingTable}`) diff --git a/src/internal/routing-table.js b/src/internal/routing-table.js index ce12ec6f5..bc00d66e4 100644 --- a/src/internal/routing-table.js +++ b/src/internal/routing-table.js @@ -24,6 +24,7 @@ const MIN_ROUTERS = 1 export default class RoutingTable { constructor ({ database, routers, readers, writers, expirationTime } = {}) { this.database = database + this.databaseName = database || 'default database' this.routers = routers || [] this.readers = readers || [] this.writers = writers || [] @@ -61,14 +62,24 @@ export default class RoutingTable { ) } + /** + * Check if this routing table is expired for specified amount of duration + * + * @param {Integer} duration amount of duration in milliseconds to check for expiration + * @returns {boolean} + */ + isExpiredFor (duration) { + return this.expirationTime.add(duration).lessThan(Date.now()) + } + allServers () { return [...this.routers, ...this.readers, ...this.writers] } toString () { return ( - `RoutingTable[` + - `database=${this.database}, ` + + 'RoutingTable[' + + `database=${this.databaseName}, ` + `expirationTime=${this.expirationTime}, ` + `currentTime=${Date.now()}, ` + `routers=[${this.routers}], ` + diff --git a/test/internal/connection-provider-routing.test.js b/test/internal/connection-provider-routing.test.js index ca66bd712..f752dba4f 100644 --- a/test/internal/connection-provider-routing.test.js +++ b/test/internal/connection-provider-routing.test.js @@ -1723,6 +1723,92 @@ describe('#unit RoutingConnectionProvider', () => { [serverC] ) }) + + it('should purge expired routing tables after specified duration on update', async () => { + var originalDateNow = Date.now + Date.now = () => 50000 + try { + const routingTableToLoad = newRoutingTable( + 'databaseC', + [server1, server2, server3], + [server2, server3], + [server1] + ) + const connectionProvider = newRoutingConnectionProviderWithSeedRouter( + server1, + [server1], + [ + newRoutingTable( + 'databaseA', + [server1, server2, server3], + [server1, server2], + [server3], + int(Date.now() + 12000) + ), + newRoutingTable( + 'databaseB', + [server1, server2, server3], + [server1, server3], + [server2], + int(Date.now() + 2000) + ) + ], + { + databaseC: { + 'server1:7687': routingTableToLoad + } + }, + null, + 4000 + ) + + expectRoutingTable( + connectionProvider, + 'databaseA', + [server1, server2, server3], + [server1, server2], + [server3] + ) + expectRoutingTable( + connectionProvider, + 'databaseB', + [server1, server2, server3], + [server1, server3], + [server2] + ) + + // make routing table for databaseA to report true for isExpiredFor(4000) + // call. + Date.now = () => 58000 + + // force a routing table update for databaseC + const conn1 = await connectionProvider.acquireConnection({ + accessMode: WRITE, + database: 'databaseC' + }) + expect(conn1).not.toBeNull() + expect(conn1.address).toBe(server1) + + // Then + expectRoutingTable( + connectionProvider, + 'databaseA', + [server1, server2, server3], + [server1, server2], + [server3] + ) + expectRoutingTable( + connectionProvider, + 'databaseC', + [server1, server2, server3], + [server2, server3], + [server1] + ) + expectNoRoutingTable(connectionProvider, 'databaseB') + } finally { + Date.now = originalDateNow + } + }) }) }) @@ -1746,7 +1832,8 @@ function newRoutingConnectionProviderWithSeedRouter ( seedRouterResolved, routingTables, routerToRoutingTable = { '': {} }, - connectionPool = null + connectionPool = null, + routingTablePurgeDelay = null ) { const pool = connectionPool || newPool() const connectionProvider = new RoutingConnectionProvider({ @@ -1755,7 +1842,8 @@ function newRoutingConnectionProviderWithSeedRouter ( routingContext: {}, hostNameResolver: new SimpleHostNameResolver(), config: {}, - log: Logger.noOp() + log: Logger.noOp(), + routingTablePurgeDelay: routingTablePurgeDelay }) connectionProvider._connectionPool = pool routingTables.forEach(r => { @@ -1821,6 +1909,10 @@ function expectRoutingTable ( expect(connectionProvider._routingTables[database].writers).toEqual(writers) } +function expectNoRoutingTable (connectionProvider, database) { + expect(connectionProvider._routingTables[database]).toBeFalsy() +} + function expectPoolToContain (pool, addresses) { addresses.forEach(address => { expect(pool.has(address)).toBeTruthy() diff --git a/test/internal/routing-table.test.js b/test/internal/routing-table.test.js index 6ce0ff104..f89ec41d6 100644 --- a/test/internal/routing-table.test.js +++ b/test/internal/routing-table.test.js @@ -201,8 +201,29 @@ describe('#unit RoutingTable', () => { } }) - function expired () { - return Date.now() - 3600 // expired an hour ago + it('should report correct value when expired for is tested', () => { + const originalDateNow = Date.now + try { + Date.now = () => 50000 + const table = createTable( + [server1, server2, server3], + [server2, server1, server5], + [server5, server1], + expired(7200) + ) + + expect(table.isStaleFor(READ)).toBeTruthy() + expect(table.isStaleFor(WRITE)).toBeTruthy() + + expect(table.isExpiredFor(3600)).toBeTruthy() + expect(table.isExpiredFor(10800)).toBeFalsy() + } finally { + Date.now = originalDateNow + } + }) + + function expired (expiredFor) { + return Date.now() - (expiredFor || 3600) // expired an hour ago } function notExpired () { From d8539d04f5bb41d83ff2499bdf65d3c6c84d9d22 Mon Sep 17 00:00:00 2001 From: Ali Ince Date: Mon, 30 Sep 2019 18:27:29 +0100 Subject: [PATCH 2/3] Clean import --- src/internal/connection-provider-routing.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/internal/connection-provider-routing.js b/src/internal/connection-provider-routing.js index 4a4c4d196..5da894a41 100644 --- a/src/internal/connection-provider-routing.js +++ b/src/internal/connection-provider-routing.js @@ -32,7 +32,6 @@ import DelegateConnection from './connection-delegate' import LeastConnectedLoadBalancingStrategy from './least-connected-load-balancing-strategy' import Bookmark from './bookmark' import { int } from '../integer' -import { ConsoleReporter } from 'jasmine' const UNAUTHORIZED_ERROR_CODE = 'Neo.ClientError.Security.Unauthorized' const DATABASE_NOT_FOUND_ERROR_CODE = From cd7fd7ac23f4eaf6ba969ebbfc08fc10a4281758 Mon Sep 17 00:00:00 2001 From: Ali Ince Date: Tue, 1 Oct 2019 17:24:13 +0100 Subject: [PATCH 3/3] Fix test failures after latest server changes --- test/rx/summary.test.js | 23 ++++++++++++++--------- test/session.test.js | 18 +++++++++--------- test/spatial-types.test.js | 12 ++++++------ test/stress.test.js | 10 +++++----- test/transaction.test.js | 2 +- test/types.test.js | 6 +++--- 6 files changed, 38 insertions(+), 33 deletions(-) diff --git a/test/rx/summary.test.js b/test/rx/summary.test.js index b694a420f..bb3239058 100644 --- a/test/rx/summary.test.js +++ b/test/rx/summary.test.js @@ -578,7 +578,7 @@ describe('#integration-rx summary', () => { 'The provided label is not in the database.' ) expect(summary.notifications[0].description).toBe( - 'One of the labels in your query is not available in the database, make sure you didn\'t misspell it or that the label is available when you run this statement in your application (the missing label name is: ThisLabelDoesNotExist)' + "One of the labels in your query is not available in the database, make sure you didn't misspell it or that the label is available when you run this statement in your application (the missing label name is: ThisLabelDoesNotExist)" ) expect(summary.notifications[0].severity).toBe('WARNING') } @@ -652,20 +652,25 @@ describe('#integration-rx summary', () => { } async function dropConstraintsAndIndices (driver) { + function getName (record) { + const obj = record.toObject() + const name = obj.description || obj.name + if (!name) { + throw new Error('unable to identify name of the constraint/index') + } + return name + } + const session = driver.session() try { - const constraints = await session.run( - "CALL db.constraints() yield description RETURN 'DROP ' + description" - ) + const constraints = await session.run('CALL db.constraints()') for (let i = 0; i < constraints.records.length; i++) { - await session.run(constraints.records[0].get(0)) + await session.run(`DROP ${getName(constraints.records[i])}`) } - const indices = await session.run( - "CALL db.indexes() yield description RETURN 'DROP ' + description" - ) + const indices = await session.run('CALL db.indexes()') for (let i = 0; i < indices.records.length; i++) { - await session.run(indices.records[0].get(0)) + await session.run(`DROP ${getName(indices.records[i])}`) } } finally { await session.close() diff --git a/test/session.test.js b/test/session.test.js index 0e6cfb782..4829fcf65 100644 --- a/test/session.test.js +++ b/test/session.test.js @@ -171,7 +171,7 @@ describe('#integration session', () => { it('should accept a statement object ', done => { // Given const statement = { - text: 'RETURN 1 = {param} AS a', + text: 'RETURN 1 = $param AS a', parameters: { param: 1 } } @@ -215,7 +215,7 @@ describe('#integration session', () => { it('should expose summarize method for basic metadata ', done => { // Given - const statement = 'CREATE (n:Label {prop:{prop}}) RETURN n' + const statement = 'CREATE (n:Label {prop: $prop}) RETURN n' const params = { prop: 'string' } // When & Then session.run(statement, params).then(result => { @@ -269,7 +269,7 @@ describe('#integration session', () => { it('should expose plan ', done => { // Given - const statement = 'EXPLAIN CREATE (n:Label {prop:{prop}}) RETURN n' + const statement = 'EXPLAIN CREATE (n:Label {prop: $prop}) RETURN n' const params = { prop: 'string' } // When & Then session.run(statement, params).then(result => { @@ -286,7 +286,7 @@ describe('#integration session', () => { it('should expose profile ', done => { // Given - const statement = 'PROFILE MATCH (n:Label {prop:{prop}}) RETURN n' + const statement = 'PROFILE MATCH (n:Label {prop: $prop}) RETURN n' const params = { prop: 'string' } // When & Then session.run(statement, params).then(result => { @@ -403,7 +403,7 @@ describe('#integration session', () => { throw Error() } - const statement = 'RETURN {param}' + const statement = 'RETURN $param' const params = { param: unpackable } // When & Then session.run(statement, params).catch(ignore => { @@ -883,20 +883,20 @@ describe('#integration session', () => { it('should be able to do nested queries', done => { session .run( - 'CREATE (knight:Person:Knight {name: {name1}, castle: {castle}})' + - 'CREATE (king:Person {name: {name2}, title: {title}})', + 'CREATE (knight:Person:Knight {name: $name1, castle: $castle})' + + 'CREATE (king:Person {name: $name2, title: $title})', { name1: 'Lancelot', castle: 'Camelot', name2: 'Arthur', title: 'King' } ) .then(() => { session .run( - 'MATCH (knight:Person:Knight) WHERE knight.castle = {castle} RETURN id(knight) AS knight_id', + 'MATCH (knight:Person:Knight) WHERE knight.castle = $castle RETURN id(knight) AS knight_id', { castle: 'Camelot' } ) .subscribe({ onNext: record => { session.run( - 'MATCH (knight) WHERE id(knight) = {id} MATCH (king:Person) WHERE king.name = {king} CREATE (knight)-[:DEFENDS]->(king)', + 'MATCH (knight) WHERE id(knight) = $id MATCH (king:Person) WHERE king.name = $king CREATE (knight)-[:DEFENDS]->(king)', { id: record.get('knight_id'), king: 'Arthur' } ) }, diff --git a/test/spatial-types.test.js b/test/spatial-types.test.js index 3168ec0c7..5f9c8b2be 100644 --- a/test/spatial-types.test.js +++ b/test/spatial-types.test.js @@ -110,7 +110,7 @@ describe('#integration spatial-types', () => { it('should receive 2D points with crs', done => { testReceivingOfPoints( done, - `RETURN point({x: 2.3, y: 4.5, crs: 'WGS-84'})`, + 'RETURN point({x: 2.3, y: 4.5, crs: \'WGS-84\'})', point => { expect(isPoint(point)).toBeTruthy() expect(point.srid).toEqual(WGS_84_2D_CRS_CODE) @@ -138,7 +138,7 @@ describe('#integration spatial-types', () => { it('should receive 3D points with crs', done => { testReceivingOfPoints( done, - `RETURN point({x: 34.76, y: 11.9, z: -99.01, crs: 'WGS-84-3D'})`, + 'RETURN point({x: 34.76, y: 11.9, z: -99.01, crs: \'WGS-84-3D\'})', point => { expect(isPoint(point)).toBeTruthy() expect(point.srid).toEqual(WGS_84_3D_CRS_CODE) @@ -166,10 +166,10 @@ describe('#integration spatial-types', () => { it('should send and receive array of 2D points', done => { const arrayOfPoints = [ new Point(WGS_84_2D_CRS_CODE, 12.3, 11.2), - new Point(WGS_84_2D_CRS_CODE, 2.45, 91.302), - new Point(WGS_84_2D_CRS_CODE, 0.12, -99.9), - new Point(WGS_84_2D_CRS_CODE, 93.75, 123.213), - new Point(WGS_84_2D_CRS_CODE, 111.13, -90.1), + new Point(WGS_84_2D_CRS_CODE, 2.45, 81.302), + new Point(WGS_84_2D_CRS_CODE, 0.12, -89.9), + new Point(WGS_84_2D_CRS_CODE, 93.75, 23.213), + new Point(WGS_84_2D_CRS_CODE, 111.13, -70.1), new Point(WGS_84_2D_CRS_CODE, 43.99, -1) ] diff --git a/test/stress.test.js b/test/stress.test.js index 8547217ba..99f0a4172 100644 --- a/test/stress.test.js +++ b/test/stress.test.js @@ -40,7 +40,7 @@ describe('#integration stress tests', () => { const READ_QUERY = 'MATCH (n) RETURN n LIMIT 1' const WRITE_QUERY = - 'CREATE (person:Person:Employee {name: {name}, salary: {salary}}) RETURN person' + 'CREATE (person:Person:Employee {name: $name, salary: $salary}) RETURN person' const TEST_MODE = modeFromEnvOrDefault('STRESS_TEST_MODE') const DATABASE_URI = fromEnvOrDefault( @@ -231,7 +231,7 @@ describe('#integration stress tests', () => { .run(query, params) .then(result => { context.queryCompleted(result, accessMode) - context.log(commandId, `Query completed successfully`) + context.log(commandId, 'Query completed successfully') return session.close().then(() => { const possibleError = verifyQueryResult(result) @@ -272,7 +272,7 @@ describe('#integration stress tests', () => { resultPromise .then(result => { context.queryCompleted(result, accessMode, session.lastBookmark()) - context.log(commandId, `Transaction function executed successfully`) + context.log(commandId, 'Transaction function executed successfully') return session.close().then(() => { const possibleError = verifyQueryResult(result) @@ -322,7 +322,7 @@ describe('#integration stress tests', () => { }) .then(() => { context.queryCompleted(result, accessMode, session.lastBookmark()) - context.log(commandId, `Transaction committed successfully`) + context.log(commandId, 'Transaction committed successfully') return session.close().then(() => { callback(commandError) @@ -341,7 +341,7 @@ describe('#integration stress tests', () => { function verifyQueryResult (result) { if (!result) { - return new Error(`Received undefined result`) + return new Error('Received undefined result') } else if (result.records.length === 0) { // it is ok to receive no nodes back for read queries at the beginning of the test return null diff --git a/test/transaction.test.js b/test/transaction.test.js index 021a744d1..6d35ff37a 100644 --- a/test/transaction.test.js +++ b/test/transaction.test.js @@ -94,7 +94,7 @@ describe('#integration transaction', () => { const tx = session.beginTransaction() tx.run("RETURN 'foo' AS res") .then(result => { - tx.run('CREATE ({name: {param}})', { + tx.run('CREATE ({name: $param})', { param: result.records[0].get('res') }) .then(() => { diff --git a/test/types.test.js b/test/types.test.js index efd20569d..59f2d8377 100644 --- a/test/types.test.js +++ b/test/types.test.js @@ -179,7 +179,7 @@ describe('#integration path values', () => { }) describe('#integration byte arrays', () => { - let originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL + const originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL beforeAll(() => { jasmine.DEFAULT_TIMEOUT_INTERVAL = 60000 @@ -213,7 +213,7 @@ describe('#integration byte arrays', () => { const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken) const session = driver.session() session - .run('RETURN {array}', { array: randomByteArray(42) }) + .run('RETURN $array', { array: randomByteArray(42) }) .catch(error => { expect(error.message).toEqual( 'Byte arrays are not supported by the database this driver is connected to' @@ -255,7 +255,7 @@ function runReturnQuery (driver, actual, expected) { const session = driver.session() return new Promise((resolve, reject) => { session - .run('RETURN {val} as v', { val: actual }) + .run('RETURN $val as v', { val: actual }) .then(result => { expect(result.records[0].get('v')).toEqual(expected || actual) })