Skip to content

Refactory Pool module #930

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 9, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 49 additions & 33 deletions packages/bolt-connection/src/pool/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class Pool {
this._pendingCreates = {}
this._acquireRequests = {}
this._activeResourceCounts = {}
this._poolState = {}
this._release = this._release.bind(this)
this._log = log
this._closed = false
Expand Down Expand Up @@ -111,7 +110,6 @@ class Pool {

request = new PendingRequest(key, resolve, reject, timeoutId, this._log)
allRequests[key].push(request)

this._processPendingAcquireRequests(address)
})
}
Expand Down Expand Up @@ -177,21 +175,23 @@ class Pool {
return this._activeResourceCounts[address.asKey()] || 0
}

_getOrInitializePoolFor (key) {
let pool = this._pools[key]
if (!pool) {
pool = new SingleAddressPool()
this._pools[key] = pool
this._pendingCreates[key] = 0
}
return pool
}

async _acquire (address) {
if (this._closed) {
throw newError('Pool is closed, it is no more able to serve requests.')
}

const key = address.asKey()
let pool = this._pools[key]
let poolState = this._poolState[key]
if (!pool) {
pool = []
poolState = new PoolState()
this._pools[key] = pool
this._pendingCreates[key] = 0
this._poolState[key] = poolState
}
const pool = this._getOrInitializePoolFor(key)
while (pool.length) {
const resource = pool.pop()

Expand All @@ -205,7 +205,7 @@ class Pool {
if (this._log.isDebugEnabled()) {
this._log.debug(`${resource} acquired from the pool ${key}`)
}
return resource
return { resource, pool }
} else {
await this._destroy(resource)
}
Expand All @@ -219,7 +219,7 @@ class Pool {
this.activeResourceCount(address) + this._pendingCreates[key]
if (numConnections >= this._maxSize) {
// Will put this request in queue instead since the pool is full
return null
return { resource: null, pool }
}
}

Expand All @@ -229,7 +229,7 @@ class Pool {
let resource
try {
// Invoke callback that creates actual connection
resource = await this._create(address, (address, resource) => this._release(poolState, address, resource))
resource = await this._create(address, (address, resource) => this._release(address, resource, pool))

resourceAcquired(key, this._activeResourceCounts)
if (this._log.isDebugEnabled()) {
Expand All @@ -238,14 +238,13 @@ class Pool {
} finally {
this._pendingCreates[key] = this._pendingCreates[key] - 1
}
return resource
return { resource, pool }
}

async _release (poolState, address, resource) {
async _release (address, resource, pool) {
const key = address.asKey()
const pool = this._pools[key]

if (pool && poolState.isActive()) {
if (pool.isActive()) {
// there exist idle connections for the given key
if (!this._validate(resource)) {
if (this._log.isDebugEnabled()) {
Expand Down Expand Up @@ -292,24 +291,23 @@ class Pool {
}

async _purgeKey (key) {
const pool = this._pools[key] || []
const poolState = this._poolState[key] || new PoolState()
while (pool.length) {
const resource = pool.pop()
if (this._removeIdleObserver) {
this._removeIdleObserver(resource)
const pool = this._pools[key]
if (pool) {
while (pool.length) {
const resource = pool.pop()
if (this._removeIdleObserver) {
this._removeIdleObserver(resource)
}
await this._destroy(resource)
}
await this._destroy(resource)
pool.close()
delete this._pools[key]
}
poolState.close()
delete this._pools[key]
delete this._poolState[key]
}

_processPendingAcquireRequests (address) {
const key = address.asKey()
const requests = this._acquireRequests[key]
const poolState = this._poolState[key] || new PoolState()
if (requests) {
const pendingRequest = requests.shift() // pop a pending acquire request

Expand All @@ -319,16 +317,16 @@ class Pool {
// failed to acquire/create a new connection to resolve the pending acquire request
// propagate the error by failing the pending request
pendingRequest.reject(error)
return null
return { resource: null }
})
.then(resource => {
.then(({ resource, pool }) => {
if (resource) {
// managed to acquire a valid resource from the pool

if (pendingRequest.isCompleted()) {
// request has been completed, most likely failed by a timeout
// return the acquired resource back to the pool
this._release(poolState, address, resource)
this._release(address, resource, pool)
} else {
// request is still pending and can be resolved with the newly acquired resource
pendingRequest.resolve(resource) // resolve the pending request with the acquired resource
Expand Down Expand Up @@ -415,9 +413,10 @@ class PendingRequest {
}
}

class PoolState {
class SingleAddressPool {
constructor () {
this._active = true
this._elements = []
}

isActive () {
Expand All @@ -427,6 +426,23 @@ class PoolState {
close () {
this._active = false
}

filter (predicate) {
this._elements = this._elements.filter(predicate)
return this
}

get length () {
return this._elements.length
}

pop () {
return this._elements.pop()
}

push (element) {
return this._elements.push(element)
}
}

export default Pool