Skip to content

Commit fc8837e

Browse files
committed
Support BMM during routing
1 parent e4d73a7 commit fc8837e

File tree

6 files changed

+98
-36
lines changed

6 files changed

+98
-36
lines changed

packages/bolt-connection/src/connection-provider/connection-provider-routing.js

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,14 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
130130
* See {@link ConnectionProvider} for more information about this method and
131131
* its arguments.
132132
*/
133-
async acquireConnection ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved } = {}) {
133+
async acquireConnection ({
134+
accessMode,
135+
database,
136+
bookmarks,
137+
impersonatedUser,
138+
onDatabaseNameResolved,
139+
onRoutingTableRefresh
140+
} = {}) {
134141
let name
135142
let address
136143
const context = { database: database || DEFAULT_DB_NAME }
@@ -146,14 +153,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
146153
const routingTable = await this._freshRoutingTable({
147154
accessMode,
148155
database: context.database,
149-
bookmarks: bookmarks,
156+
bookmarks,
150157
impersonatedUser,
151158
onDatabaseNameResolved: (databaseName) => {
152159
context.database = context.database || databaseName
153160
if (onDatabaseNameResolved) {
154161
onDatabaseNameResolved(databaseName)
155162
}
156-
}
163+
},
164+
onRoutingTableRefresh
157165
})
158166

159167
// select a target server based on specified access mode
@@ -301,7 +309,14 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
301309
return this._connectionPool.acquire(address)
302310
}
303311

304-
_freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved } = {}) {
312+
_freshRoutingTable ({
313+
accessMode,
314+
database,
315+
bookmarks,
316+
impersonatedUser,
317+
onDatabaseNameResolved,
318+
onRoutingTableRefresh
319+
} = {}) {
305320
const currentRoutingTable = this._routingTableRegistry.get(
306321
database,
307322
() => new RoutingTable({ database })
@@ -313,10 +328,22 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
313328
this._log.info(
314329
`Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}`
315330
)
316-
return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved)
331+
return this._refreshRoutingTable(
332+
currentRoutingTable,
333+
bookmarks,
334+
impersonatedUser,
335+
onDatabaseNameResolved,
336+
onRoutingTableRefresh
337+
)
317338
}
318339

319-
_refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved) {
340+
_refreshRoutingTable (
341+
currentRoutingTable,
342+
bookmarks,
343+
impersonatedUser,
344+
onDatabaseNameResolved,
345+
onRoutingTableRefresh
346+
) {
320347
const knownRouters = currentRoutingTable.routers
321348

322349
if (this._useSeedRouter) {
@@ -325,15 +352,17 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
325352
currentRoutingTable,
326353
bookmarks,
327354
impersonatedUser,
328-
onDatabaseNameResolved
355+
onDatabaseNameResolved,
356+
onRoutingTableRefresh
329357
)
330358
}
331359
return this._fetchRoutingTableFromKnownRoutersFallbackToSeedRouter(
332360
knownRouters,
333361
currentRoutingTable,
334362
bookmarks,
335363
impersonatedUser,
336-
onDatabaseNameResolved
364+
onDatabaseNameResolved,
365+
onRoutingTableRefresh
337366
)
338367
}
339368

@@ -342,7 +371,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
342371
currentRoutingTable,
343372
bookmarks,
344373
impersonatedUser,
345-
onDatabaseNameResolved
374+
onDatabaseNameResolved,
375+
onRoutingTableRefresh
346376
) {
347377
// we start with seed router, no routers were probed before
348378
const seenRouters = []
@@ -372,7 +402,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
372402
currentRoutingTable,
373403
newRoutingTable,
374404
onDatabaseNameResolved,
375-
error
405+
error,
406+
onRoutingTableRefresh
376407
)
377408
}
378409

@@ -381,7 +412,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
381412
currentRoutingTable,
382413
bookmarks,
383414
impersonatedUser,
384-
onDatabaseNameResolved
415+
onDatabaseNameResolved,
416+
onRoutingTableRefresh
385417
) {
386418
let [newRoutingTable, error] = await this._fetchRoutingTableUsingKnownRouters(
387419
knownRouters,
@@ -405,7 +437,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
405437
currentRoutingTable,
406438
newRoutingTable,
407439
onDatabaseNameResolved,
408-
error
440+
error,
441+
onRoutingTableRefresh
409442
)
410443
}
411444

