Skip to content

Fix connections not being destroyed when released to purged pool #823

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions packages/bolt-connection/src/pool/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Pool {
this._pendingCreates = {}
this._acquireRequests = {}
this._activeResourceCounts = {}
this._poolState = {}
this._release = this._release.bind(this)
this._log = log
this._closed = false
Expand Down Expand Up @@ -189,10 +190,13 @@ class Pool {

const key = address.asKey()
let pool = this._pools[key]
let poolState = this._poolState[key]
if (!pool) {
pool = []
poolState = new PoolState()
this._pools[key] = pool
this._pendingCreates[key] = 0
this._poolState[key] = poolState
}
while (pool.length) {
const resource = pool.pop()
Expand Down Expand Up @@ -231,7 +235,7 @@ class Pool {
let resource
try {
// Invoke callback that creates actual connection
resource = await this._create(address, this._release)
resource = await this._create(address, (address, resource) => this._release(poolState, address, resource))

resourceAcquired(key, this._activeResourceCounts)
if (this._log.isDebugEnabled()) {
Expand All @@ -243,11 +247,11 @@ class Pool {
return resource
}

async _release (address, resource) {
async _release (poolState, address, resource) {
const key = address.asKey()
const pool = this._pools[key]

if (pool) {
if (pool && poolState.isActive()) {
// there exist idle connections for the given key
if (!this._validate(resource)) {
if (this._log.isDebugEnabled()) {
Expand Down Expand Up @@ -295,19 +299,23 @@ class Pool {

async _purgeKey (key) {
const pool = this._pools[key] || []
const poolState = this._poolState[key] || new PoolState()
while (pool.length) {
const resource = pool.pop()
if (this._removeIdleObserver) {
this._removeIdleObserver(resource)
}
await this._destroy(resource)
}
poolState.close()
delete this._pools[key]
delete this._poolState[key]
}

_processPendingAcquireRequests (address) {
const key = address.asKey()
const requests = this._acquireRequests[key]
const poolState = this._poolState[key]
if (requests) {
const pendingRequest = requests.shift() // pop a pending acquire request

Expand All @@ -326,7 +334,7 @@ class Pool {
if (pendingRequest.isCompleted()) {
// request has been completed, most likely failed by a timeout
// return the acquired resource back to the pool
this._release(address, resource)
this._release(poolState, address, resource)
} else {
// request is still pending and can be resolved with the newly acquired resource
pendingRequest.resolve(resource) // resolve the pending request with the acquired resource
Expand Down Expand Up @@ -404,4 +412,18 @@ class PendingRequest {
}
}

class PoolState {
constructor() {
this._active = true;
}

isActive() {
return this._active;
}

close() {
this._active = false;
}
}

export default Pool
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
* limitations under the License.
*/

import Pool from '../../../bolt-connection/lib/pool/pool'
import PoolConfig from '../../../bolt-connection/lib/pool/pool-config'
import Pool from '../../src/pool/pool'
import PoolConfig from '../../src/pool/pool-config'
import { newError, error, internal } from 'neo4j-driver-core'

const {
Expand All @@ -27,7 +27,7 @@ const {

const { SERVICE_UNAVAILABLE } = error

describe('#unit Pool', async () => {
describe('#unit Pool', () => {
it('allocates if pool is empty', async () => {
// Given
let counter = 0
Expand Down Expand Up @@ -237,6 +237,44 @@ describe('#unit Pool', async () => {
expect(r0.destroyed).toBeTruthy()
})

it('destroys resource when pool is purged even if a new pool is created for the same address', async () => {
let counter = 0
const address = ServerAddress.fromUrl('bolt://localhost:7687')
const pool = new Pool({
create: (server, release) =>
Promise.resolve(new Resource(server, counter++, release)),
destroy: res => {
res.destroyed = true
return Promise.resolve()
}
})

// Acquire resource
const r0 = await pool.acquire(address)
expect(pool.has(address)).toBeTruthy()
expect(r0.id).toEqual(0)

// Purging the key
await pool.purge(address)
expect(pool.has(address)).toBeFalsy()
expect(r0.destroyed).toBeFalsy()

// Acquiring second resource should recreate the pool
const r1 = await pool.acquire(address)
expect(pool.has(address)).toBeTruthy()
expect(r1.id).toEqual(1)

// Closing the first resource should destroy it
await r0.close()
expect(pool.has(address)).toBeTruthy()
expect(r0.destroyed).toBeTruthy()

// Closing the second resource should not destroy it
await r1.close()
expect(pool.has(address)).toBeTruthy()
expect(r1.destroyed).toBeFalsy()
})

it('close purges all keys', async () => {
let counter = 0

Expand Down Expand Up @@ -282,11 +320,9 @@ describe('#unit Pool', async () => {
// Close the pool
await pool.close()

await expectAsync(pool.acquire(address)).toBeRejectedWith(
jasmine.objectContaining({
message: jasmine.stringMatching(/Pool is closed/)
})
)
await expect(pool.acquire(address)).rejects.toMatchObject({
message: expect.stringMatching('Pool is closed')
})
})

it('should fail to acquire when closed with idle connections', async () => {
Expand All @@ -307,11 +343,9 @@ describe('#unit Pool', async () => {
// Close the pool
await pool.close()

await expectAsync(pool.acquire(address)).toBeRejectedWith(
jasmine.objectContaining({
message: jasmine.stringMatching(/Pool is closed/)
})
)
await expect(pool.acquire(address)).rejects.toMatchObject({
message: expect.stringMatching('Pool is closed')
})
})
it('purges keys other than the ones to keep', async () => {
let counter = 0
Expand Down Expand Up @@ -561,9 +595,9 @@ describe('#unit Pool', async () => {
await pool.acquire(address)
await pool.acquire(address)

await expectAsync(pool.acquire(address)).toBeRejectedWith(
jasmine.stringMatching('acquisition timed out')
)
await expect(pool.acquire(address)).rejects.toMatchObject({
message: expect.stringMatching('acquisition timed out')
})
expectNumberOfAcquisitionRequests(pool, address, 0)
})

Expand Down Expand Up @@ -607,11 +641,11 @@ describe('#unit Pool', async () => {

// Let's fulfill the connect promise belonging to the first request.
conns[0].resolve(conns[0])
await expectAsync(req1).toBeResolved()
await expect(req1).resolves.toBeDefined()

// Release the connection, it should be picked up by the second request.
conns[0].release(address, conns[0])
await expectAsync(req2).toBeResolved()
await expect(req2).resolves.toBeDefined()

// Just to make sure that there hasn't been any new connection.
expect(conns.length).toEqual(1)
Expand Down