From 2fb0e0e0f1507bdc0372000ed29d3d153837ab05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Fri, 17 Dec 2021 12:46:22 -0300 Subject: [PATCH] Fix connections not being destroyed when released to purged pool (#823) This scenario happens when the pull for a given address is purged while a connection stills in use and then another pool for the same address is created. In this case, the connection was being wrongly added to the existing pool. The correct behaviour is destroy this orphan connections. --- bolt-connection/src/pool/pool.js | 30 ++++++-- .../test/pool}/pool.test.js | 70 ++++++++++++++----- 2 files changed, 78 insertions(+), 22 deletions(-) rename {test/internal => bolt-connection/test/pool}/pool.test.js (93%) diff --git a/bolt-connection/src/pool/pool.js b/bolt-connection/src/pool/pool.js index dff3cd47a..3d4ce8a02 100644 --- a/bolt-connection/src/pool/pool.js +++ b/bolt-connection/src/pool/pool.js @@ -62,6 +62,7 @@ class Pool { this._pendingCreates = {} this._acquireRequests = {} this._activeResourceCounts = {} + this._poolState = {} this._release = this._release.bind(this) this._log = log this._closed = false @@ -189,10 +190,13 @@ class Pool { const key = address.asKey() let pool = this._pools[key] + let poolState = this._poolState[key] if (!pool) { pool = [] + poolState = new PoolState() this._pools[key] = pool this._pendingCreates[key] = 0 + this._poolState[key] = poolState } while (pool.length) { const resource = pool.pop() @@ -231,7 +235,7 @@ class Pool { let resource try { // Invoke callback that creates actual connection - resource = await this._create(address, this._release) + resource = await this._create(address, (address, resource) => this._release(poolState, address, resource)) resourceAcquired(key, this._activeResourceCounts) if (this._log.isDebugEnabled()) { @@ -243,11 +247,11 @@ class Pool { return resource } - async _release (address, resource) { + async _release (poolState, address, resource) { const key = address.asKey() const pool = this._pools[key] - if (pool) { + if (pool && poolState.isActive()) { // there exist idle connections for the given key if (!this._validate(resource)) { if (this._log.isDebugEnabled()) { @@ -295,6 +299,7 @@ class Pool { async _purgeKey (key) { const pool = this._pools[key] || [] + const poolState = this._poolState[key] || new PoolState() while (pool.length) { const resource = pool.pop() if (this._removeIdleObserver) { @@ -302,12 +307,15 @@ class Pool { } await this._destroy(resource) } + poolState.close() delete this._pools[key] + delete this._poolState[key] } _processPendingAcquireRequests (address) { const key = address.asKey() const requests = this._acquireRequests[key] + const poolState = this._poolState[key] if (requests) { const pendingRequest = requests.shift() // pop a pending acquire request @@ -326,7 +334,7 @@ class Pool { if (pendingRequest.isCompleted()) { // request has been completed, most likely failed by a timeout // return the acquired resource back to the pool - this._release(address, resource) + this._release(poolState, address, resource) } else { // request is still pending and can be resolved with the newly acquired resource pendingRequest.resolve(resource) // resolve the pending request with the acquired resource @@ -404,4 +412,18 @@ class PendingRequest { } } +class PoolState { + constructor() { + this._active = true; + } + + isActive() { + return this._active; + } + + close() { + this._active = false; + } +} + export default Pool diff --git a/test/internal/pool.test.js b/bolt-connection/test/pool/pool.test.js similarity index 93% rename from test/internal/pool.test.js rename to bolt-connection/test/pool/pool.test.js index 1b0c24015..d717ccbc4 100644 --- a/test/internal/pool.test.js +++ b/bolt-connection/test/pool/pool.test.js @@ -17,8 +17,8 @@ * limitations under the License. */ -import Pool from '../../bolt-connection/lib/pool/pool' -import PoolConfig from '../../bolt-connection/lib/pool/pool-config' +import Pool from '../../src/pool/pool' +import PoolConfig from '../../src/pool/pool-config' import { newError, error, internal } from 'neo4j-driver-core' const { @@ -27,7 +27,7 @@ const { const { SERVICE_UNAVAILABLE } = error -describe('#unit Pool', async () => { +describe('#unit Pool', () => { it('allocates if pool is empty', async () => { // Given let counter = 0 @@ -237,6 +237,44 @@ describe('#unit Pool', async () => { expect(r0.destroyed).toBeTruthy() }) + it('destroys resource when pool is purged even if a new pool is created for the same address', async () => { + let counter = 0 + const address = ServerAddress.fromUrl('bolt://localhost:7687') + const pool = new Pool({ + create: (server, release) => + Promise.resolve(new Resource(server, counter++, release)), + destroy: res => { + res.destroyed = true + return Promise.resolve() + } + }) + + // Acquire resource + const r0 = await pool.acquire(address) + expect(pool.has(address)).toBeTruthy() + expect(r0.id).toEqual(0) + + // Purging the key + await pool.purge(address) + expect(pool.has(address)).toBeFalsy() + expect(r0.destroyed).toBeFalsy() + + // Acquiring second resource should recreate the pool + const r1 = await pool.acquire(address) + expect(pool.has(address)).toBeTruthy() + expect(r1.id).toEqual(1) + + // Closing the first resource should destroy it + await r0.close() + expect(pool.has(address)).toBeTruthy() + expect(r0.destroyed).toBeTruthy() + + // Closing the second resource should not destroy it + await r1.close() + expect(pool.has(address)).toBeTruthy() + expect(r1.destroyed).toBeFalsy() + }) + it('close purges all keys', async () => { let counter = 0 @@ -282,11 +320,9 @@ describe('#unit Pool', async () => { // Close the pool await pool.close() - await expectAsync(pool.acquire(address)).toBeRejectedWith( - jasmine.objectContaining({ - message: jasmine.stringMatching(/Pool is closed/) - }) - ) + await expect(pool.acquire(address)).rejects.toMatchObject({ + message: expect.stringMatching('Pool is closed') + }) }) it('should fail to acquire when closed with idle connections', async () => { @@ -307,11 +343,9 @@ describe('#unit Pool', async () => { // Close the pool await pool.close() - await expectAsync(pool.acquire(address)).toBeRejectedWith( - jasmine.objectContaining({ - message: jasmine.stringMatching(/Pool is closed/) - }) - ) + await expect(pool.acquire(address)).rejects.toMatchObject({ + message: expect.stringMatching('Pool is closed') + }) }) it('purges keys other than the ones to keep', async () => { let counter = 0 @@ -561,9 +595,9 @@ describe('#unit Pool', async () => { await pool.acquire(address) await pool.acquire(address) - await expectAsync(pool.acquire(address)).toBeRejectedWith( - jasmine.stringMatching('acquisition timed out') - ) + await expect(pool.acquire(address)).rejects.toMatchObject({ + message: expect.stringMatching('acquisition timed out') + }) expectNumberOfAcquisitionRequests(pool, address, 0) }) @@ -607,11 +641,11 @@ describe('#unit Pool', async () => { // Let's fulfill the connect promise belonging to the first request. conns[0].resolve(conns[0]) - await expectAsync(req1).toBeResolved() + await expect(req1).resolves.toBeDefined() // Release the connection, it should be picked up by the second request. conns[0].release(address, conns[0]) - await expectAsync(req2).toBeResolved() + await expect(req2).resolves.toBeDefined() // Just to make sure that there hasn't been any new connection. expect(conns.length).toEqual(1)