diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index dff3cd47a..3d4ce8a02 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -62,6 +62,7 @@ class Pool { this._pendingCreates = {} this._acquireRequests = {} this._activeResourceCounts = {} + this._poolState = {} this._release = this._release.bind(this) this._log = log this._closed = false @@ -189,10 +190,13 @@ class Pool { const key = address.asKey() let pool = this._pools[key] + let poolState = this._poolState[key] if (!pool) { pool = [] + poolState = new PoolState() this._pools[key] = pool this._pendingCreates[key] = 0 + this._poolState[key] = poolState } while (pool.length) { const resource = pool.pop() @@ -231,7 +235,7 @@ class Pool { let resource try { // Invoke callback that creates actual connection - resource = await this._create(address, this._release) + resource = await this._create(address, (address, resource) => this._release(poolState, address, resource)) resourceAcquired(key, this._activeResourceCounts) if (this._log.isDebugEnabled()) { @@ -243,11 +247,11 @@ class Pool { return resource } - async _release (address, resource) { + async _release (poolState, address, resource) { const key = address.asKey() const pool = this._pools[key] - if (pool) { + if (pool && poolState.isActive()) { // there exist idle connections for the given key if (!this._validate(resource)) { if (this._log.isDebugEnabled()) { @@ -295,6 +299,7 @@ class Pool { async _purgeKey (key) { const pool = this._pools[key] || [] + const poolState = this._poolState[key] || new PoolState() while (pool.length) { const resource = pool.pop() if (this._removeIdleObserver) { @@ -302,12 +307,15 @@ class Pool { } await this._destroy(resource) } + poolState.close() delete this._pools[key] + delete this._poolState[key] } _processPendingAcquireRequests (address) { const key = address.asKey() const requests = this._acquireRequests[key] + const poolState = this._poolState[key] if (requests) { const pendingRequest = requests.shift() // pop a pending acquire request @@ -326,7 +334,7 @@ class Pool { if (pendingRequest.isCompleted()) { // request has been completed, most likely failed by a timeout // return the acquired resource back to the pool - this._release(address, resource) + this._release(poolState, address, resource) } else { // request is still pending and can be resolved with the newly acquired resource pendingRequest.resolve(resource) // resolve the pending request with the acquired resource @@ -404,4 +412,18 @@ class PendingRequest { } } +class PoolState { + constructor() { + this._active = true; + } + + isActive() { + return this._active; + } + + close() { + this._active = false; + } +} + export default Pool diff --git a/packages/neo4j-driver/test/internal/pool.test.js b/packages/bolt-connection/test/pool/pool.test.js similarity index 93% rename from packages/neo4j-driver/test/internal/pool.test.js rename to packages/bolt-connection/test/pool/pool.test.js index ba538474a..d717ccbc4 100644 --- a/packages/neo4j-driver/test/internal/pool.test.js +++ b/packages/bolt-connection/test/pool/pool.test.js @@ -17,8 +17,8 @@ * limitations under the License. */ -import Pool from '../../../bolt-connection/lib/pool/pool' -import PoolConfig from '../../../bolt-connection/lib/pool/pool-config' +import Pool from '../../src/pool/pool' +import PoolConfig from '../../src/pool/pool-config' import { newError, error, internal } from 'neo4j-driver-core' const { @@ -27,7 +27,7 @@ const { const { SERVICE_UNAVAILABLE } = error -describe('#unit Pool', async () => { +describe('#unit Pool', () => { it('allocates if pool is empty', async () => { // Given let counter = 0 @@ -237,6 +237,44 @@ describe('#unit Pool', async () => { expect(r0.destroyed).toBeTruthy() }) + it('destroys resource when pool is purged even if a new pool is created for the same address', async () => { + let counter = 0 + const address = ServerAddress.fromUrl('bolt://localhost:7687') + const pool = new Pool({ + create: (server, release) => + Promise.resolve(new Resource(server, counter++, release)), + destroy: res => { + res.destroyed = true + return Promise.resolve() + } + }) + + // Acquire resource + const r0 = await pool.acquire(address) + expect(pool.has(address)).toBeTruthy() + expect(r0.id).toEqual(0) + + // Purging the key + await pool.purge(address) + expect(pool.has(address)).toBeFalsy() + expect(r0.destroyed).toBeFalsy() + + // Acquiring second resource should recreate the pool + const r1 = await pool.acquire(address) + expect(pool.has(address)).toBeTruthy() + expect(r1.id).toEqual(1) + + // Closing the first resource should destroy it + await r0.close() + expect(pool.has(address)).toBeTruthy() + expect(r0.destroyed).toBeTruthy() + + // Closing the second resource should not destroy it + await r1.close() + expect(pool.has(address)).toBeTruthy() + expect(r1.destroyed).toBeFalsy() + }) + it('close purges all keys', async () => { let counter = 0 @@ -282,11 +320,9 @@ describe('#unit Pool', async () => { // Close the pool await pool.close() - await expectAsync(pool.acquire(address)).toBeRejectedWith( - jasmine.objectContaining({ - message: jasmine.stringMatching(/Pool is closed/) - }) - ) + await expect(pool.acquire(address)).rejects.toMatchObject({ + message: expect.stringMatching('Pool is closed') + }) }) it('should fail to acquire when closed with idle connections', async () => { @@ -307,11 +343,9 @@ describe('#unit Pool', async () => { // Close the pool await pool.close() - await expectAsync(pool.acquire(address)).toBeRejectedWith( - jasmine.objectContaining({ - message: jasmine.stringMatching(/Pool is closed/) - }) - ) + await expect(pool.acquire(address)).rejects.toMatchObject({ + message: expect.stringMatching('Pool is closed') + }) }) it('purges keys other than the ones to keep', async () => { let counter = 0 @@ -561,9 +595,9 @@ describe('#unit Pool', async () => { await pool.acquire(address) await pool.acquire(address) - await expectAsync(pool.acquire(address)).toBeRejectedWith( - jasmine.stringMatching('acquisition timed out') - ) + await expect(pool.acquire(address)).rejects.toMatchObject({ + message: expect.stringMatching('acquisition timed out') + }) expectNumberOfAcquisitionRequests(pool, address, 0) }) @@ -607,11 +641,11 @@ describe('#unit Pool', async () => { // Let's fulfill the connect promise belonging to the first request. conns[0].resolve(conns[0]) - await expectAsync(req1).toBeResolved() + await expect(req1).resolves.toBeDefined() // Release the connection, it should be picked up by the second request. conns[0].release(address, conns[0]) - await expectAsync(req2).toBeResolved() + await expect(req2).resolves.toBeDefined() // Just to make sure that there hasn't been any new connection. expect(conns.length).toEqual(1)