@@ -563,7 +596,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
563596
return [null, error]
564597
}
565598

566-
async _applyRoutingTableIfPossible (currentRoutingTable, newRoutingTable, onDatabaseNameResolved, error) {
599+
async _applyRoutingTableIfPossible (
600+
currentRoutingTable,
601+
newRoutingTable,
602+
onDatabaseNameResolved,
603+
error,
604+
onRoutingTableRefresh
605+
) {
567606
if (!newRoutingTable) {
568607
// none of routing servers returned valid routing table, throw exception
569608
throw newError(
@@ -579,19 +618,22 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
579618
this._useSeedRouter = true
580619
}
581620

582-
await this._updateRoutingTable(newRoutingTable, onDatabaseNameResolved)
621+
await this._updateRoutingTable(newRoutingTable, onDatabaseNameResolved, onRoutingTableRefresh)
583622

584623
return newRoutingTable
585624
}
586625

587-
async _updateRoutingTable (newRoutingTable, onDatabaseNameResolved) {
626+
async _updateRoutingTable (newRoutingTable, onDatabaseNameResolved, onRoutingTableRefresh) {
588627
// close old connections to servers not present in the new routing table
589628
await this._connectionPool.keepAll(newRoutingTable.allServers())
590629
this._routingTableRegistry.removeExpired()
591630
this._routingTableRegistry.register(
592631
newRoutingTable
593632
)
594633

634+
if (typeof onRoutingTableRefresh === 'function') {
635+
onRoutingTableRefresh()
636+
}
595637
onDatabaseNameResolved(newRoutingTable.database)
596638

597639
this._log.info(`Updated routing table ${newRoutingTable}`)

packages/core/src/connection-provider.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class ConnectionProvider {
5050
bookmarks: bookmarks.Bookmarks
5151
impersonatedUser?: string
5252
onDatabaseNameResolved?: (databaseName?: string) => void
53+
onRoutingTableRefresh?: () => void
5354
}): Promise<Connection> {
5455
throw Error('Not implemented')
5556
}

packages/core/src/internal/connection-holder.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import Connection from '../connection'
2424
import { ACCESS_MODE_WRITE } from './constants'
2525
import { Bookmarks } from './bookmarks'
2626
import ConnectionProvider from '../connection-provider'
27+
import BookmarkManager from '../bookmark-manager'
2728

2829
/**
2930
* @private
@@ -83,6 +84,7 @@ class ConnectionHolder implements ConnectionHolderInterface {
8384
private _referenceCount: number
8485
private _connectionPromise: Promise<Connection | null>
8586
private readonly _impersonatedUser?: string
87+
private readonly _bookmarkManager?: BookmarkManager
8688
private readonly _onDatabaseNameResolved?: (databaseName?: string) => void
8789

8890
/**
@@ -101,13 +103,15 @@ class ConnectionHolder implements ConnectionHolderInterface {
101103
bookmarks,
102104
connectionProvider,
103105
impersonatedUser,
104-
onDatabaseNameResolved
106+
onDatabaseNameResolved,
107+
bookmarkManager
105108
}: {
106109
mode?: string
107110
database?: string
108111
bookmarks?: Bookmarks
109112
connectionProvider?: ConnectionProvider
110113
impersonatedUser?: string
114+
bookmarkManager?: BookmarkManager
111115
onDatabaseNameResolved?: (databaseName?: string) => void
112116
} = {}) {
113117
this._mode = mode
@@ -118,6 +122,7 @@ class ConnectionHolder implements ConnectionHolderInterface {
118122
this._referenceCount = 0
119123
this._connectionPromise = Promise.resolve(null)
120124
this._onDatabaseNameResolved = onDatabaseNameResolved
125+
this._bookmarkManager = bookmarkManager
121126
}
122127

123128
mode (): string | undefined {
@@ -146,12 +151,16 @@ class ConnectionHolder implements ConnectionHolderInterface {
146151

147152
initializeConnection (): boolean {
148153
if (this._referenceCount === 0 && (this._connectionProvider != null)) {
154+
const bookmarks = this._getBookmarks()
155+
149156
this._connectionPromise = this._connectionProvider.acquireConnection({
150157
accessMode: this._mode,
151158
database: this._database,
152-
bookmarks: this._bookmarks,
159+
bookmarks,
153160
impersonatedUser: this._impersonatedUser,
154-
onDatabaseNameResolved: this._onDatabaseNameResolved
161+
onDatabaseNameResolved: this._onDatabaseNameResolved,
162+
onRoutingTableRefresh:
163+
() => this._bookmarkManager?.updateBookmarks('system', bookmarks.values(), [])
155164
})
156165
} else {
157166
this._referenceCount++
@@ -161,6 +170,13 @@ class ConnectionHolder implements ConnectionHolderInterface {
161170
return true
162171
}
163172

173+
private _getBookmarks (): Bookmarks {
174+
if (this._bookmarkManager != null) {
175+
return new Bookmarks(this._bookmarkManager.getBookmarks('system'))
176+
}
177+
return this._bookmarks
178+
}
179+
164180
getConnection (): Promise<Connection | null> {
165181
return this._connectionPromise
166182
}

packages/core/src/session.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,15 +115,17 @@ class Session {
115115
bookmarks,
116116
connectionProvider,
117117
impersonatedUser,
118-
onDatabaseNameResolved: this._onDatabaseNameResolved
118+
onDatabaseNameResolved: this._onDatabaseNameResolved,
119+
bookmarkManager
119120
})
120121
this._writeConnectionHolder = new ConnectionHolder({
121122
mode: ACCESS_MODE_WRITE,
122123
database,
123124
bookmarks,
124125
connectionProvider,
125126
impersonatedUser,
126-
onDatabaseNameResolved: this._onDatabaseNameResolved
127+
onDatabaseNameResolved: this._onDatabaseNameResolved,
128+
bookmarkManager
127129
})
128130
this._open = true
129131
this._hasTx = false
@@ -162,9 +164,8 @@ class Session {
162164
? new TxConfig(transactionConfig)
163165
: TxConfig.empty()
164166

165-
const bookmarks = this._bookmarks()
166-
167167
const result = this._run(validatedQuery, params, connection => {
168+
const bookmarks = this._bookmarks()
168169
this._assertSessionIsOpen()
169170
return (connection as Connection).protocol().run(validatedQuery, params, {
170171
bookmarks,
@@ -283,20 +284,19 @@ class Session {
283284
const connectionHolder = this._connectionHolderWithMode(mode)
284285
connectionHolder.initializeConnection()
285286
this._hasTx = true
286-
const bookmarks = this._bookmarks()
287287

288288
const tx = new TransactionPromise({
289289
connectionHolder,
290290
impersonatedUser: this._impersonatedUser,
291291
onClose: this._transactionClosed.bind(this),
292-
onBookmarks: (newBookmarks) => this._updateBookmarks(newBookmarks, bookmarks),
292+
onBookmarks: (newBm, oldBm, db) => this._updateBookmarks(newBm, oldBm, db),
293293
onConnection: this._assertSessionIsOpen.bind(this),
294294
reactive: this._reactive,
295295
fetchSize: this._fetchSize,
296296
lowRecordWatermark: this._lowRecordWatermark,
297297
highRecordWatermark: this._highRecordWatermark
298298
})
299-
tx._begin(bookmarks, txConfig)
299+
tx._begin(() => this._bookmarks(), txConfig)
300300
return tx
301301
}
302302

packages/core/src/transaction-promise.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class TransactionPromise extends Transaction implements Promise<Transaction> {
6969
}: {
7070
connectionHolder: ConnectionHolder
7171
onClose: () => void
72-
onBookmarks: (bookmarks: Bookmarks) => void
72+
onBookmarks: (newBookmarks: Bookmarks, previousBookmarks: Bookmarks, database?: string) => void
7373
onConnection: () => void
7474
reactive: boolean
7575
fetchSize: number
@@ -162,7 +162,7 @@ class TransactionPromise extends Transaction implements Promise<Transaction> {
162162
/**
163163
* @access private
164164
*/
165-
_begin (bookmarks: string | Bookmarks | string[], txConfig: TxConfig): void {
165+
_begin (bookmarks: () => Bookmarks, txConfig: TxConfig): void {
166166
return super._begin(bookmarks, txConfig, {
167167
onError: this._onBeginError.bind(this),
168168
onComplete: this._onBeginMetadata.bind(this)

0 commit comments

Comments
 (0)