@@ -62,7 +62,6 @@ class Pool {
62
62
this . _pendingCreates = { }
63
63
this . _acquireRequests = { }
64
64
this . _activeResourceCounts = { }
65
- this . _poolState = { }
66
65
this . _release = this . _release . bind ( this )
67
66
this . _log = log
68
67
this . _closed = false
@@ -111,7 +110,6 @@ class Pool {
111
110
112
111
request = new PendingRequest ( key , resolve , reject , timeoutId , this . _log )
113
112
allRequests [ key ] . push ( request )
114
-
115
113
this . _processPendingAcquireRequests ( address )
116
114
} )
117
115
}
@@ -177,21 +175,23 @@ class Pool {
177
175
return this . _activeResourceCounts [ address . asKey ( ) ] || 0
178
176
}
179
177
178
+ _getOrInitializePoolFor ( key ) {
179
+ let pool = this . _pools [ key ]
180
+ if ( ! pool ) {
181
+ pool = new SingleAddressPool ( )
182
+ this . _pools [ key ] = pool
183
+ this . _pendingCreates [ key ] = 0
184
+ }
185
+ return pool
186
+ }
187
+
180
188
async _acquire ( address ) {
181
189
if ( this . _closed ) {
182
190
throw newError ( 'Pool is closed, it is no more able to serve requests.' )
183
191
}
184
192
185
193
const key = address . asKey ( )
186
- let pool = this . _pools [ key ]
187
- let poolState = this . _poolState [ key ]
188
- if ( ! pool ) {
189
- pool = [ ]
190
- poolState = new PoolState ( )
191
- this . _pools [ key ] = pool
192
- this . _pendingCreates [ key ] = 0
193
- this . _poolState [ key ] = poolState
194
- }
194
+ const pool = this . _getOrInitializePoolFor ( key )
195
195
while ( pool . length ) {
196
196
const resource = pool . pop ( )
197
197
@@ -205,7 +205,7 @@ class Pool {
205
205
if ( this . _log . isDebugEnabled ( ) ) {
206
206
this . _log . debug ( `${ resource } acquired from the pool ${ key } ` )
207
207
}
208
- return resource
208
+ return { resource, pool }
209
209
} else {
210
210
await this . _destroy ( resource )
211
211
}
@@ -219,7 +219,7 @@ class Pool {
219
219
this . activeResourceCount ( address ) + this . _pendingCreates [ key ]
220
220
if ( numConnections >= this . _maxSize ) {
221
221
// Will put this request in queue instead since the pool is full
222
- return null
222
+ return { resource : null , pool }
223
223
}
224
224
}
225
225
@@ -229,7 +229,7 @@ class Pool {
229
229
let resource
230
230
try {
231
231
// Invoke callback that creates actual connection
232
- resource = await this . _create ( address , ( address , resource ) => this . _release ( poolState , address , resource ) )
232
+ resource = await this . _create ( address , ( address , resource ) => this . _release ( address , resource , pool ) )
233
233
234
234
resourceAcquired ( key , this . _activeResourceCounts )
235
235
if ( this . _log . isDebugEnabled ( ) ) {
@@ -238,14 +238,13 @@ class Pool {
238
238
} finally {
239
239
this . _pendingCreates [ key ] = this . _pendingCreates [ key ] - 1
240
240
}
241
- return resource
241
+ return { resource, pool }
242
242
}
243
243
244
- async _release ( poolState , address , resource ) {
244
+ async _release ( address , resource , pool ) {
245
245
const key = address . asKey ( )
246
- const pool = this . _pools [ key ]
247
246
248
- if ( pool && poolState . isActive ( ) ) {
247
+ if ( pool . isActive ( ) ) {
249
248
// there exist idle connections for the given key
250
249
if ( ! this . _validate ( resource ) ) {
251
250
if ( this . _log . isDebugEnabled ( ) ) {
@@ -292,24 +291,23 @@ class Pool {
292
291
}
293
292
294
293
async _purgeKey ( key ) {
295
- const pool = this . _pools [ key ] || [ ]
296
- const poolState = this . _poolState [ key ] || new PoolState ( )
297
- while ( pool . length ) {
298
- const resource = pool . pop ( )
299
- if ( this . _removeIdleObserver ) {
300
- this . _removeIdleObserver ( resource )
294
+ const pool = this . _pools [ key ]
295
+ if ( pool ) {
296
+ while ( pool . length ) {
297
+ const resource = pool . pop ( )
298
+ if ( this . _removeIdleObserver ) {
299
+ this . _removeIdleObserver ( resource )
300
+ }
301
+ await this . _destroy ( resource )
301
302
}
302
- await this . _destroy ( resource )
303
+ pool . close ( )
304
+ delete this . _pools [ key ]
303
305
}
304
- poolState . close ( )
305
- delete this . _pools [ key ]
306
- delete this . _poolState [ key ]
307
306
}
308
307
309
308
_processPendingAcquireRequests ( address ) {
310
309
const key = address . asKey ( )
311
310
const requests = this . _acquireRequests [ key ]
312
- const poolState = this . _poolState [ key ] || new PoolState ( )
313
311
if ( requests ) {
314
312
const pendingRequest = requests . shift ( ) // pop a pending acquire request
315
313
@@ -319,16 +317,16 @@ class Pool {
319
317
// failed to acquire/create a new connection to resolve the pending acquire request
320
318
// propagate the error by failing the pending request
321
319
pendingRequest . reject ( error )
322
- return null
320
+ return { resource : null }
323
321
} )
324
- . then ( resource => {
322
+ . then ( ( { resource, pool } ) => {
325
323
if ( resource ) {
326
324
// managed to acquire a valid resource from the pool
327
325
328
326
if ( pendingRequest . isCompleted ( ) ) {
329
327
// request has been completed, most likely failed by a timeout
330
328
// return the acquired resource back to the pool
331
- this . _release ( poolState , address , resource )
329
+ this . _release ( address , resource , pool )
332
330
} else {
333
331
// request is still pending and can be resolved with the newly acquired resource
334
332
pendingRequest . resolve ( resource ) // resolve the pending request with the acquired resource
@@ -415,9 +413,10 @@ class PendingRequest {
415
413
}
416
414
}
417
415
418
- class PoolState {
416
+ class SingleAddressPool {
419
417
constructor ( ) {
420
418
this . _active = true
419
+ this . _elements = [ ]
421
420
}
422
421
423
422
isActive ( ) {
@@ -427,6 +426,23 @@ class PoolState {
427
426
close ( ) {
428
427
this . _active = false
429
428
}
429
+
430
+ filter ( predicate ) {
431
+ this . _elements = this . _elements . filter ( predicate )
432
+ return this
433
+ }
434
+
435
+ get length ( ) {
436
+ return this . _elements . length
437
+ }
438
+
439
+ pop ( ) {
440
+ return this . _elements . pop ( )
441
+ }
442
+
443
+ push ( element ) {
444
+ return this . _elements . push ( element )
445
+ }
430
446
}
431
447
432
448
export default Pool
0 commit comments