Skip to content

Commit 71969c1

Browse files
authored
Merge pull request #490 from ali-ince/4.0-purge-aged-routing-tables
Purge old routing table entries on inactivity
2 parents dca6185 + cd7fd7a commit 71969c1

10 files changed

+184
-50
lines changed

src/internal/connection-provider-routing.js

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,14 @@ import ConnectionErrorHandler from './connection-error-handler'
3131
import DelegateConnection from './connection-delegate'
3232
import LeastConnectedLoadBalancingStrategy from './least-connected-load-balancing-strategy'
3333
import Bookmark from './bookmark'
34+
import { int } from '../integer'
3435

3536
const UNAUTHORIZED_ERROR_CODE = 'Neo.ClientError.Security.Unauthorized'
3637
const DATABASE_NOT_FOUND_ERROR_CODE =
3738
'Neo.ClientError.Database.DatabaseNotFound'
3839
const SYSTEM_DB_NAME = 'system'
3940
const DEFAULT_DB_NAME = ''
41+
const DEFAULT_ROUTING_TABLE_PURGE_DELAY = int(30000)
4042

4143
export default class RoutingConnectionProvider extends PooledConnectionProvider {
4244
constructor ({
@@ -47,7 +49,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
4749
config,
4850
log,
4951
userAgent,
50-
authToken
52+
authToken,
53+
routingTablePurgeDelay
5154
}) {
5255
super({ id, config, log, userAgent, authToken })
5356

@@ -61,6 +64,9 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
6164
this._dnsResolver = new HostNameResolver()
6265
this._log = log
6366
this._useSeedRouter = true
67+
this._routingTablePurgeDelay = routingTablePurgeDelay
68+
? int(routingTablePurgeDelay)
69+
: DEFAULT_ROUTING_TABLE_PURGE_DELAY
6470
}
6571

6672
_createConnectionErrorHandler () {
@@ -71,23 +77,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
7177

7278
_handleUnavailability (error, address, database) {
7379
this._log.warn(
74-
`Routing driver ${
75-
this._id
76-
} will forget ${address} for database '${database}' because of an error ${
77-
error.code
78-
} '${error.message}'`
80+
`Routing driver ${this._id} will forget ${address} for database '${database}' because of an error ${error.code} '${error.message}'`
7981
)
8082
this.forget(address, database || '')
8183
return error
8284
}
8385

8486
_handleWriteFailure (error, address, database) {
8587
this._log.warn(
86-
`Routing driver ${
87-
this._id
88-
} will forget writer ${address} for database '${database}' because of an error ${
89-
error.code
90-
} '${error.message}'`
88+
`Routing driver ${this._id} will forget writer ${address} for database '${database}' because of an error ${error.code} '${error.message}'`
9189
)
9290
this.forgetWriter(address, database || '')
9391
return newError(
@@ -427,6 +425,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
427425
// close old connections to servers not present in the new routing table
428426
await this._connectionPool.keepAll(newRoutingTable.allServers())
429427

428+
// filter out expired to purge (expired for a pre-configured amount of time) routing table entries
429+
Object.values(this._routingTables).forEach(value => {
430+
if (value.isExpiredFor(this._routingTablePurgeDelay)) {
431+
delete this._routingTables[value.database]
432+
}
433+
})
434+
430435
// make this driver instance aware of the new table
431436
this._routingTables[newRoutingTable.database] = newRoutingTable
432437
this._log.info(`Updated routing table ${newRoutingTable}`)

src/internal/routing-table.js

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ const MIN_ROUTERS = 1
2424
export default class RoutingTable {
2525
constructor ({ database, routers, readers, writers, expirationTime } = {}) {
2626
this.database = database
27+
this.databaseName = database || 'default database'
2728
this.routers = routers || []
2829
this.readers = readers || []
2930
this.writers = writers || []
@@ -61,14 +62,24 @@ export default class RoutingTable {
6162
)
6263
}
6364

65+
/**
66+
* Check if this routing table is expired for specified amount of duration
67+
*
68+
* @param {Integer} duration amount of duration in milliseconds to check for expiration
69+
* @returns {boolean}
70+
*/
71+
isExpiredFor (duration) {
72+
return this.expirationTime.add(duration).lessThan(Date.now())
73+
}
74+
6475
allServers () {
6576
return [...this.routers, ...this.readers, ...this.writers]
6677
}
6778

6879
toString () {
6980
return (
70-
`RoutingTable[` +
71-
`database=${this.database}, ` +
81+
'RoutingTable[' +
82+
`database=${this.databaseName}, ` +
7283
`expirationTime=${this.expirationTime}, ` +
7384
`currentTime=${Date.now()}, ` +
7485
`routers=[${this.routers}], ` +

test/internal/connection-provider-routing.test.js

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1723,6 +1723,92 @@ describe('#unit RoutingConnectionProvider', () => {
17231723
[serverC]
17241724
)
17251725
})
1726+
1727+
it('should purge expired routing tables after specified duration on update', async () => {
1728+
var originalDateNow = Date.now
1729+
Date.now = () => 50000
1730+
try {
1731+
const routingTableToLoad = newRoutingTable(
1732+
'databaseC',
1733+
[server1, server2, server3],
1734+
[server2, server3],
1735+
[server1]
1736+
)
1737+
const connectionProvider = newRoutingConnectionProviderWithSeedRouter(
1738+
server1,
1739+
[server1],
1740+
[
1741+
newRoutingTable(
1742+
'databaseA',
1743+
[server1, server2, server3],
1744+
[server1, server2],
1745+
[server3],
1746+
int(Date.now() + 12000)
1747+
),
1748+
newRoutingTable(
1749+
'databaseB',
1750+
[server1, server2, server3],
1751+
[server1, server3],
1752+
[server2],
1753+
int(Date.now() + 2000)
1754+
)
1755+
],
1756+
{
1757+
databaseC: {
1758+
'server1:7687': routingTableToLoad
1759+
}
1760+
},
1761+
null,
1762+
4000
1763+
)
1764+
1765+
expectRoutingTable(
1766+
connectionProvider,
1767+
'databaseA',
1768+
[server1, server2, server3],
1769+
[server1, server2],
1770+
[server3]
1771+
)
1772+
expectRoutingTable(
1773+
connectionProvider,
1774+
'databaseB',
1775+
[server1, server2, server3],
1776+
[server1, server3],
1777+
[server2]
1778+
)
1779+
1780+
// make routing table for databaseA to report true for isExpiredFor(4000)
1781+
// call.
1782+
Date.now = () => 58000
1783+
1784+
// force a routing table update for databaseC
1785+
const conn1 = await connectionProvider.acquireConnection({
1786+
accessMode: WRITE,
1787+
database: 'databaseC'
1788+
})
1789+
expect(conn1).not.toBeNull()
1790+
expect(conn1.address).toBe(server1)
1791+
1792+
// Then
1793+
expectRoutingTable(
1794+
connectionProvider,
1795+
'databaseA',
1796+
[server1, server2, server3],
1797+
[server1, server2],
1798+
[server3]
1799+
)
1800+
expectRoutingTable(
1801+
connectionProvider,
1802+
'databaseC',
1803+
[server1, server2, server3],
1804+
[server2, server3],
1805+
[server1]
1806+
)
1807+
expectNoRoutingTable(connectionProvider, 'databaseB')
1808+
} finally {
1809+
Date.now = originalDateNow
1810+
}
1811+
})
17261812
})
17271813
})
17281814

@@ -1746,7 +1832,8 @@ function newRoutingConnectionProviderWithSeedRouter (
17461832
seedRouterResolved,
17471833
routingTables,
17481834
routerToRoutingTable = { '': {} },
1749-
connectionPool = null
1835+
connectionPool = null,
1836+
routingTablePurgeDelay = null
17501837
) {
17511838
const pool = connectionPool || newPool()
17521839
const connectionProvider = new RoutingConnectionProvider({
@@ -1755,7 +1842,8 @@ function newRoutingConnectionProviderWithSeedRouter (
17551842
routingContext: {},
17561843
hostNameResolver: new SimpleHostNameResolver(),
17571844
config: {},
1758-
log: Logger.noOp()
1845+
log: Logger.noOp(),
1846+
routingTablePurgeDelay: routingTablePurgeDelay
17591847
})
17601848
connectionProvider._connectionPool = pool
17611849
routingTables.forEach(r => {
@@ -1821,6 +1909,10 @@ function expectRoutingTable (
18211909
expect(connectionProvider._routingTables[database].writers).toEqual(writers)
18221910
}
18231911

1912+
function expectNoRoutingTable (connectionProvider, database) {
1913+
expect(connectionProvider._routingTables[database]).toBeFalsy()
1914+
}
1915+
18241916
function expectPoolToContain (pool, addresses) {
18251917
addresses.forEach(address => {
18261918
expect(pool.has(address)).toBeTruthy()

test/internal/routing-table.test.js

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,29 @@ describe('#unit RoutingTable', () => {
201201
}
202202
})
203203

204-
function expired () {
205-
return Date.now() - 3600 // expired an hour ago
204+
it('should report correct value when expired for is tested', () => {
205+
const originalDateNow = Date.now
206+
try {
207+
Date.now = () => 50000
208+
const table = createTable(
209+
[server1, server2, server3],
210+
[server2, server1, server5],
211+
[server5, server1],
212+
expired(7200)
213+
)
214+
215+
expect(table.isStaleFor(READ)).toBeTruthy()
216+
expect(table.isStaleFor(WRITE)).toBeTruthy()
217+
218+
expect(table.isExpiredFor(3600)).toBeTruthy()
219+
expect(table.isExpiredFor(10800)).toBeFalsy()
220+
} finally {
221+
Date.now = originalDateNow
222+
}
223+
})
224+
225+
function expired (expiredFor) {
226+
return Date.now() - (expiredFor || 3600) // expired an hour ago
206227
}
207228

208229
function notExpired () {

test/rx/summary.test.js

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -578,7 +578,7 @@ describe('#integration-rx summary', () => {
578578
'The provided label is not in the database.'
579579
)
580580
expect(summary.notifications[0].description).toBe(
581-
'One of the labels in your query is not available in the database, make sure you didn\'t misspell it or that the label is available when you run this statement in your application (the missing label name is: ThisLabelDoesNotExist)'
581+
"One of the labels in your query is not available in the database, make sure you didn't misspell it or that the label is available when you run this statement in your application (the missing label name is: ThisLabelDoesNotExist)"
582582
)
583583
expect(summary.notifications[0].severity).toBe('WARNING')
584584
}
@@ -652,20 +652,25 @@ describe('#integration-rx summary', () => {
652652
}
653653

654654
async function dropConstraintsAndIndices (driver) {
655+
function getName (record) {
656+
const obj = record.toObject()
657+
const name = obj.description || obj.name
658+
if (!name) {
659+
throw new Error('unable to identify name of the constraint/index')
660+
}
661+
return name
662+
}
663+
655664
const session = driver.session()
656665
try {
657-
const constraints = await session.run(
658-
"CALL db.constraints() yield description RETURN 'DROP ' + description"
659-
)
666+
const constraints = await session.run('CALL db.constraints()')
660667
for (let i = 0; i < constraints.records.length; i++) {
661-
await session.run(constraints.records[0].get(0))
668+
await session.run(`DROP ${getName(constraints.records[i])}`)
662669
}
663670

664-
const indices = await session.run(
665-
"CALL db.indexes() yield description RETURN 'DROP ' + description"
666-
)
671+
const indices = await session.run('CALL db.indexes()')
667672
for (let i = 0; i < indices.records.length; i++) {
668-
await session.run(indices.records[0].get(0))
673+
await session.run(`DROP ${getName(indices.records[i])}`)
669674
}
670675
} finally {
671676
await session.close()

test/session.test.js

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ describe('#integration session', () => {
171171
it('should accept a statement object ', done => {
172172
// Given
173173
const statement = {
174-
text: 'RETURN 1 = {param} AS a',
174+
text: 'RETURN 1 = $param AS a',
175175
parameters: { param: 1 }
176176
}
177177

@@ -215,7 +215,7 @@ describe('#integration session', () => {
215215

216216
it('should expose summarize method for basic metadata ', done => {
217217
// Given
218-
const statement = 'CREATE (n:Label {prop:{prop}}) RETURN n'
218+
const statement = 'CREATE (n:Label {prop: $prop}) RETURN n'
219219
const params = { prop: 'string' }
220220
// When & Then
221221
session.run(statement, params).then(result => {
@@ -269,7 +269,7 @@ describe('#integration session', () => {
269269

270270
it('should expose plan ', done => {
271271
// Given
272-
const statement = 'EXPLAIN CREATE (n:Label {prop:{prop}}) RETURN n'
272+
const statement = 'EXPLAIN CREATE (n:Label {prop: $prop}) RETURN n'
273273
const params = { prop: 'string' }
274274
// When & Then
275275
session.run(statement, params).then(result => {
@@ -286,7 +286,7 @@ describe('#integration session', () => {
286286

287287
it('should expose profile ', done => {
288288
// Given
289-
const statement = 'PROFILE MATCH (n:Label {prop:{prop}}) RETURN n'
289+
const statement = 'PROFILE MATCH (n:Label {prop: $prop}) RETURN n'
290290
const params = { prop: 'string' }
291291
// When & Then
292292
session.run(statement, params).then(result => {
@@ -403,7 +403,7 @@ describe('#integration session', () => {
403403
throw Error()
404404
}
405405

406-
const statement = 'RETURN {param}'
406+
const statement = 'RETURN $param'
407407
const params = { param: unpackable }
408408
// When & Then
409409
session.run(statement, params).catch(ignore => {
@@ -883,20 +883,20 @@ describe('#integration session', () => {
883883
it('should be able to do nested queries', done => {
884884
session
885885
.run(
886-
'CREATE (knight:Person:Knight {name: {name1}, castle: {castle}})' +
887-
'CREATE (king:Person {name: {name2}, title: {title}})',
886+
'CREATE (knight:Person:Knight {name: $name1, castle: $castle})' +
887+
'CREATE (king:Person {name: $name2, title: $title})',
888888
{ name1: 'Lancelot', castle: 'Camelot', name2: 'Arthur', title: 'King' }
889889
)
890890
.then(() => {
891891
session
892892
.run(
893-
'MATCH (knight:Person:Knight) WHERE knight.castle = {castle} RETURN id(knight) AS knight_id',
893+
'MATCH (knight:Person:Knight) WHERE knight.castle = $castle RETURN id(knight) AS knight_id',
894894
{ castle: 'Camelot' }
895895
)
896896
.subscribe({
897897
onNext: record => {
898898
session.run(
899-
'MATCH (knight) WHERE id(knight) = {id} MATCH (king:Person) WHERE king.name = {king} CREATE (knight)-[:DEFENDS]->(king)',
899+
'MATCH (knight) WHERE id(knight) = $id MATCH (king:Person) WHERE king.name = $king CREATE (knight)-[:DEFENDS]->(king)',
900900
{ id: record.get('knight_id'), king: 'Arthur' }
901901
)
902902
},

0 commit comments

Comments
 (0)