diff --git a/src/v1/driver.js b/src/v1/driver.js index 25c01053d..3f18cb134 100644 --- a/src/v1/driver.js +++ b/src/v1/driver.js @@ -80,13 +80,15 @@ class Driver { this._authToken = authToken this._config = config this._log = Logger.create(config) - this._pool = new Pool( - this._createConnection.bind(this), - this._destroyConnection.bind(this), - this._validateConnection.bind(this), - PoolConfig.fromDriverConfig(config), - this._log - ) + this._pool = new Pool({ + create: this._createConnection.bind(this), + destroy: this._destroyConnection.bind(this), + validate: this._validateConnection.bind(this), + installIdleObserver: this._installIdleObserverOnConnection.bind(this), + removeIdleObserver: this._removeIdleObserverOnConnection.bind(this), + config: PoolConfig.fromDriverConfig(config), + log: this._log + }) /** * Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}. @@ -177,6 +179,14 @@ class Driver { return lifetime <= maxConnectionLifetime } + _installIdleObserverOnConnection (conn, observer) { + conn._queueObserver(observer) + } + + _removeIdleObserverOnConnection (conn) { + conn._updateCurrentObserver() + } + /** * Dispose of a connection. * @return {Connection} the connection to dispose. diff --git a/src/v1/internal/pool.js b/src/v1/internal/pool.js index 53bf5edbf..3c1733756 100644 --- a/src/v1/internal/pool.js +++ b/src/v1/internal/pool.js @@ -31,19 +31,25 @@ class Pool { * @param {function} validate called at various times (like when an instance is acquired and * when it is returned). If this returns false, the resource will * be evicted + * @param {function} installIdleObserver called when the resource is released back to pool + * @param {function} removeIdleObserver called when the resource is acquired from the pool * @param {PoolConfig} config configuration for the new driver. * @param {Logger} log the driver logger. */ - constructor ( - create, - destroy = () => true, - validate = () => true, + constructor ({ + create = (address, release) => {}, + destroy = conn => true, + validate = conn => true, + installIdleObserver = (conn, observer) => {}, + removeIdleObserver = conn => {}, config = PoolConfig.defaultConfig(), log = Logger.noOp() - ) { + } = {}) { this._create = create this._destroy = destroy this._validate = validate + this._installIdleObserver = installIdleObserver + this._removeIdleObserver = removeIdleObserver this._maxSize = config.maxSize this._acquisitionTimeout = config.acquisitionTimeout this._pools = {} @@ -165,6 +171,10 @@ class Pool { const resource = pool.pop() if (this._validate(resource)) { + if (this._removeIdleObserver) { + this._removeIdleObserver(resource) + } + // idle resource is valid and can be acquired return Promise.resolve(resource) } else { @@ -197,6 +207,14 @@ class Pool { if (this._log.isDebugEnabled()) { this._log.debug(`${resource} released to the pool ${key}`) } + if (this._installIdleObserver) { + this._installIdleObserver(resource, { + onError: () => { + this._pools[key] = this._pools[key].filter(r => r !== resource) + this._destroy(resource) + } + }) + } pool.push(resource) } } else { diff --git a/test/internal/connection-providers.test.js b/test/internal/connection-providers.test.js index aff407aa0..ac28c9571 100644 --- a/test/internal/connection-providers.test.js +++ b/test/internal/connection-providers.test.js @@ -1210,9 +1210,10 @@ function setupLoadBalancerToRememberRouters (loadBalancer, routersArray) { } function newPool () { - return new Pool((address, release) => - Promise.resolve(new FakeConnection(address, release)) - ) + return new Pool({ + create: (address, release) => + Promise.resolve(new FakeConnection(address, release)) + }) } function expectRoutingTable (loadBalancer, routers, readers, writers) { diff --git a/test/internal/least-connected-load-balancing-strategy.test.js b/test/internal/least-connected-load-balancing-strategy.test.js index d58b71bb8..dba4f8ba2 100644 --- a/test/internal/least-connected-load-balancing-strategy.test.js +++ b/test/internal/least-connected-load-balancing-strategy.test.js @@ -140,7 +140,7 @@ describe('LeastConnectedLoadBalancingStrategy', () => { class DummyPool extends Pool { constructor (activeConnections) { - super(() => 42) + super({ create: () => 42 }) this._activeConnections = activeConnections } diff --git a/test/internal/node/direct.driver.boltkit.test.js b/test/internal/node/direct.driver.boltkit.test.js index 8afb2a85d..c4401e53d 100644 --- a/test/internal/node/direct.driver.boltkit.test.js +++ b/test/internal/node/direct.driver.boltkit.test.js @@ -20,7 +20,7 @@ import neo4j from '../../../src/v1' import { READ, WRITE } from '../../../src/v1/driver' import boltStub from '../bolt-stub' -import { SERVICE_UNAVAILABLE } from '../../../src/v1/error' +import { newError, SERVICE_UNAVAILABLE } from '../../../src/v1/error' describe('direct driver with stub server', () => { let originalTimeout @@ -423,6 +423,58 @@ describe('direct driver with stub server', () => { }) }) + it('should close connection if it dies sitting idle in connection pool', done => { + if (!boltStub.supported) { + done() + return + } + + const server = boltStub.start( + './test/resources/boltstub/read_server_v3_read.script', + 9001 + ) + + boltStub.run(() => { + const driver = boltStub.newDriver('bolt://127.0.0.1:9001') + const session = driver.session(READ) + + session + .run('MATCH (n) RETURN n.name') + .then(result => { + const records = result.records + expect(records.length).toEqual(3) + expect(records[0].get(0)).toBe('Bob') + expect(records[1].get(0)).toBe('Alice') + expect(records[2].get(0)).toBe('Tina') + + const connectionKey = Object.keys(driver._openConnections)[0] + expect(connectionKey).toBeTruthy() + + const connection = driver._openConnections[connectionKey] + session.close(() => { + // generate a fake fatal error + connection._handleFatalError( + newError('connection reset', SERVICE_UNAVAILABLE) + ) + + // expect that the connection to be removed from the pool + expect(driver._pool._pools['127.0.0.1:9001'].length).toEqual(0) + expect( + driver._pool._activeResourceCounts['127.0.0.1:9001'] + ).toBeFalsy() + // expect that the connection to be unregistered from the open connections registry + expect(driver._openConnections[connectionKey]).toBeFalsy() + driver.close() + server.exit(code => { + expect(code).toEqual(0) + done() + }) + }) + }) + .catch(error => done.fail(error)) + }) + }) + describe('should fail if commit fails due to broken connection', () => { it('v1', done => { verifyFailureOnConnectionFailureWhenExplicitTransactionIsCommitted( diff --git a/test/internal/pool.test.js b/test/internal/pool.test.js index 96d59c1c8..ffd947442 100644 --- a/test/internal/pool.test.js +++ b/test/internal/pool.test.js @@ -20,15 +20,17 @@ import Pool from '../../src/v1/internal/pool' import PoolConfig from '../../src/v1/internal/pool-config' import ServerAddress from '../../src/v1/internal/server-address' +import { newError, SERVICE_UNAVAILABLE } from '../../src/v1/error' describe('Pool', () => { it('allocates if pool is empty', done => { // Given let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool((server, release) => - Promise.resolve(new Resource(server, counter++, release)) - ) + const pool = new Pool({ + create: (server, release) => + Promise.resolve(new Resource(server, counter++, release)) + }) // When const p0 = pool.acquire(address) @@ -51,9 +53,10 @@ describe('Pool', () => { // Given a pool that allocates let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool((server, release) => - Promise.resolve(new Resource(server, counter++, release)) - ) + const pool = new Pool({ + create: (server, release) => + Promise.resolve(new Resource(server, counter++, release)) + }) // When const p0 = pool.acquire(address).then(r0 => { @@ -80,9 +83,10 @@ describe('Pool', () => { let counter = 0 const address1 = ServerAddress.fromUrl('bolt://localhost:7687') const address2 = ServerAddress.fromUrl('bolt://localhost:7688') - const pool = new Pool((server, release) => - Promise.resolve(new Resource(server, counter++, release)) - ) + const pool = new Pool({ + create: (server, release) => + Promise.resolve(new Resource(server, counter++, release)) + }) // When const p0 = pool.acquire(address1) @@ -115,15 +119,15 @@ describe('Pool', () => { let counter = 0 let destroyed = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - resource => { + destroy: resource => { destroyed.push(resource) }, - resource => false, - new PoolConfig(1000, 60000) - ) + validate: resource => false, + config: new PoolConfig(1000, 60000) + }) // When const p0 = pool.acquire(address) @@ -150,14 +154,14 @@ describe('Pool', () => { let counter = 0 const address1 = ServerAddress.fromUrl('bolt://localhost:7687') const address2 = ServerAddress.fromUrl('bolt://localhost:7688') - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - res => { + destroy: res => { res.destroyed = true return true } - ) + }) // When const p0 = pool.acquire(address1) @@ -199,14 +203,14 @@ describe('Pool', () => { let counter = 0 const address1 = ServerAddress.fromUrl('bolt://localhost:7687') const address2 = ServerAddress.fromUrl('bolt://localhost:7688') - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - res => { + destroy: res => { res.destroyed = true return true } - ) + }) // When const p00 = pool.acquire(address1) @@ -247,14 +251,14 @@ describe('Pool', () => { it('destroys resource when key was purged', done => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - res => { + destroy: res => { res.destroyed = true return true } - ) + }) const p0 = pool.acquire(address) p0.then(r0 => { @@ -280,14 +284,14 @@ describe('Pool', () => { const address2 = ServerAddress.fromUrl('bolt://localhost:7688') const address3 = ServerAddress.fromUrl('bolt://localhost:7689') - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - res => { + destroy: res => { res.destroyed = true return true } - ) + }) const acquiredResources = [ pool.acquire(address1), @@ -316,14 +320,14 @@ describe('Pool', () => { const address2 = ServerAddress.fromUrl('bolt://localhost:7688') const address3 = ServerAddress.fromUrl('bolt://localhost:7689') - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - res => { + destroy: res => { res.destroyed = true return true } - ) + }) const acquiredResources = [ pool.acquire(address1), @@ -356,14 +360,14 @@ describe('Pool', () => { const address2 = ServerAddress.fromUrl('bolt://localhost:7688') const address3 = ServerAddress.fromUrl('bolt://localhost:7689') - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - res => { + destroy: res => { res.destroyed = true return true } - ) + }) const acquiredResources = [ pool.acquire(address1), @@ -393,21 +397,21 @@ describe('Pool', () => { let validated = false let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - res => { + destroy: res => { res.destroyed = true return true }, - () => { + validate: () => { if (validated) { return false } validated = true return true } - ) + }) const p0 = pool.acquire(address) const p1 = p0.then(r0 => { @@ -430,9 +434,10 @@ describe('Pool', () => { const existingAddress = ServerAddress.fromUrl('bolt://localhost:7687') const absentAddress = ServerAddress.fromUrl('bolt://localhost:7688') - const pool = new Pool((server, release) => - Promise.resolve(new Resource(server, 42, release)) - ) + const pool = new Pool({ + create: (server, release) => + Promise.resolve(new Resource(server, 42, release)) + }) const p0 = pool.acquire(existingAddress) const p1 = pool.acquire(existingAddress) @@ -446,9 +451,10 @@ describe('Pool', () => { }) it('reports zero active resources when empty', () => { - const pool = new Pool((server, release) => - Promise.resolve(new Resource(server, 42, release)) - ) + const pool = new Pool({ + create: (server, release) => + Promise.resolve(new Resource(server, 42, release)) + }) expect( pool.activeResourceCount(ServerAddress.fromUrl('bolt://localhost:1')) @@ -463,9 +469,10 @@ describe('Pool', () => { it('reports active resources', done => { const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool((server, release) => - Promise.resolve(new Resource(server, 42, release)) - ) + const pool = new Pool({ + create: (server, release) => + Promise.resolve(new Resource(server, 42, release)) + }) const p0 = pool.acquire(address) const p1 = pool.acquire(address) @@ -482,9 +489,10 @@ describe('Pool', () => { it('reports active resources when they are acquired', done => { const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool((server, release) => - Promise.resolve(new Resource(server, 42, release)) - ) + const pool = new Pool({ + create: (server, release) => + Promise.resolve(new Resource(server, 42, release)) + }) // three new resources are created and returned to the pool const p0 = pool.acquire(address) @@ -517,9 +525,10 @@ describe('Pool', () => { it('does not report resources that are returned to the pool', done => { const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool((server, release) => - Promise.resolve(new Resource(server, 42, release)) - ) + const pool = new Pool({ + create: (server, release) => + Promise.resolve(new Resource(server, 42, release)) + }) const p0 = pool.acquire(address) const p1 = pool.acquire(address) @@ -559,13 +568,13 @@ describe('Pool', () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - resource => {}, - resource => true, - new PoolConfig(2, 5000) - ) + destroy: resource => {}, + validate: resource => true, + config: new PoolConfig(2, 5000) + }) const p0 = pool.acquire(address) const p1 = pool.acquire(address) @@ -593,13 +602,13 @@ describe('Pool', () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - resource => {}, - resource => true, - new PoolConfig(2, 1000) - ) + destroy: resource => {}, + validate: resource => true, + config: new PoolConfig(2, 1000) + }) const p0 = pool.acquire(address) const p1 = pool.acquire(address) @@ -622,12 +631,12 @@ describe('Pool', () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - resource => {}, - resource => true - ) + destroy: resource => {}, + validate: resource => true + }) const p0 = pool.acquire(address) const p1 = pool.acquire(address) @@ -651,13 +660,13 @@ describe('Pool', () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - resource => {}, - () => true, - new PoolConfig(2, acquisitionTimeout) - ) + destroy: resource => {}, + validate: () => true, + config: new PoolConfig(2, acquisitionTimeout) + }) pool.acquire(address).then(resource1 => { expect(resource1.id).toEqual(0) @@ -696,13 +705,13 @@ describe('Pool', () => { const acquisitionTimeout = 1000 let counter = 0 - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - resource => {}, - resourceValidOnlyOnceValidationFunction, - new PoolConfig(1, acquisitionTimeout) - ) + destroy: resource => {}, + validate: resourceValidOnlyOnceValidationFunction, + config: new PoolConfig(1, acquisitionTimeout) + }) pool.acquire(address).then(resource1 => { expect(resource1.id).toEqual(0) @@ -733,13 +742,13 @@ describe('Pool', () => { const acquisitionTimeout = 1000 let counter = 0 - const pool = new Pool( - (server, release) => + const pool = new Pool({ + create: (server, release) => Promise.resolve(new Resource(server, counter++, release)), - resource => {}, - resourceValidOnlyOnceValidationFunction, - new PoolConfig(2, acquisitionTimeout) - ) + destroy: resource => {}, + validate: resourceValidOnlyOnceValidationFunction, + config: new PoolConfig(2, acquisitionTimeout) + }) pool.acquire(address).then(resource1 => { expect(resource1.id).toEqual(0) @@ -770,6 +779,93 @@ describe('Pool', () => { }) }) }) + + it('should set-up idle observer on acquire and release', done => { + const address = ServerAddress.fromUrl('bolt://localhost:7687') + let resourceCount = 0 + let installIdleObserverCount = 0 + let removeIdleObserverCount = 0 + + const pool = new Pool({ + create: (server, release) => + Promise.resolve(new Resource(server, resourceCount++, release)), + destroy: resource => {}, + validate: resource => true, + installIdleObserver: (resource, observer) => { + installIdleObserverCount++ + }, + removeIdleObserver: resource => { + removeIdleObserverCount++ + } + }) + + const p01 = pool.acquire(address) + const p02 = pool.acquire(address) + const p03 = pool.acquire(address) + Promise.all([p01, p02, p03]).then(resources => { + resources[0].close() + resources[1].close() + resources[2].close() + + expect(installIdleObserverCount).toEqual(3) + expect(removeIdleObserverCount).toEqual(0) + + const p11 = pool.acquire(address) + const p12 = pool.acquire(address) + const p13 = pool.acquire(address) + + Promise.all([p11, p12, p13]).then(resources => { + expect(installIdleObserverCount).toEqual(3) + expect(removeIdleObserverCount).toEqual(3) + + done() + }) + }) + }) + + it('should clean-up resource when connection fails while idle', done => { + const address = ServerAddress.fromUrl('bolt://localhost:7687') + let resourceCount = 0 + + const pool = new Pool({ + create: (server, release) => + Promise.resolve(new Resource(server, resourceCount++, release)), + destroy: resource => {}, + validate: resource => true, + installIdleObserver: (resource, observer) => { + resource['observer'] = observer + }, + removeIdleObserver: resource => { + delete resource['observer'] + } + }) + + pool.acquire(address).then(resource1 => { + pool.acquire(address).then(resource2 => { + expect(pool.activeResourceCount(address)).toBe(2) + + resource1.close() + expect(pool.activeResourceCount(address)).toBe(1) + + resource2.close() + expect(pool.activeResourceCount(address)).toBe(0) + + expect(pool.has(address)).toBeTruthy() + + resource1['observer'].onError( + newError('connection reset', SERVICE_UNAVAILABLE) + ) + resource2['observer'].onError( + newError('connection reset', SERVICE_UNAVAILABLE) + ) + + expect(pool.activeResourceCount(address)).toBe(0) + expectNoIdleResources(pool, address) + + done() + }) + }) + }) }) function expectNoPendingAcquisitionRequests (pool) { @@ -782,6 +878,12 @@ function expectNoPendingAcquisitionRequests (pool) { }) } +function expectNoIdleResources (pool, address) { + if (pool.has(address)) { + expect(pool._pools[address.asKey()].length).toBe(0) + } +} + function expectNumberOfAcquisitionRequests (pool, address, expectedNumber) { expect(pool._acquireRequests[address.asKey()].length).toEqual(expectedNumber) }