Skip to content

Commit 05355c2

Browse files
committed
Pool: separate validation on acquire and on release and add the acquisition context to create and validate
1 parent 4869203 commit 05355c2

File tree

7 files changed

+200
-183
lines changed

7 files changed

+200
-183
lines changed

packages/bolt-connection/src/connection-provider/connection-provider-direct.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ export default class DirectConnectionProvider extends PooledConnectionProvider {
4949
this._handleAuthorizationExpired(error, address, conn, database)
5050
})
5151

52-
const connection = await this._connectionPool.acquire(this._address)
52+
const connection = await this._connectionPool.acquire({}, this._address)
5353

5454
if (auth && auth !== connection.authToken) {
5555
if (connection.supportsReAuth) {

packages/bolt-connection/src/connection-provider/connection-provider-pooled.js

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ export default class PooledConnectionProvider extends ConnectionProvider {
4747
this._connectionPool = new Pool({
4848
create: this._createConnection.bind(this),
4949
destroy: this._destroyConnection.bind(this),
50-
validate: this._validateConnection.bind(this),
50+
validateOnAcquire: this._validateConnectionOnAcquire.bind(this),
51+
validateOnRelease: this._validateConnectionOnRelease.bind(this),
5152
installIdleObserver: PooledConnectionProvider._installIdleObserverOnConnection.bind(
5253
this
5354
),
@@ -69,7 +70,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
6970
* @return {Promise<Connection>} promise resolved with a new connection or rejected when failed to connect.
7071
* @access private
7172
*/
72-
_createConnection (address, release) {
73+
_createConnection (_context, address, release) {
7374
return this._createChannelConnection(address).then(connection => {
7475
connection._release = () => {
7576
return release(address, connection)
@@ -85,12 +86,32 @@ export default class PooledConnectionProvider extends ConnectionProvider {
8586
})
8687
}
8788

89+
async _validateConnectionOnAcquire (_context, conn) {
90+
if (!this._validateConnection(conn)) {
91+
return false
92+
}
93+
94+
try {
95+
await this._authenticationProvider.authenticate({ connection: conn })
96+
return true
97+
} catch (error) {
98+
this._log.info(
99+
`The connection ${conn.id} is not valid because of an error ${error.code} '${error.message}'`
100+
)
101+
return false
102+
}
103+
}
104+
105+
_validateConnectionOnRelease (conn) {
106+
return this._validateConnection(conn)
107+
}
108+
88109
/**
89110
* Check that a connection is usable
90111
* @return {boolean} true if the connection is open
91112
* @access private
92113
**/
93-
async _validateConnection (conn) {
114+
_validateConnection (conn) {
94115
if (!conn.isOpen()) {
95116
return false
96117
}
@@ -101,15 +122,6 @@ export default class PooledConnectionProvider extends ConnectionProvider {
101122
return false
102123
}
103124

104-
try {
105-
await this._authenticationProvider.authenticate({ connection: conn })
106-
} catch (error) {
107-
this._log.info(
108-
`The connection ${conn.id} is not valid because of an error ${error.code} '${error.message}'`
109-
)
110-
return false
111-
}
112-
113125
return true
114126
}
115127

@@ -130,7 +142,7 @@ export default class PooledConnectionProvider extends ConnectionProvider {
130142
* @return {Promise<ServerInfo>} the server info
131143
*/
132144
async _verifyConnectivityAndGetServerVersion ({ address }) {
133-
const connection = await this._connectionPool.acquire(address)
145+
const connection = await this._connectionPool.acquire({}, address)
134146
const serverInfo = new ServerInfo(connection.server, connection.protocol().version)
135147
try {
136148
if (!connection.protocol().isLastMessageLogin()) {

packages/bolt-connection/src/connection-provider/connection-provider-routing.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
317317
}
318318

319319
_acquireConnectionToServer (address, serverName, routingTable) {
320-
return this._connectionPool.acquire(address)
320+
return this._connectionPool.acquire({}, address)
321321
}
322322

323323
_freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, auth } = {}) {
@@ -559,7 +559,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
559559

560560
async _createSessionForRediscovery (routerAddress, bookmarks, impersonatedUser, auth) {
561561
try {
562-
let connection = await this._connectionPool.acquire(routerAddress)
562+
let connection = await this._connectionPool.acquire({}, routerAddress)
563563

564564
if (auth && connection.authToken !== auth) {
565565
if (connection.supportsReAuth) {
@@ -767,7 +767,6 @@ function _isFailFastError (error) {
767767
}
768768

769769
function _isFailFastSecurityError (error) {
770-
console.error(error)
771770
return error.code.startsWith('Neo.ClientError.Security.') &&
772771
![
773772
AUTHORIZATION_EXPIRED_CODE

packages/bolt-connection/src/pool/pool.js

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,18 @@ const {
2626

2727
class Pool {
2828
/**
29-
* @param {function(address: ServerAddress, function(address: ServerAddress, resource: object): Promise<object>): Promise<object>} create
29+
* @param {function(acquisitionContext: object, address: ServerAddress, function(address: ServerAddress, resource: object): Promise<object>): Promise<object>} create
3030
* an allocation function that creates a promise with a new resource. It's given an address for which to
3131
* allocate the connection and a function that will return the resource to the pool if invoked, which is
3232
* meant to be called on .dispose or .close or whatever mechanism the resource uses to finalize.
33+
* @param {function(acquisitionContext: object, resource: object): boolean} validateOnAcquire
34+
* called at various times when an instance is acquired
35+
* If this returns false, the resource will be evicted
36+
* @param {function(resource: object): boolean} validateOnRelease
37+
* called at various times when an instance is released
38+
* If this returns false, the resource will be evicted
3339
* @param {function(resource: object): Promise<void>} destroy
3440
* called with the resource when it is evicted from this pool
35-
* @param {function(resource: object): boolean} validate
36-
* called at various times (like when an instance is acquired and when it is returned.
37-
* If this returns false, the resource will be evicted
3841
* @param {function(resource: object, observer: { onError }): void} installIdleObserver
3942
* called when the resource is released back to pool
4043
* @param {function(resource: object): void} removeIdleObserver
@@ -43,17 +46,19 @@ class Pool {
4346
* @param {Logger} log the driver logger.
4447
*/
4548
constructor ({
46-
create = (address, release) => Promise.resolve(),
49+
create = (acquisitionContext, address, release) => Promise.resolve(),
4750
destroy = conn => Promise.resolve(),
48-
validate = conn => true,
51+
validateOnAcquire = (acquisitionContext, conn) => true,
52+
validateOnRelease = (conn) => true,
4953
installIdleObserver = (conn, observer) => {},
5054
removeIdleObserver = conn => {},
5155
config = PoolConfig.defaultConfig(),
5256
log = Logger.noOp()
5357
} = {}) {
5458
this._create = create
5559
this._destroy = destroy
56-
this._validate = validate
60+
this._validateOnAcquire = validateOnAcquire
61+
this._validateOnRelease = validateOnRelease
5762
this._installIdleObserver = installIdleObserver
5863
this._removeIdleObserver = removeIdleObserver
5964
this._maxSize = config.maxSize
@@ -69,10 +74,11 @@ class Pool {
6974

7075
/**
7176
* Acquire and idle resource fom the pool or create a new one.
77+
* @param {object} acquisitionContext the acquisition context used for create and validateOnAcquire connection
7278
* @param {ServerAddress} address the address for which we're acquiring.
7379
* @return {Promise<Object>} resource that is ready to use.
7480
*/
75-
acquire (address) {
81+
acquire (acquisitionContext, address) {
7682
const key = address.asKey()
7783

7884
// We're out of resources and will try to acquire later on when an existing resource is released.
@@ -108,7 +114,7 @@ class Pool {
108114
}
109115
}, this._acquisitionTimeout)
110116

111-
request = new PendingRequest(key, resolve, reject, timeoutId, this._log)
117+
request = new PendingRequest(key, acquisitionContext, resolve, reject, timeoutId, this._log)
112118
allRequests[key].push(request)
113119
this._processPendingAcquireRequests(address)
114120
})
@@ -193,7 +199,7 @@ class Pool {
193199
return pool
194200
}
195201

196-
async _acquire (address) {
202+
async _acquire (acquisitionContext, address) {
197203
if (this._closed) {
198204
throw newError('Pool is closed, it is no more able to serve requests.')
199205
}
@@ -207,7 +213,7 @@ class Pool {
207213
this._removeIdleObserver(resource)
208214
}
209215

210-
if (await this._validate(resource)) {
216+
if (await this._validateOnAcquire(acquisitionContext, resource)) {
211217
// idle resource is valid and can be acquired
212218
resourceAcquired(key, this._activeResourceCounts)
213219
if (this._log.isDebugEnabled()) {
@@ -238,7 +244,7 @@ class Pool {
238244
let resource
239245
try {
240246
// Invoke callback that creates actual connection
241-
resource = await this._create(address, (address, resource) => this._release(address, resource, pool))
247+
resource = await this._create(acquisitionContext, address, (address, resource) => this._release(address, resource, pool))
242248

243249
pool.pushInUse(resource)
244250
resourceAcquired(key, this._activeResourceCounts)
@@ -256,7 +262,7 @@ class Pool {
256262

257263
if (pool.isActive()) {
258264
// there exist idle connections for the given key
259-
if (!await this._validate(resource)) {
265+
if (!await this._validateOnRelease(resource)) {
260266
if (this._log.isDebugEnabled()) {
261267
this._log.debug(
262268
`${resource} destroyed and can't be released to the pool ${key} because it is not functional`
@@ -327,7 +333,7 @@ class Pool {
327333
const pendingRequest = requests.shift() // pop a pending acquire request
328334

329335
if (pendingRequest) {
330-
this._acquire(address)
336+
this._acquire(pendingRequest.context, address)
331337
.catch(error => {
332338
// failed to acquire/create a new connection to resolve the pending acquire request
333339
// propagate the error by failing the pending request
@@ -391,15 +397,20 @@ function resourceReleased (key, activeResourceCounts) {
391397
}
392398

393399
class PendingRequest {
394-
constructor (key, resolve, reject, timeoutId, log) {
400+
constructor (key, context, resolve, reject, timeoutId, log) {
395401
this._key = key
402+
this._context = context
396403
this._resolve = resolve
397404
this._reject = reject
398405
this._timeoutId = timeoutId
399406
this._log = log
400407
this._completed = false
401408
}
402409

410+
get context () {
411+
return this._context
412+
}
413+
403414
isCompleted () {
404415
return this._completed
405416
}

packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ function newPool ({ create, config } = {}) {
333333
}
334334
return new Pool({
335335
config,
336-
create: (address, release) =>
336+
create: (_, address, release) =>
337337
Promise.resolve(_create(address, release))
338338
})
339339
}

packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,9 @@ describe.each([
129129
it('purges connections when address is forgotten', () => {
130130
const pool = newPool()
131131

132-
pool.acquire(server1)
133-
pool.acquire(server3)
134-
pool.acquire(server5)
132+
pool.acquire({}, server1)
133+
pool.acquire({}, server3)
134+
pool.acquire({}, server5)
135135
expectPoolToContain(pool, [server1, server3, server5])
136136

137137
const connectionProvider = newRoutingConnectionProvider(
@@ -2589,7 +2589,7 @@ describe.each([
25892589

25902590
const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers
25912591
const address = targetServers[0]
2592-
expect(acquireSpy).toHaveBeenCalledWith(address)
2592+
expect(acquireSpy).toHaveBeenCalledWith({}, address)
25932593

25942594
const connections = seenConnectionsPerAddress.get(address)
25952595

@@ -2608,7 +2608,7 @@ describe.each([
26082608

26092609
const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers
26102610
const address = targetServers[0]
2611-
expect(acquireSpy).toHaveBeenCalledWith(address)
2611+
expect(acquireSpy).toHaveBeenCalledWith({}, address)
26122612

26132613
const connections = seenConnectionsPerAddress.get(address)
26142614

@@ -2628,7 +2628,7 @@ describe.each([
26282628

26292629
const targetServers = accessMode === WRITE ? routingTable.readers : routingTable.writers
26302630
for (const address of targetServers) {
2631-
expect(acquireSpy).not.toHaveBeenCalledWith(address)
2631+
expect(acquireSpy).not.toHaveBeenCalledWith({}, address)
26322632
expect(seenConnectionsPerAddress.get(address)).toBeUndefined()
26332633
}
26342634
})
@@ -2711,7 +2711,7 @@ describe.each([
27112711
} finally {
27122712
const targetServers = accessMode === WRITE ? routingTable.writers : routingTable.readers
27132713
for (const address of targetServers) {
2714-
expect(acquireSpy).toHaveBeenCalledWith(address)
2714+
expect(acquireSpy).toHaveBeenCalledWith({}, address)
27152715

27162716
const connections = seenConnectionsPerAddress.get(address)
27172717

@@ -2947,7 +2947,7 @@ describe.each([
29472947
}
29482948
return new Pool({
29492949
config,
2950-
create: (address, release) => _create(address, release)
2950+
create: (_, address, release) => _create(address, release)
29512951
})
29522952
}
29532953

0 commit comments

Comments
 (0)