From 1d685a3211440cc8fdc8598c789d4c79a674177f Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 26 Apr 2022 15:05:57 +0200 Subject: [PATCH 1/3] Refactory `Pool` module The state of the pool and the resource list weren't bounded togheter as it supposed to. This scenario makes the code prune to bugs and incosistency in the long run. Merge the resouse list (a.k.a. Pool) and PoolState in the same object makes the code easier to understand and more coesive. --- packages/bolt-connection/src/pool/pool.js | 60 ++++++++++++++--------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index dca648bc1..9731e2d48 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -62,7 +62,6 @@ class Pool { this._pendingCreates = {} this._acquireRequests = {} this._activeResourceCounts = {} - this._poolState = {} this._release = this._release.bind(this) this._log = log this._closed = false @@ -184,13 +183,10 @@ class Pool { const key = address.asKey() let pool = this._pools[key] - let poolState = this._poolState[key] if (!pool) { - pool = [] - poolState = new PoolState() + pool = new SingleAddressPool() this._pools[key] = pool this._pendingCreates[key] = 0 - this._poolState[key] = poolState } while (pool.length) { const resource = pool.pop() @@ -229,7 +225,7 @@ class Pool { let resource try { // Invoke callback that creates actual connection - resource = await this._create(address, (address, resource) => this._release(poolState, address, resource)) + resource = await this._create(address, (address, resource) => this._release(address, resource, pool)) resourceAcquired(key, this._activeResourceCounts) if (this._log.isDebugEnabled()) { @@ -241,11 +237,10 @@ class Pool { return resource } - async _release (poolState, address, resource) { + async _release (address, resource, pool) { const key = address.asKey() - const pool = this._pools[key] - if (pool && poolState.isActive()) { + if (pool.isActive()) { // there exist idle connections for the given key if (!this._validate(resource)) { if (this._log.isDebugEnabled()) { @@ -288,28 +283,27 @@ class Pool { } resourceReleased(key, this._activeResourceCounts) - this._processPendingAcquireRequests(address) + this._processPendingAcquireRequests(address, pool) } async _purgeKey (key) { - const pool = this._pools[key] || [] - const poolState = this._poolState[key] || new PoolState() - while (pool.length) { - const resource = pool.pop() - if (this._removeIdleObserver) { - this._removeIdleObserver(resource) + const pool = this._pools[key] + if (pool) { + while (pool.length) { + const resource = pool.pop() + if (this._removeIdleObserver) { + this._removeIdleObserver(resource) + } + await this._destroy(resource) } - await this._destroy(resource) + pool.close() + delete this._pools[key] } - poolState.close() - delete this._pools[key] - delete this._poolState[key] } - _processPendingAcquireRequests (address) { + _processPendingAcquireRequests (address, pool) { const key = address.asKey() const requests = this._acquireRequests[key] - const poolState = this._poolState[key] || new PoolState() if (requests) { const pendingRequest = requests.shift() // pop a pending acquire request @@ -328,7 +322,7 @@ class Pool { if (pendingRequest.isCompleted()) { // request has been completed, most likely failed by a timeout // return the acquired resource back to the pool - this._release(poolState, address, resource) + this._release(address, resource, pool) } else { // request is still pending and can be resolved with the newly acquired resource pendingRequest.resolve(resource) // resolve the pending request with the acquired resource @@ -415,9 +409,10 @@ class PendingRequest { } } -class PoolState { +class SingleAddressPool { constructor () { this._active = true + this._elements = [] } isActive () { @@ -427,6 +422,23 @@ class PoolState { close () { this._active = false } + + filter (predicate) { + this._elements = this._elements.filter(predicate) + return this + } + + get length () { + return this._elements.length + } + + pop () { + return this._elements.pop() + } + + push (element) { + return this._elements.push(element) + } } export default Pool From e3465fbff932abe9873007bc06a0e725e13992d4 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 4 May 2022 10:27:00 +0200 Subject: [PATCH 2/3] Initializing pool correctly --- packages/bolt-connection/src/pool/pool.js | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index 9731e2d48..1b6a58f08 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -110,8 +110,8 @@ class Pool { request = new PendingRequest(key, resolve, reject, timeoutId, this._log) allRequests[key].push(request) - - this._processPendingAcquireRequests(address) + const pool = this._getOrInitializePoolFor(key) + this._processPendingAcquireRequests(address, pool) }) } @@ -176,18 +176,23 @@ class Pool { return this._activeResourceCounts[address.asKey()] || 0 } - async _acquire (address) { - if (this._closed) { - throw newError('Pool is closed, it is no more able to serve requests.') - } - - const key = address.asKey() + _getOrInitializePoolFor (key) { let pool = this._pools[key] if (!pool) { pool = new SingleAddressPool() this._pools[key] = pool this._pendingCreates[key] = 0 } + return pool + } + + async _acquire (address) { + if (this._closed) { + throw newError('Pool is closed, it is no more able to serve requests.') + } + + const key = address.asKey() + const pool = this._getOrInitializePoolFor(key) while (pool.length) { const resource = pool.pop() From 4a47a72e8d151e158c971d9d3bbc0a1f4df8a774 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 4 May 2022 19:58:15 +0200 Subject: [PATCH 3/3] Make the pool create and fetch logic restricted to the _acquire method --- packages/bolt-connection/src/pool/pool.js | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index 1b6a58f08..60d0f104f 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -110,8 +110,7 @@ class Pool { request = new PendingRequest(key, resolve, reject, timeoutId, this._log) allRequests[key].push(request) - const pool = this._getOrInitializePoolFor(key) - this._processPendingAcquireRequests(address, pool) + this._processPendingAcquireRequests(address) }) } @@ -206,7 +205,7 @@ class Pool { if (this._log.isDebugEnabled()) { this._log.debug(`${resource} acquired from the pool ${key}`) } - return resource + return { resource, pool } } else { await this._destroy(resource) } @@ -220,7 +219,7 @@ class Pool { this.activeResourceCount(address) + this._pendingCreates[key] if (numConnections >= this._maxSize) { // Will put this request in queue instead since the pool is full - return null + return { resource: null, pool } } } @@ -239,7 +238,7 @@ class Pool { } finally { this._pendingCreates[key] = this._pendingCreates[key] - 1 } - return resource + return { resource, pool } } async _release (address, resource, pool) { @@ -288,7 +287,7 @@ class Pool { } resourceReleased(key, this._activeResourceCounts) - this._processPendingAcquireRequests(address, pool) + this._processPendingAcquireRequests(address) } async _purgeKey (key) { @@ -306,7 +305,7 @@ class Pool { } } - _processPendingAcquireRequests (address, pool) { + _processPendingAcquireRequests (address) { const key = address.asKey() const requests = this._acquireRequests[key] if (requests) { @@ -318,9 +317,9 @@ class Pool { // failed to acquire/create a new connection to resolve the pending acquire request // propagate the error by failing the pending request pendingRequest.reject(error) - return null + return { resource: null } }) - .then(resource => { + .then(({ resource, pool }) => { if (resource) { // managed to acquire a valid resource from the pool