Skip to content

Reduce number of ROUTE requests when routing table is stale (#1119) #1121

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
ConnectionErrorHandler,
DelegateConnection
} from '../connection'
import { functional } from '../lang'

const { SERVICE_UNAVAILABLE, SESSION_EXPIRED } = error
const {
Expand Down Expand Up @@ -85,6 +86,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
? int(routingTablePurgeDelay)
: DEFAULT_ROUTING_TABLE_PURGE_DELAY
)

this._refreshRoutingTable = functional.reuseOngoingRequest(this._refreshRoutingTable, this)
}

_createConnectionErrorHandler () {
Expand Down Expand Up @@ -140,7 +143,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
const routingTable = await this._freshRoutingTable({
accessMode,
database: context.database,
bookmark: bookmarks,
bookmarks,
impersonatedUser,
onDatabaseNameResolved: (databaseName) => {
context.database = context.database || databaseName
Expand Down Expand Up @@ -259,7 +262,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
return this._connectionPool.acquire(address)
}

_freshRoutingTable ({ accessMode, database, bookmark, impersonatedUser, onDatabaseNameResolved } = {}) {
_freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved } = {}) {
const currentRoutingTable = this._routingTableRegistry.get(
database,
() => new RoutingTable({ database })
Expand All @@ -271,44 +274,45 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._log.info(
`Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}`
)
return this._refreshRoutingTable(currentRoutingTable, bookmark, impersonatedUser, onDatabaseNameResolved)
return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser)
.then(newRoutingTable => {
onDatabaseNameResolved(newRoutingTable.database)
return newRoutingTable
})
}

_refreshRoutingTable (currentRoutingTable, bookmark, impersonatedUser, onDatabaseNameResolved) {
_refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser) {
const knownRouters = currentRoutingTable.routers

if (this._useSeedRouter) {
return this._fetchRoutingTableFromSeedRouterFallbackToKnownRouters(
knownRouters,
currentRoutingTable,
bookmark,
impersonatedUser,
onDatabaseNameResolved
bookmarks,
impersonatedUser
)
}
return this._fetchRoutingTableFromKnownRoutersFallbackToSeedRouter(
knownRouters,
currentRoutingTable,
bookmark,
impersonatedUser,
onDatabaseNameResolved
bookmarks,
impersonatedUser
)
}

async _fetchRoutingTableFromSeedRouterFallbackToKnownRouters (
knownRouters,
currentRoutingTable,
bookmark,
impersonatedUser,
onDatabaseNameResolved
bookmarks,
impersonatedUser
) {
// we start with seed router, no routers were probed before
const seenRouters = []
let newRoutingTable = await this._fetchRoutingTableUsingSeedRouter(
seenRouters,
this._seedRouter,
currentRoutingTable,
bookmark,
bookmarks,
impersonatedUser
)

Expand All @@ -319,29 +323,27 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
newRoutingTable = await this._fetchRoutingTableUsingKnownRouters(
knownRouters,
currentRoutingTable,
bookmark,
bookmarks,
impersonatedUser
)
}

return await this._applyRoutingTableIfPossible(
currentRoutingTable,
newRoutingTable,
onDatabaseNameResolved
newRoutingTable
)
}

async _fetchRoutingTableFromKnownRoutersFallbackToSeedRouter (
knownRouters,
currentRoutingTable,
bookmark,
bookmarks,
impersonatedUser,
onDatabaseNameResolved
) {
let newRoutingTable = await this._fetchRoutingTableUsingKnownRouters(
knownRouters,
currentRoutingTable,
bookmark,
bookmarks,
impersonatedUser
)

Expand All @@ -351,28 +353,27 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
knownRouters,
this._seedRouter,
currentRoutingTable,
bookmark,
bookmarks,
impersonatedUser
)
}

return await this._applyRoutingTableIfPossible(
currentRoutingTable,
newRoutingTable,
onDatabaseNameResolved
newRoutingTable
)
}

