Skip to content

Purge old routing table entries on inactivity #490

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions src/internal/connection-provider-routing.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ import ConnectionErrorHandler from './connection-error-handler'
import DelegateConnection from './connection-delegate'
import LeastConnectedLoadBalancingStrategy from './least-connected-load-balancing-strategy'
import Bookmark from './bookmark'
import { int } from '../integer'

const UNAUTHORIZED_ERROR_CODE = 'Neo.ClientError.Security.Unauthorized'
const DATABASE_NOT_FOUND_ERROR_CODE =
'Neo.ClientError.Database.DatabaseNotFound'
const SYSTEM_DB_NAME = 'system'
const DEFAULT_DB_NAME = ''
const DEFAULT_ROUTING_TABLE_PURGE_DELAY = int(30000)

export default class RoutingConnectionProvider extends PooledConnectionProvider {
constructor ({
Expand All @@ -47,7 +49,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
config,
log,
userAgent,
authToken
authToken,
routingTablePurgeDelay
}) {
super({ id, config, log, userAgent, authToken })

Expand All @@ -61,6 +64,9 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._dnsResolver = new HostNameResolver()
this._log = log
this._useSeedRouter = true
this._routingTablePurgeDelay = routingTablePurgeDelay
? int(routingTablePurgeDelay)
: DEFAULT_ROUTING_TABLE_PURGE_DELAY
}

_createConnectionErrorHandler () {
Expand All @@ -71,23 +77,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider

_handleUnavailability (error, address, database) {
this._log.warn(
`Routing driver ${
this._id
} will forget ${address} for database '${database}' because of an error ${
error.code
} '${error.message}'`
`Routing driver ${this._id} will forget ${address} for database '${database}' because of an error ${error.code} '${error.message}'`
)
this.forget(address, database || '')
return error
}

_handleWriteFailure (error, address, database) {
this._log.warn(
`Routing driver ${
this._id
} will forget writer ${address} for database '${database}' because of an error ${
error.code
} '${error.message}'`
`Routing driver ${this._id} will forget writer ${address} for database '${database}' because of an error ${error.code} '${error.message}'`
)
this.forgetWriter(address, database || '')
return newError(
Expand Down Expand Up @@ -427,6 +425,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
// close old connections to servers not present in the new routing table
await this._connectionPool.keepAll(newRoutingTable.allServers())

// filter out expired to purge (expired for a pre-configured amount of time) routing table entries
Object.values(this._routingTables).forEach(value => {
if (value.isExpiredFor(this._routingTablePurgeDelay)) {
delete this._routingTables[value.database]
}
})

// make this driver instance aware of the new table
this._routingTables[newRoutingTable.database] = newRoutingTable
this._log.info(`Updated routing table ${newRoutingTable}`)
Expand Down
15 changes: 13 additions & 2 deletions src/internal/routing-table.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const MIN_ROUTERS = 1
export default class RoutingTable {
constructor ({ database, routers, readers, writers, expirationTime } = {}) {
this.database = database
this.databaseName = database || 'default database'
this.routers = routers || []
this.readers = readers || []
this.writers = writers || []
Expand Down Expand Up @@ -61,14 +62,24 @@ export default class RoutingTable {
)
}

/**
* Check if this routing table is expired for specified amount of duration
*
* @param {Integer} duration amount of duration in milliseconds to check for expiration
* @returns {boolean}
*/
isExpiredFor (duration) {
return this.expirationTime.add(duration).lessThan(Date.now())
}

allServers () {
return [...this.routers, ...this.readers, ...this.writers]
}

toString () {
return (
`RoutingTable[` +
`database=${this.database}, ` +
'RoutingTable[' +
`database=${this.databaseName}, ` +
`expirationTime=${this.expirationTime}, ` +
`currentTime=${Date.now()}, ` +
`routers=[${this.routers}], ` +
Expand Down
96 changes: 94 additions & 2 deletions test/internal/connection-provider-routing.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,92 @@ describe('#unit RoutingConnectionProvider', () => {
[serverC]
)
})

it('should purge expired routing tables after specified duration on update', async () => {
var originalDateNow = Date.now
Date.now = () => 50000
try {
const routingTableToLoad = newRoutingTable(
'databaseC',
[server1, server2, server3],
[server2, server3],
[server1]
)
const connectionProvider = newRoutingConnectionProviderWithSeedRouter(
server1,
[server1],
[
newRoutingTable(
'databaseA',
[server1, server2, server3],
[server1, server2],
[server3],
int(Date.now() + 12000)
),
newRoutingTable(
'databaseB',
[server1, server2, server3],
[server1, server3],
[server2],
int(Date.now() + 2000)
)
],
{
databaseC: {
'server1:7687': routingTableToLoad
}
},
null,
4000
)

expectRoutingTable(
connectionProvider,
'databaseA',
[server1, server2, server3],
[server1, server2],
[server3]
)
expectRoutingTable(
connectionProvider,
'databaseB',
[server1, server2, server3],
[server1, server3],
[server2]
)

// make routing table for databaseA to report true for isExpiredFor(4000)
// call.
Date.now = () => 58000

// force a routing table update for databaseC
const conn1 = await connectionProvider.acquireConnection({
accessMode: WRITE,
database: 'databaseC'
})
expect(conn1).not.toBeNull()
expect(conn1.address).toBe(server1)

// Then
expectRoutingTable(
connectionProvider,
'databaseA',
[server1, server2, server3],
[server1, server2],
[server3]
)
expectRoutingTable(
connectionProvider,
'databaseC',
[server1, server2, server3],
[server2, server3],
[server1]
)
expectNoRoutingTable(connectionProvider, 'databaseB')
} finally {
Date.now = originalDateNow
}
})
})
})

Expand All @@ -1746,7 +1832,8 @@ function newRoutingConnectionProviderWithSeedRouter (
seedRouterResolved,
routingTables,
routerToRoutingTable = { '': {} },
connectionPool = null
connectionPool = null,
routingTablePurgeDelay = null
) {
const pool = connectionPool || newPool()
const connectionProvider = new RoutingConnectionProvider({
Expand All @@ -1755,7 +1842,8 @@ function newRoutingConnectionProviderWithSeedRouter (
routingContext: {},
hostNameResolver: new SimpleHostNameResolver(),
config: {},
log: Logger.noOp()
log: Logger.noOp(),
routingTablePurgeDelay: routingTablePurgeDelay
})
connectionProvider._connectionPool = pool
routingTables.forEach(r => {
Expand Down Expand Up @@ -1821,6 +1909,10 @@ function expectRoutingTable (
expect(connectionProvider._routingTables[database].writers).toEqual(writers)
}

function expectNoRoutingTable (connectionProvider, database) {
expect(connectionProvider._routingTables[database]).toBeFalsy()
}

function expectPoolToContain (pool, addresses) {
addresses.forEach(address => {
expect(pool.has(address)).toBeTruthy()
Expand Down
25 changes: 23 additions & 2 deletions test/internal/routing-table.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,29 @@ describe('#unit RoutingTable', () => {
}
})

function expired () {
return Date.now() - 3600 // expired an hour ago
it('should report correct value when expired for is tested', () => {
const originalDateNow = Date.now
try {
Date.now = () => 50000
const table = createTable(
[server1, server2, server3],
[server2, server1, server5],
[server5, server1],
expired(7200)
)

expect(table.isStaleFor(READ)).toBeTruthy()
expect(table.isStaleFor(WRITE)).toBeTruthy()

expect(table.isExpiredFor(3600)).toBeTruthy()
expect(table.isExpiredFor(10800)).toBeFalsy()
} finally {
Date.now = originalDateNow
}
})

function expired (expiredFor) {
return Date.now() - (expiredFor || 3600) // expired an hour ago
}

function notExpired () {
Expand Down
23 changes: 14 additions & 9 deletions test/rx/summary.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ describe('#integration-rx summary', () => {
'The provided label is not in the database.'
)
expect(summary.notifications[0].description).toBe(
'One of the labels in your query is not available in the database, make sure you didn\'t misspell it or that the label is available when you run this statement in your application (the missing label name is: ThisLabelDoesNotExist)'
"One of the labels in your query is not available in the database, make sure you didn't misspell it or that the label is available when you run this statement in your application (the missing label name is: ThisLabelDoesNotExist)"
)
expect(summary.notifications[0].severity).toBe('WARNING')
}
Expand Down Expand Up @@ -652,20 +652,25 @@ describe('#integration-rx summary', () => {
}

async function dropConstraintsAndIndices (driver) {
function getName (record) {
const obj = record.toObject()
const name = obj.description || obj.name
if (!name) {
throw new Error('unable to identify name of the constraint/index')
}
return name
}

const session = driver.session()
try {
const constraints = await session.run(
"CALL db.constraints() yield description RETURN 'DROP ' + description"
)
const constraints = await session.run('CALL db.constraints()')
for (let i = 0; i < constraints.records.length; i++) {
await session.run(constraints.records[0].get(0))
await session.run(`DROP ${getName(constraints.records[i])}`)
}

const indices = await session.run(
"CALL db.indexes() yield description RETURN 'DROP ' + description"
)
const indices = await session.run('CALL db.indexes()')
for (let i = 0; i < indices.records.length; i++) {
await session.run(indices.records[0].get(0))
await session.run(`DROP ${getName(indices.records[i])}`)
}
} finally {
await session.close()
Expand Down
18 changes: 9 additions & 9 deletions test/session.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ describe('#integration session', () => {
it('should accept a statement object ', done => {
// Given
const statement = {
text: 'RETURN 1 = {param} AS a',
text: 'RETURN 1 = $param AS a',
parameters: { param: 1 }
}

Expand Down Expand Up @@ -215,7 +215,7 @@ describe('#integration session', () => {

it('should expose summarize method for basic metadata ', done => {
// Given
const statement = 'CREATE (n:Label {prop:{prop}}) RETURN n'
const statement = 'CREATE (n:Label {prop: $prop}) RETURN n'
const params = { prop: 'string' }
// When & Then
session.run(statement, params).then(result => {
Expand Down Expand Up @@ -269,7 +269,7 @@ describe('#integration session', () => {

it('should expose plan ', done => {
// Given
const statement = 'EXPLAIN CREATE (n:Label {prop:{prop}}) RETURN n'
const statement = 'EXPLAIN CREATE (n:Label {prop: $prop}) RETURN n'
const params = { prop: 'string' }
// When & Then
session.run(statement, params).then(result => {
Expand All @@ -286,7 +286,7 @@ describe('#integration session', () => {

it('should expose profile ', done => {
// Given
const statement = 'PROFILE MATCH (n:Label {prop:{prop}}) RETURN n'
const statement = 'PROFILE MATCH (n:Label {prop: $prop}) RETURN n'
const params = { prop: 'string' }
// When & Then
session.run(statement, params).then(result => {
Expand Down Expand Up @@ -403,7 +403,7 @@ describe('#integration session', () => {
throw Error()
}

const statement = 'RETURN {param}'
const statement = 'RETURN $param'
const params = { param: unpackable }
// When & Then
session.run(statement, params).catch(ignore => {
Expand Down Expand Up @@ -883,20 +883,20 @@ describe('#integration session', () => {
it('should be able to do nested queries', done => {
session
.run(
'CREATE (knight:Person:Knight {name: {name1}, castle: {castle}})' +
'CREATE (king:Person {name: {name2}, title: {title}})',
'CREATE (knight:Person:Knight {name: $name1, castle: $castle})' +
'CREATE (king:Person {name: $name2, title: $title})',
{ name1: 'Lancelot', castle: 'Camelot', name2: 'Arthur', title: 'King' }
)
.then(() => {
session
.run(
'MATCH (knight:Person:Knight) WHERE knight.castle = {castle} RETURN id(knight) AS knight_id',
'MATCH (knight:Person:Knight) WHERE knight.castle = $castle RETURN id(knight) AS knight_id',
{ castle: 'Camelot' }
)
.subscribe({
onNext: record => {
session.run(
'MATCH (knight) WHERE id(knight) = {id} MATCH (king:Person) WHERE king.name = {king} CREATE (knight)-[:DEFENDS]->(king)',
'MATCH (knight) WHERE id(knight) = $id MATCH (king:Person) WHERE king.name = $king CREATE (knight)-[:DEFENDS]->(king)',
{ id: record.get('knight_id'), king: 'Arthur' }
)
},
Expand Down
Loading