Skip to content

Commit db15b1e

Browse files
committed
Fix connections not being destroyed when released to purged pool
This scenario happens when the pull for a given address is purged while a connection stills in use and then another pool for the same address is created. In this case, the connection was being wrongly added to the existing pool. The correct behaviour is destroy this orphan connections.
1 parent 0960559 commit db15b1e

File tree

2 files changed

+64
-4
lines changed

2 files changed

+64
-4
lines changed

packages/bolt-connection/src/pool/pool.js

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class Pool {
6262
this._pendingCreates = {}
6363
this._acquireRequests = {}
6464
this._activeResourceCounts = {}
65+
this._poolState = {}
6566
this._release = this._release.bind(this)
6667
this._log = log
6768
this._closed = false
@@ -189,10 +190,13 @@ class Pool {
189190

190191
const key = address.asKey()
191192
let pool = this._pools[key]
193+
let poolState = this._poolState[key]
192194
if (!pool) {
193195
pool = []
196+
poolState = new PoolState()
194197
this._pools[key] = pool
195198
this._pendingCreates[key] = 0
199+
this._poolState[key] = poolState
196200
}
197201
while (pool.length) {
198202
const resource = pool.pop()
@@ -231,7 +235,7 @@ class Pool {
231235
let resource
232236
try {
233237
// Invoke callback that creates actual connection
234-
resource = await this._create(address, this._release)
238+
resource = await this._create(address, (address, resource) => this._release(poolState, address, resource))
235239

236240
resourceAcquired(key, this._activeResourceCounts)
237241
if (this._log.isDebugEnabled()) {
@@ -243,11 +247,11 @@ class Pool {
243247
return resource
244248
}
245249

246-
async _release (address, resource) {
250+
async _release (poolState, address, resource) {
247251
const key = address.asKey()
248252
const pool = this._pools[key]
249253

250-
if (pool) {
254+
if (pool && poolState.isActive()) {
251255
// there exist idle connections for the given key
252256
if (!this._validate(resource)) {
253257
if (this._log.isDebugEnabled()) {
@@ -295,19 +299,23 @@ class Pool {
295299

296300
async _purgeKey (key) {
297301
const pool = this._pools[key] || []
302+
const poolState = this._poolState[key] || new PoolState()
298303
while (pool.length) {
299304
const resource = pool.pop()
300305
if (this._removeIdleObserver) {
301306
this._removeIdleObserver(resource)
302307
}
303308
await this._destroy(resource)
304309
}
310+
poolState.close()
305311
delete this._pools[key]
312+
delete this._poolState[key]
306313
}
307314

308315
_processPendingAcquireRequests (address) {
309316
const key = address.asKey()
310317
const requests = this._acquireRequests[key]
318+
const poolState = this._poolState[key]
311319
if (requests) {
312320
const pendingRequest = requests.shift() // pop a pending acquire request
313321

@@ -326,7 +334,7 @@ class Pool {
326334
if (pendingRequest.isCompleted()) {
327335
// request has been completed, most likely failed by a timeout
328336
// return the acquired resource back to the pool
329-
this._release(address, resource)
337+
this._release(poolState, address, resource)
330338
} else {
331339
// request is still pending and can be resolved with the newly acquired resource
332340
pendingRequest.resolve(resource) // resolve the pending request with the acquired resource
@@ -404,4 +412,18 @@ class PendingRequest {
404412
}
405413
}
406414

415+
class PoolState {
416+
constructor() {
417+
this._active = true;
418+
}
419+
420+
isActive() {
421+
return this._active;
422+
}
423+
424+
close() {
425+
this._active = false;
426+
}
427+
}
428+
407429
export default Pool

packages/bolt-connection/test/pool/pool.test.js

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,44 @@ describe('#unit Pool', () => {
237237
expect(r0.destroyed).toBeTruthy()
238238
})
239239

240+
it('destroys resource when key was purged event when key is acquired again', async () => {
241+
let counter = 0
242+
const address = ServerAddress.fromUrl('bolt://localhost:7687')
243+
const pool = new Pool({
244+
create: (server, release) =>
245+
Promise.resolve(new Resource(server, counter++, release)),
246+
destroy: res => {
247+
res.destroyed = true
248+
return Promise.resolve()
249+
}
250+
})
251+
252+
// Acquire resource
253+
const r0 = await pool.acquire(address)
254+
expect(pool.has(address)).toBeTruthy()
255+
expect(r0.id).toEqual(0)
256+
257+
// Purging the key
258+
await pool.purge(address)
259+
expect(pool.has(address)).toBeFalsy()
260+
expect(r0.destroyed).toBeFalsy()
261+
262+
// Acquiring second resolve should recreate the pool
263+
const r1 = await pool.acquire(address)
264+
expect(pool.has(address)).toBeTruthy()
265+
expect(r1.id).toEqual(1)
266+
267+
// Closing the first resource should destroy it
268+
await r0.close()
269+
expect(pool.has(address)).toBeTruthy()
270+
expect(r0.destroyed).toBeTruthy()
271+
272+
// Closing the second resource should not destroy it
273+
await r1.close()
274+
expect(pool.has(address)).toBeTruthy()
275+
expect(r1.destroyed).toBeFalsy()
276+
})
277+
240278
it('close purges all keys', async () => {
241279
let counter = 0
242280

0 commit comments

Comments
 (0)