async _fetchRoutingTableUsingKnownRouters (
knownRouters,
currentRoutingTable,
bookmark,
bookmarks,
impersonatedUser
) {
const newRoutingTable = await this._fetchRoutingTable(
knownRouters,
currentRoutingTable,
bookmark,
bookmarks,
impersonatedUser
)

Expand All @@ -397,7 +398,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
seenRouters,
seedRouter,
routingTable,
bookmark,
bookmarks,
impersonatedUser
) {
const resolvedAddresses = await this._resolveSeedRouter(seedRouter)
Expand All @@ -407,7 +408,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
address => seenRouters.indexOf(address) < 0
)

return await this._fetchRoutingTable(newAddresses, routingTable, bookmark, impersonatedUser)
return await this._fetchRoutingTable(newAddresses, routingTable, bookmarks, impersonatedUser)
}

async _resolveSeedRouter (seedRouter) {
Expand All @@ -419,7 +420,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
return [].concat.apply([], dnsResolvedAddresses)
}

_fetchRoutingTable (routerAddresses, routingTable, bookmark, impersonatedUser) {
_fetchRoutingTable (routerAddresses, routingTable, bookmarks, impersonatedUser) {
return routerAddresses.reduce(
async (refreshedTablePromise, currentRouter, currentIndex) => {
const newRoutingTable = await refreshedTablePromise
Expand All @@ -441,7 +442,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
// try next router
const session = await this._createSessionForRediscovery(
currentRouter,
bookmark,
bookmarks,
impersonatedUser
)
if (session) {
Expand Down Expand Up @@ -474,7 +475,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
)
}

async _createSessionForRediscovery (routerAddress, bookmark, impersonatedUser) {
async _createSessionForRediscovery (routerAddress, bookmarks, impersonatedUser) {
try {
const connection = await this._connectionPool.acquire(routerAddress)

Expand All @@ -498,7 +499,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
return new Session({
mode: READ,
database: SYSTEM_DB_NAME,
bookmark,
bookmark: bookmarks,
connectionProvider,
impersonatedUser
})
Expand All @@ -513,7 +514,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
}
}

async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable, onDatabaseNameResolved) {
async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable) {
if (!newRoutingTable) {
// none of routing servers returned valid routing table, throw exception
throw newError(
Expand All @@ -528,21 +529,18 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._useSeedRouter = true
}

await this._updateRoutingTable(newRoutingTable, onDatabaseNameResolved)
await this._updateRoutingTable(newRoutingTable)

return newRoutingTable
}

async _updateRoutingTable (newRoutingTable, onDatabaseNameResolved) {
async _updateRoutingTable (newRoutingTable) {
// close old connections to servers not present in the new routing table
await this._connectionPool.keepAll(newRoutingTable.allServers())
this._routingTableRegistry.removeExpired()
this._routingTableRegistry.register(
newRoutingTable
)

onDatabaseNameResolved(newRoutingTable.database)

this._log.info(`Updated routing table ${newRoutingTable}`)
}

Expand Down
58 changes: 58 additions & 0 deletions packages/bolt-connection/src/lang/functional.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { json } from 'neo4j-driver-core'

/**
* Identity function.
*
* Identity functions are function which returns the input as output.
*
* @param {any} x
* @returns {any} the x
*/
export function identity (x) {
return x
}

/**
* Makes the function able to share ongoing requests
*
* @param {function(...args): Promise} func The function to be decorated
* @param {any} thisArg The `this` which should be used in the function call
* @return {function(...args): Promise} The decorated function
*/
export function reuseOngoingRequest (func, thisArg = null) {
const ongoingRequests = new Map()

return function (...args) {
const key = json.stringify(args)
if (ongoingRequests.has(key)) {
return ongoingRequests.get(key)
}

const promise = func.apply(thisArg, args)

ongoingRequests.set(key, promise)

return promise.finally(() => {
ongoingRequests.delete(key)
})
}
}
19 changes: 19 additions & 0 deletions packages/bolt-connection/src/lang/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export * as functional from './functional'
Loading