diff --git a/packages/bolt-connection/src/channel/channel-config.js b/packages/bolt-connection/src/channel/channel-config.js index 2c6417fc0..85df9b22c 100644 --- a/packages/bolt-connection/src/channel/channel-config.js +++ b/packages/bolt-connection/src/channel/channel-config.js @@ -25,8 +25,6 @@ const { const { SERVICE_UNAVAILABLE } = error -const DEFAULT_CONNECTION_TIMEOUT_MILLIS = 30000 // 30 seconds by default - const ALLOWED_VALUES_ENCRYPTED = [ null, undefined, @@ -58,7 +56,7 @@ export default class ChannelConfig { this.trustedCertificates = extractTrustedCertificates(driverConfig) this.knownHostsPath = extractKnownHostsPath(driverConfig) this.connectionErrorCode = connectionErrorCode || SERVICE_UNAVAILABLE - this.connectionTimeout = extractConnectionTimeout(driverConfig) + this.connectionTimeout = driverConfig.connectionTimeout } } @@ -90,19 +88,3 @@ function extractKnownHostsPath (driverConfig) { return driverConfig.knownHosts || null } -function extractConnectionTimeout (driverConfig) { - const configuredTimeout = parseInt(driverConfig.connectionTimeout, 10) - if (configuredTimeout === 0) { - // timeout explicitly configured to 0 - return null - } else if (configuredTimeout && configuredTimeout < 0) { - // timeout explicitly configured to a negative value - return null - } else if (!configuredTimeout) { - // timeout not configured, use default value - return DEFAULT_CONNECTION_TIMEOUT_MILLIS - } else { - // timeout configured, use the provided value - return configuredTimeout - } -} diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/bolt-connection/src/pool/pool.js index 3d4ce8a02..14b38400a 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/bolt-connection/src/pool/pool.js @@ -74,51 +74,45 @@ class Pool { * @return {Object} resource that is ready to use. */ acquire (address) { - return this._acquire(address).then(resource => { - const key = address.asKey() + const key = address.asKey() - if (resource) { - // New or existing resource acquired - return resource - } + // We're out of resources and will try to acquire later on when an existing resource is released. + const allRequests = this._acquireRequests + const requests = allRequests[key] + if (!requests) { + allRequests[key] = [] + } + return new Promise((resolve, reject) => { + let request - // We're out of resources and will try to acquire later on when an existing resource is released. - const allRequests = this._acquireRequests - const requests = allRequests[key] - if (!requests) { - allRequests[key] = [] - } + const timeoutId = setTimeout(() => { + // acquisition timeout fired - return new Promise((resolve, reject) => { - let request - - const timeoutId = setTimeout(() => { - // acquisition timeout fired - - // remove request from the queue of pending requests, if it's still there - // request might've been taken out by the release operation - const pendingRequests = allRequests[key] - if (pendingRequests) { - allRequests[key] = pendingRequests.filter(item => item !== request) - } - - if (request.isCompleted()) { - // request already resolved/rejected by the release operation; nothing to do - } else { - // request is still pending and needs to be failed - const activeCount = this.activeResourceCount(address) - const idleCount = this.has(address) ? this._pools[key].length : 0 - request.reject( - newError( - `Connection acquisition timed out in ${this._acquisitionTimeout} ms. Pool status: Active conn count = ${activeCount}, Idle conn count = ${idleCount}.` - ) + // remove request from the queue of pending requests, if it's still there + // request might've been taken out by the release operation + const pendingRequests = allRequests[key] + if (pendingRequests) { + allRequests[key] = pendingRequests.filter(item => item !== request) + } + + if (request.isCompleted()) { + // request already resolved/rejected by the release operation; nothing to do + } else { + // request is still pending and needs to be failed + const activeCount = this.activeResourceCount(address) + const idleCount = this.has(address) ? this._pools[key].length : 0 + request.reject( + newError( + `Connection acquisition timed out in ${this._acquisitionTimeout} ms. Pool status: Active conn count = ${activeCount}, Idle conn count = ${idleCount}.` ) - } - }, this._acquisitionTimeout) + ) + } + }, this._acquisitionTimeout) - request = new PendingRequest(key, resolve, reject, timeoutId, this._log) - allRequests[key].push(request) - }) + request = new PendingRequest(key, resolve, reject, timeoutId, this._log) + allRequests[key].push(request) + + this._processPendingAcquireRequests(address) }) } @@ -315,7 +309,7 @@ class Pool { _processPendingAcquireRequests (address) { const key = address.asKey() const requests = this._acquireRequests[key] - const poolState = this._poolState[key] + const poolState = this._poolState[key] || new PoolState() if (requests) { const pendingRequest = requests.shift() // pop a pending acquire request @@ -339,6 +333,15 @@ class Pool { // request is still pending and can be resolved with the newly acquired resource pendingRequest.resolve(resource) // resolve the pending request with the acquired resource } + } else { + // failed to acquire a valid resource from the pool + // return the pending request back to the pool + if (!pendingRequest.isCompleted()) { + if (!this._acquireRequests[key]) { + this._acquireRequests[key] = [] + } + this._acquireRequests[key].unshift(pendingRequest) + } } }) } else { diff --git a/packages/bolt-connection/test/channel/node/node-channel.test.js b/packages/bolt-connection/test/channel/node/node-channel.test.js index 5d6b175b1..973965af9 100644 --- a/packages/bolt-connection/test/channel/node/node-channel.test.js +++ b/packages/bolt-connection/test/channel/node/node-channel.test.js @@ -110,7 +110,7 @@ describe('NodeChannel', () => { }) }) -function createMockedChannel (connected, config = {}) { +function createMockedChannel (connected, config = { connectionTimeout: 30000 }) { let endCallback = null const address = ServerAddress.fromUrl('bolt://localhost:9999') const channelConfig = new ChannelConfig(address, config, SERVICE_UNAVAILABLE) diff --git a/packages/bolt-connection/test/pool/pool.test.js b/packages/bolt-connection/test/pool/pool.test.js index d717ccbc4..90b627ee7 100644 --- a/packages/bolt-connection/test/pool/pool.test.js +++ b/packages/bolt-connection/test/pool/pool.test.js @@ -877,6 +877,40 @@ describe('#unit Pool', () => { expect(resource1.observer).toBeFalsy() expect(resource2.observer).toBeFalsy() }) + + it('should thrown aquisition timeout exception if resource takes longer to be created', async () => { + const address = ServerAddress.fromUrl('bolt://localhost:7687') + const acquisitionTimeout = 1000 + let counter = 0 + + const pool = new Pool({ + create: (server, release) => + new Promise(resolve => setTimeout( + () => resolve(new Resource(server, counter++, release)) + , acquisitionTimeout + 10)), + destroy: res => Promise.resolve(), + validate: resourceValidOnlyOnceValidationFunction, + config: new PoolConfig(1, acquisitionTimeout) + }) + + try { + await pool.acquire(address) + fail('should have thrown') + } catch (e) { + expect(e).toEqual( + newError( + `Connection acquisition timed out in ${acquisitionTimeout} ms. ` + + 'Pool status: Active conn count = 0, Idle conn count = 0.' + ) + ) + + const numberOfIdleResourceAfterResourceGetCreated = await new Promise(resolve => + setTimeout(() => resolve(idleResources(pool, address)), 11)) + + expect(numberOfIdleResourceAfterResourceGetCreated).toEqual(1) + expect(counter).toEqual(1) + } + }) }) function expectNoPendingAcquisitionRequests (pool) { @@ -895,6 +929,13 @@ function expectNoIdleResources (pool, address) { } } +function idleResources (pool, address) { + if (pool.has(address)) { + return pool._pools[address.asKey()].length + } + return undefined +} + function expectNumberOfAcquisitionRequests (pool, address, expectedNumber) { expect(pool._acquireRequests[address.asKey()].length).toEqual(expectedNumber) } diff --git a/packages/core/src/driver.ts b/packages/core/src/driver.ts index 6d37f6941..fc275630d 100644 --- a/packages/core/src/driver.ts +++ b/packages/core/src/driver.ts @@ -26,6 +26,7 @@ import { ACCESS_MODE_READ, ACCESS_MODE_WRITE, FETCH_ALL, + DEFAULT_CONNECTION_TIMEOUT_MILLIS, DEFAULT_POOL_ACQUISITION_TIMEOUT, DEFAULT_POOL_MAX_SIZE } from './internal/constants' @@ -131,12 +132,15 @@ class Driver { createSession: CreateSession = args => new Session(args) ) { sanitizeConfig(config) - validateConfig(config) + + const log = Logger.create(config) + + validateConfig(config, log) this._id = idGenerator++ this._meta = meta this._config = config - this._log = Logger.create(config) + this._log = log; this._createConnectionProvider = createConnectonProvider this._createSession = createSession @@ -366,13 +370,22 @@ class Driver { * @private * @returns {Object} the given config. */ -function validateConfig(config: any): any { +function validateConfig(config: any, log: Logger): any { const resolver = config.resolver if (resolver && typeof resolver !== 'function') { throw new TypeError( `Configured resolver should be a function. Got: ${resolver}` ) } + + if (config.connectionAcquisitionTimeout < config.connectionTimeout) { + log.warn( + 'Configuration for "connectionAcquisitionTimeout" should be greater than ' + + 'or equal to "connectionTimeout". Otherwise, the connection acquisition ' + + 'timeout will take precedence for over the connection timeout in scenarios ' + + 'where a new connection is created while it is acquired' + ) + } return config } @@ -396,6 +409,7 @@ function sanitizeConfig(config: any) { config.fetchSize, DEFAULT_FETCH_SIZE ) + config.connectionTimeout = extractConnectionTimeout(config) } /** @@ -431,6 +445,26 @@ function validateFetchSizeValue( } } +/** + * @private + */ +function extractConnectionTimeout (config: any): number|null { + const configuredTimeout = parseInt(config.connectionTimeout, 10) + if (configuredTimeout === 0) { + // timeout explicitly configured to 0 + return null + } else if (configuredTimeout && configuredTimeout < 0) { + // timeout explicitly configured to a negative value + return null + } else if (!configuredTimeout) { + // timeout not configured, use default value + return DEFAULT_CONNECTION_TIMEOUT_MILLIS + } else { + // timeout configured, use the provided value + return configuredTimeout + } +} + /** * @private * @returns {ConfiguredCustomResolver} new custom resolver that wraps the passed-in resolver function. diff --git a/packages/core/src/internal/constants.ts b/packages/core/src/internal/constants.ts index 3c6c1a589..922ab884f 100644 --- a/packages/core/src/internal/constants.ts +++ b/packages/core/src/internal/constants.ts @@ -20,6 +20,7 @@ const FETCH_ALL = -1 const DEFAULT_POOL_ACQUISITION_TIMEOUT = 60 * 1000 // 60 seconds const DEFAULT_POOL_MAX_SIZE = 100 +const DEFAULT_CONNECTION_TIMEOUT_MILLIS = 30000 // 30 seconds by default const ACCESS_MODE_READ: 'READ' = 'READ' const ACCESS_MODE_WRITE: 'WRITE' = 'WRITE' @@ -37,6 +38,7 @@ export { FETCH_ALL, ACCESS_MODE_READ, ACCESS_MODE_WRITE, + DEFAULT_CONNECTION_TIMEOUT_MILLIS, DEFAULT_POOL_ACQUISITION_TIMEOUT, DEFAULT_POOL_MAX_SIZE, BOLT_PROTOCOL_V1, diff --git a/packages/core/test/driver.test.ts b/packages/core/test/driver.test.ts index f34118af6..9b379f568 100644 --- a/packages/core/test/driver.test.ts +++ b/packages/core/test/driver.test.ts @@ -21,6 +21,7 @@ import Driver from '../src/driver' import { Bookmarks } from '../src/internal/bookmarks' import { Logger } from '../src/internal/logger' import { ConfiguredCustomResolver } from '../src/internal/resolver' +import { LogLevel } from '../src/types' describe('Driver', () => { let driver: Driver | null @@ -155,6 +156,44 @@ describe('Driver', () => { expect(driver.isEncrypted()).toEqual(expectedValue) }) + it.each([ + [{ connectionTimeout: 30000, connectionAcquisitionTimeout: 60000 }, true], + [{ connectionTimeout: null, connectionAcquisitionTimeout: 60000 }, true], + [{ connectionTimeout: 30000, connectionAcquisitionTimeout: null }, true], + [{ connectionTimeout: null, connectionAcquisitionTimeout: null }, true], + [{ connectionAcquisitionTimeout: 60000 }, true], + [{ connectionTimeout: 30000 }, true], + [{}, true], + [{ connectionTimeout: 30000, connectionAcquisitionTimeout: 20000 }, false], + [{ connectionAcquisitionTimeout: 20000 }, false], + [{ connectionTimeout: 70000 }, false], + // No connection timeouts should be considered valid, since it means + // the user doesn't case about the connection timeout at all. + [{ connectionTimeout: 0, connectionAcquisitionTimeout: 2000 }, true], + [{ connectionTimeout: -1, connectionAcquisitionTimeout: 2000 }, true], + ])('should emit warning if `connectionAcquisitionTimeout` and `connectionTimeout` are conflicting. [%o} ', async (config, valid) => { + const logging = { + level: 'warn' as LogLevel, + logger: jest.fn() + } + + const driver = new Driver(META_INFO, { ...config, logging }, mockCreateConnectonProvider(new ConnectionProvider()), createSession) + + if (valid) { + expect(logging.logger).not.toHaveBeenCalled() + } else { + expect(logging.logger).toHaveBeenCalledWith( + 'warn', + 'Configuration for "connectionAcquisitionTimeout" should be greater than ' + + 'or equal to "connectionTimeout". Otherwise, the connection acquisition ' + + 'timeout will take precedence for over the connection timeout in scenarios ' + + 'where a new connection is created while it is acquired' + ) + } + + await driver.close() + }) + function mockCreateConnectonProvider(connectionProvider: ConnectionProvider) { return ( id: number, @@ -172,6 +211,7 @@ describe('Driver', () => { fetchSize: 1000, maxConnectionLifetime: 3600000, maxConnectionPoolSize: 100, + connectionTimeout: 30000, }, connectionProvider, database: '', diff --git a/packages/neo4j-driver/test/driver.test.js b/packages/neo4j-driver/test/driver.test.js index 86282e9bf..c62b897d1 100644 --- a/packages/neo4j-driver/test/driver.test.js +++ b/packages/neo4j-driver/test/driver.test.js @@ -170,7 +170,7 @@ describe('#integration driver', () => { // Given const config = { maxConnectionPoolSize: 2, - connectionAcquisitionTimeout: 0, + connectionAcquisitionTimeout: 1000, encrypted: false } driver = neo4j.driver( diff --git a/packages/neo4j-driver/test/internal/channel-config.test.js b/packages/neo4j-driver/test/internal/channel-config.test.js index 1d75b8c25..d15e863bc 100644 --- a/packages/neo4j-driver/test/internal/channel-config.test.js +++ b/packages/neo4j-driver/test/internal/channel-config.test.js @@ -111,28 +111,10 @@ describe('#unit ChannelConfig', () => { expect(config.connectionErrorCode).toEqual(SERVICE_UNAVAILABLE) }) - it('should have connection timeout by default', () => { + it('should have connection timeout being used as it is', () => { const config = new ChannelConfig(null, {}, '') - expect(config.connectionTimeout).toEqual(30000) - }) - - it('should respect configured connection timeout', () => { - const config = new ChannelConfig(null, { connectionTimeout: 424242 }, '') - - expect(config.connectionTimeout).toEqual(424242) - }) - - it('should respect disabled connection timeout with value zero', () => { - const config = new ChannelConfig(null, { connectionTimeout: 0 }, '') - - expect(config.connectionTimeout).toBeNull() - }) - - it('should respect disabled connection timeout with negative value', () => { - const config = new ChannelConfig(null, { connectionTimeout: -42 }, '') - - expect(config.connectionTimeout).toBeNull() + expect(config.connectionTimeout).toEqual(undefined) }) it('should validate value of "encrypted" property', () => { diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 33d0de405..a09efd98b 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -85,6 +85,9 @@ export function NewDriver (context, data, wire) { if ('connectionAcquisitionTimeoutMs' in data) { config.connectionAcquisitionTimeout = data.connectionAcquisitionTimeoutMs } + if ('connectionTimeoutMs' in data) { + config.connectionTimeout = data.connectionTimeoutMs + } if ('fetchSize' in data) { config.fetchSize = data.fetchSize } @@ -380,6 +383,7 @@ export function GetFeatures (_context, _params, wire) { 'Feature:Bolt:4.4', 'Feature:API:Result.List', 'Feature:API:Result.Peek', + 'Feature:Configuration:ConnectionAcquisitionTimeout', 'Optimization:EagerTransactionBegin', 'Optimization:ImplicitDefaultArguments', 'Optimization:PullPipelining',