From 096055937c63f901068f88e2f521a24c016a8309 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 17 Dec 2021 14:12:14 +0100 Subject: [PATCH 1/4] Moving pool tests to --- .../test/pool}/pool.test.js | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) rename packages/{neo4j-driver/test/internal => bolt-connection/test/pool}/pool.test.js (97%) diff --git a/packages/neo4j-driver/test/internal/pool.test.js b/packages/bolt-connection/test/pool/pool.test.js similarity index 97% rename from packages/neo4j-driver/test/internal/pool.test.js rename to packages/bolt-connection/test/pool/pool.test.js index ba538474a..7e1b5c0df 100644 --- a/packages/neo4j-driver/test/internal/pool.test.js +++ b/packages/bolt-connection/test/pool/pool.test.js @@ -17,8 +17,8 @@ * limitations under the License. */ -import Pool from '../../../bolt-connection/lib/pool/pool' -import PoolConfig from '../../../bolt-connection/lib/pool/pool-config' +import Pool from '../../src/pool/pool' +import PoolConfig from '../../src/pool/pool-config' import { newError, error, internal } from 'neo4j-driver-core' const { @@ -27,7 +27,7 @@ const { const { SERVICE_UNAVAILABLE } = error -describe('#unit Pool', async () => { +describe('#unit Pool', () => { it('allocates if pool is empty', async () => { // Given let counter = 0 @@ -282,11 +282,9 @@ describe('#unit Pool', async () => { // Close the pool await pool.close() - await expectAsync(pool.acquire(address)).toBeRejectedWith( - jasmine.objectContaining({ - message: jasmine.stringMatching(/Pool is closed/) - }) - ) + await expect(pool.acquire(address)).rejects.toMatchObject({ + message: expect.stringMatching('Pool is closed') + }) }) it('should fail to acquire when closed with idle connections', async () => { @@ -307,11 +305,9 @@ describe('#unit Pool', async () => { // Close the pool await pool.close() - await expectAsync(pool.acquire(address)).toBeRejectedWith( - jasmine.objectContaining({ - message: jasmine.stringMatching(/Pool is closed/) - }) - ) + await expect(pool.acquire(address)).rejects.toMatchObject({ + message: expect.stringMatching('Pool is closed') + }) }) it('purges keys other than the ones to keep', async () => { let counter = 0 @@ -561,9 +557,9 @@ describe('#unit Pool', async () => { await pool.acquire(address) await pool.acquire(address) - await expectAsync(pool.acquire(address)).toBeRejectedWith( - jasmine.stringMatching('acquisition timed out') - ) + await expect(pool.acquire(address)).rejects.toMatchObject({ + message: expect.stringMatching('acquisition timed out') + }) expectNumberOfAcquisitionRequests(pool, address, 0) }) @@ -607,11 +603,11 @@ describe('#unit Pool', async () => { // Let's fulfill the connect promise belonging to the first request. conns[0].resolve(conns[0]) - await expectAsync(req1).toBeResolved() + await expect(req1).resolves.toBeDefined() // Release the connection, it should be picked up by the second request. conns[0].release(address, conns[0]) - await expectAsync(req2).toBeResolved() + await expect(req2).resolves.toBeDefined() // Just to make sure that there hasn't been any new connection. expect(conns.length).toEqual(1) From db15b1e61f25b2df25687234b7a67080c74db0fa Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 17 Dec 2021 14:44:09 +0100 Subject: [PATCH 2/4] Fix connections not being destroyed when released to purged pool This scenario happens when the pull for a given address is purged while a connection stills in use and then another pool for the same address is created. In this case, the connection was being wrongly added to the existing pool. The correct behaviour is destroy this orphan connections. --- packages/bolt-connection/src/pool/pool.js | 30 +++++++++++++-- .../bolt-connection/test/pool/pool.test.js | 38 +++++++++++++++++++ 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index dff3cd47a..3d4ce8a02 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -62,6 +62,7 @@ class Pool { this._pendingCreates = {} this._acquireRequests = {} this._activeResourceCounts = {} + this._poolState = {} this._release = this._release.bind(this) this._log = log this._closed = false @@ -189,10 +190,13 @@ class Pool { const key = address.asKey() let pool = this._pools[key] + let poolState = this._poolState[key] if (!pool) { pool = [] + poolState = new PoolState() this._pools[key] = pool this._pendingCreates[key] = 0 + this._poolState[key] = poolState } while (pool.length) { const resource = pool.pop() @@ -231,7 +235,7 @@ class Pool { let resource try { // Invoke callback that creates actual connection - resource = await this._create(address, this._release) + resource = await this._create(address, (address, resource) => this._release(poolState, address, resource)) resourceAcquired(key, this._activeResourceCounts) if (this._log.isDebugEnabled()) { @@ -243,11 +247,11 @@ class Pool { return resource } - async _release (address, resource) { + async _release (poolState, address, resource) { const key = address.asKey() const pool = this._pools[key] - if (pool) { + if (pool && poolState.isActive()) { // there exist idle connections for the given key if (!this._validate(resource)) { if (this._log.isDebugEnabled()) { @@ -295,6 +299,7 @@ class Pool { async _purgeKey (key) { const pool = this._pools[key] || [] + const poolState = this._poolState[key] || new PoolState() while (pool.length) { const resource = pool.pop() if (this._removeIdleObserver) { @@ -302,12 +307,15 @@ class Pool { } await this._destroy(resource) } + poolState.close() delete this._pools[key] + delete this._poolState[key] } _processPendingAcquireRequests (address) { const key = address.asKey() const requests = this._acquireRequests[key] + const poolState = this._poolState[key] if (requests) { const pendingRequest = requests.shift() // pop a pending acquire request @@ -326,7 +334,7 @@ class Pool { if (pendingRequest.isCompleted()) { // request has been completed, most likely failed by a timeout // return the acquired resource back to the pool - this._release(address, resource) + this._release(poolState, address, resource) } else { // request is still pending and can be resolved with the newly acquired resource pendingRequest.resolve(resource) // resolve the pending request with the acquired resource @@ -404,4 +412,18 @@ class PendingRequest { } } +class PoolState { + constructor() { + this._active = true; + } + + isActive() { + return this._active; + } + + close() { + this._active = false; + } +} + export default Pool diff --git a/packages/bolt-connection/test/pool/pool.test.js b/packages/bolt-connection/test/pool/pool.test.js index 7e1b5c0df..bab8df747 100644 --- a/packages/bolt-connection/test/pool/pool.test.js +++ b/packages/bolt-connection/test/pool/pool.test.js @@ -237,6 +237,44 @@ describe('#unit Pool', () => { expect(r0.destroyed).toBeTruthy() }) + it('destroys resource when key was purged event when key is acquired again', async () => { + let counter = 0 + const address = ServerAddress.fromUrl('bolt://localhost:7687') + const pool = new Pool({ + create: (server, release) => + Promise.resolve(new Resource(server, counter++, release)), + destroy: res => { + res.destroyed = true + return Promise.resolve() + } + }) + + // Acquire resource + const r0 = await pool.acquire(address) + expect(pool.has(address)).toBeTruthy() + expect(r0.id).toEqual(0) + + // Purging the key + await pool.purge(address) + expect(pool.has(address)).toBeFalsy() + expect(r0.destroyed).toBeFalsy() + + // Acquiring second resolve should recreate the pool + const r1 = await pool.acquire(address) + expect(pool.has(address)).toBeTruthy() + expect(r1.id).toEqual(1) + + // Closing the first resource should destroy it + await r0.close() + expect(pool.has(address)).toBeTruthy() + expect(r0.destroyed).toBeTruthy() + + // Closing the second resource should not destroy it + await r1.close() + expect(pool.has(address)).toBeTruthy() + expect(r1.destroyed).toBeFalsy() + }) + it('close purges all keys', async () => { let counter = 0 From ca242455890841271cb5261a5aeb9dae1424191c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Fri, 17 Dec 2021 11:13:40 -0300 Subject: [PATCH 3/4] Update packages/bolt-connection/test/pool/pool.test.js Co-authored-by: Robsdedude --- packages/bolt-connection/test/pool/pool.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/bolt-connection/test/pool/pool.test.js b/packages/bolt-connection/test/pool/pool.test.js index bab8df747..de4f3c34e 100644 --- a/packages/bolt-connection/test/pool/pool.test.js +++ b/packages/bolt-connection/test/pool/pool.test.js @@ -259,7 +259,7 @@ describe('#unit Pool', () => { expect(pool.has(address)).toBeFalsy() expect(r0.destroyed).toBeFalsy() - // Acquiring second resolve should recreate the pool + // Acquiring second resource should recreate the pool const r1 = await pool.acquire(address) expect(pool.has(address)).toBeTruthy() expect(r1.id).toEqual(1) From d1a18a1571190d1754be20e1f961c6ad0ae98a3e Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 17 Dec 2021 15:16:35 +0100 Subject: [PATCH 4/4] Rephasing test --- packages/bolt-connection/test/pool/pool.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/bolt-connection/test/pool/pool.test.js b/packages/bolt-connection/test/pool/pool.test.js index de4f3c34e..d717ccbc4 100644 --- a/packages/bolt-connection/test/pool/pool.test.js +++ b/packages/bolt-connection/test/pool/pool.test.js @@ -237,7 +237,7 @@ describe('#unit Pool', () => { expect(r0.destroyed).toBeTruthy() }) - it('destroys resource when key was purged event when key is acquired again', async () => { + it('destroys resource when pool is purged even if a new pool is created for the same address', async () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') const pool = new Pool({