diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index dca648bc1..60d0f104f 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -62,7 +62,6 @@ class Pool { this._pendingCreates = {} this._acquireRequests = {} this._activeResourceCounts = {} - this._poolState = {} this._release = this._release.bind(this) this._log = log this._closed = false @@ -111,7 +110,6 @@ class Pool { request = new PendingRequest(key, resolve, reject, timeoutId, this._log) allRequests[key].push(request) - this._processPendingAcquireRequests(address) }) } @@ -177,21 +175,23 @@ class Pool { return this._activeResourceCounts[address.asKey()] || 0 } + _getOrInitializePoolFor (key) { + let pool = this._pools[key] + if (!pool) { + pool = new SingleAddressPool() + this._pools[key] = pool + this._pendingCreates[key] = 0 + } + return pool + } + async _acquire (address) { if (this._closed) { throw newError('Pool is closed, it is no more able to serve requests.') } const key = address.asKey() - let pool = this._pools[key] - let poolState = this._poolState[key] - if (!pool) { - pool = [] - poolState = new PoolState() - this._pools[key] = pool - this._pendingCreates[key] = 0 - this._poolState[key] = poolState - } + const pool = this._getOrInitializePoolFor(key) while (pool.length) { const resource = pool.pop() @@ -205,7 +205,7 @@ class Pool { if (this._log.isDebugEnabled()) { this._log.debug(`${resource} acquired from the pool ${key}`) } - return resource + return { resource, pool } } else { await this._destroy(resource) } @@ -219,7 +219,7 @@ class Pool { this.activeResourceCount(address) + this._pendingCreates[key] if (numConnections >= this._maxSize) { // Will put this request in queue instead since the pool is full - return null + return { resource: null, pool } } } @@ -229,7 +229,7 @@ class Pool { let resource try { // Invoke callback that creates actual connection - resource = await this._create(address, (address, resource) => this._release(poolState, address, resource)) + resource = await this._create(address, (address, resource) => this._release(address, resource, pool)) resourceAcquired(key, this._activeResourceCounts) if (this._log.isDebugEnabled()) { @@ -238,14 +238,13 @@ class Pool { } finally { this._pendingCreates[key] = this._pendingCreates[key] - 1 } - return resource + return { resource, pool } } - async _release (poolState, address, resource) { + async _release (address, resource, pool) { const key = address.asKey() - const pool = this._pools[key] - if (pool && poolState.isActive()) { + if (pool.isActive()) { // there exist idle connections for the given key if (!this._validate(resource)) { if (this._log.isDebugEnabled()) { @@ -292,24 +291,23 @@ class Pool { } async _purgeKey (key) { - const pool = this._pools[key] || [] - const poolState = this._poolState[key] || new PoolState() - while (pool.length) { - const resource = pool.pop() - if (this._removeIdleObserver) { - this._removeIdleObserver(resource) + const pool = this._pools[key] + if (pool) { + while (pool.length) { + const resource = pool.pop() + if (this._removeIdleObserver) { + this._removeIdleObserver(resource) + } + await this._destroy(resource) } - await this._destroy(resource) + pool.close() + delete this._pools[key] } - poolState.close() - delete this._pools[key] - delete this._poolState[key] } _processPendingAcquireRequests (address) { const key = address.asKey() const requests = this._acquireRequests[key] - const poolState = this._poolState[key] || new PoolState() if (requests) { const pendingRequest = requests.shift() // pop a pending acquire request @@ -319,16 +317,16 @@ class Pool { // failed to acquire/create a new connection to resolve the pending acquire request // propagate the error by failing the pending request pendingRequest.reject(error) - return null + return { resource: null } }) - .then(resource => { + .then(({ resource, pool }) => { if (resource) { // managed to acquire a valid resource from the pool if (pendingRequest.isCompleted()) { // request has been completed, most likely failed by a timeout // return the acquired resource back to the pool - this._release(poolState, address, resource) + this._release(address, resource, pool) } else { // request is still pending and can be resolved with the newly acquired resource pendingRequest.resolve(resource) // resolve the pending request with the acquired resource @@ -415,9 +413,10 @@ class PendingRequest { } } -class PoolState { +class SingleAddressPool { constructor () { this._active = true + this._elements = [] } isActive () { @@ -427,6 +426,23 @@ class PoolState { close () { this._active = false } + + filter (predicate) { + this._elements = this._elements.filter(predicate) + return this + } + + get length () { + return this._elements.length + } + + pop () { + return this._elements.pop() + } + + push (element) { + return this._elements.push(element) + } } export default Pool