diff --git a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js index a85809eff..b8143bfac 100644 --- a/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js +++ b/packages/bolt-connection/src/connection-provider/connection-provider-pooled.js @@ -16,8 +16,7 @@ */ import { createChannelConnection, ConnectionErrorHandler } from '../connection' -import Pool, { PoolConfig } from '../pool' -import { error, ConnectionProvider, ServerInfo, newError } from 'neo4j-driver-core' +import { error, ConnectionProvider, ServerInfo, newError, internal } from 'neo4j-driver-core' import AuthenticationProvider from './authentication-provider' import { object } from '../lang' import LivenessCheckProvider from './liveness-check-provider' @@ -31,6 +30,12 @@ const AUTHENTICATION_ERRORS = [ 'Neo.ClientError.Security.Unauthorized' ] +const { + pool: { + Pool, PoolConfig + } +} = internal + export default class PooledConnectionProvider extends ConnectionProvider { constructor ( { id, config, log, userAgent, boltAgent, authTokenManager, newPool = (...args) => new Pool(...args) }, diff --git a/packages/bolt-connection/src/index.js b/packages/bolt-connection/src/index.js index 0262225b6..d27358e1f 100644 --- a/packages/bolt-connection/src/index.js +++ b/packages/bolt-connection/src/index.js @@ -20,6 +20,5 @@ export * as bolt from './bolt' export * as buf from './buf' export * as channel from './channel' export * as packstream from './packstream' -export * as pool from './pool' export * from './connection-provider' diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js index ff2f10ee0..83cb42519 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-direct.test.js @@ -16,7 +16,6 @@ */ import DirectConnectionProvider from '../../src/connection-provider/connection-provider-direct' -import { Pool } from '../../src/pool' import { Connection, DelegateConnection } from '../../src/connection' import { authTokenManagers, internal, newError, ServerInfo, staticAuthTokenManager } from 'neo4j-driver-core' import AuthenticationProvider from '../../src/connection-provider/authentication-provider' @@ -25,7 +24,8 @@ import LivenessCheckProvider from '../../src/connection-provider/liveness-check- const { serverAddress: { ServerAddress }, - logger: { Logger } + logger: { Logger }, + pool: { Pool } } = internal describe('#unit DirectConnectionProvider', () => { diff --git a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js index c1eeeb794..297e87d78 100644 --- a/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js +++ b/packages/bolt-connection/test/connection-provider/connection-provider-routing.test.js @@ -27,7 +27,6 @@ import { authTokenManagers } from 'neo4j-driver-core' import { RoutingTable } from '../../src/rediscovery/' -import { Pool } from '../../src/pool' import SimpleHostNameResolver from '../../src/channel/browser/browser-host-name-resolver' import RoutingConnectionProvider from '../../src/connection-provider/connection-provider-routing' import { DelegateConnection, Connection } from '../../src/connection' @@ -37,7 +36,8 @@ import LivenessCheckProvider from '../../src/connection-provider/liveness-check- const { serverAddress: { ServerAddress }, - logger: { Logger } + logger: { Logger }, + pool: { Pool } } = internal const { SERVICE_UNAVAILABLE, SESSION_EXPIRED } = error diff --git a/packages/core/src/internal/index.ts b/packages/core/src/internal/index.ts index c07c301e2..92e99730e 100644 --- a/packages/core/src/internal/index.ts +++ b/packages/core/src/internal/index.ts @@ -29,6 +29,7 @@ import * as serverAddress from './server-address' import * as resolver from './resolver' import * as objectUtil from './object-util' import * as boltAgent from './bolt-agent/index' +import * as pool from './pool' export { util, @@ -44,5 +45,6 @@ export { serverAddress, resolver, objectUtil, - boltAgent + boltAgent, + pool } diff --git a/packages/bolt-connection/src/pool/index.js b/packages/core/src/internal/pool/index.ts similarity index 100% rename from packages/bolt-connection/src/pool/index.js rename to packages/core/src/internal/pool/index.ts diff --git a/packages/bolt-connection/src/pool/pool-config.js b/packages/core/src/internal/pool/pool-config.ts similarity index 67% rename from packages/bolt-connection/src/pool/pool-config.js rename to packages/core/src/internal/pool/pool-config.ts index a31e72076..b1f5e5d94 100644 --- a/packages/bolt-connection/src/pool/pool-config.js +++ b/packages/core/src/internal/pool/pool-config.ts @@ -19,7 +19,10 @@ const DEFAULT_MAX_SIZE = 100 const DEFAULT_ACQUISITION_TIMEOUT = 60 * 1000 // 60 seconds export default class PoolConfig { - constructor (maxSize, acquisitionTimeout) { + public readonly maxSize: number + public readonly acquisitionTimeout: number + + constructor (maxSize: number, acquisitionTimeout: number) { this.maxSize = valueOrDefault(maxSize, DEFAULT_MAX_SIZE) this.acquisitionTimeout = valueOrDefault( acquisitionTimeout, @@ -27,19 +30,18 @@ export default class PoolConfig { ) } - static defaultConfig () { + static defaultConfig (): PoolConfig { return new PoolConfig(DEFAULT_MAX_SIZE, DEFAULT_ACQUISITION_TIMEOUT) } - static fromDriverConfig (config) { - const maxSizeConfigured = isConfigured(config.maxConnectionPoolSize) - const maxSize = maxSizeConfigured + static fromDriverConfig (config: { maxConnectionPoolSize?: number, connectionAcquisitionTimeout?: number }): PoolConfig { + const maxSize = isConfigured(config.maxConnectionPoolSize) ? config.maxConnectionPoolSize : DEFAULT_MAX_SIZE - const acquisitionTimeoutConfigured = isConfigured( + + const acquisitionTimeout = isConfigured( config.connectionAcquisitionTimeout ) - const acquisitionTimeout = acquisitionTimeoutConfigured ? config.connectionAcquisitionTimeout : DEFAULT_ACQUISITION_TIMEOUT @@ -47,12 +49,12 @@ export default class PoolConfig { } } -function valueOrDefault (value, defaultValue) { - return value === 0 || value ? value : defaultValue +function valueOrDefault (value: number | undefined, defaultValue: number): number { + return isConfigured(value) ? value : defaultValue } -function isConfigured (value) { - return value === 0 || value +function isConfigured (value?: number): value is number { + return value === 0 || value != null } export { DEFAULT_MAX_SIZE, DEFAULT_ACQUISITION_TIMEOUT } diff --git a/packages/bolt-connection/src/pool/pool.js b/packages/core/src/internal/pool/pool.ts similarity index 65% rename from packages/bolt-connection/src/pool/pool.js rename to packages/core/src/internal/pool/pool.ts index 6d3bd38fb..9071dedee 100644 --- a/packages/bolt-connection/src/pool/pool.js +++ b/packages/core/src/internal/pool/pool.ts @@ -16,13 +16,46 @@ */ import PoolConfig from './pool-config' -import { newError, internal } from 'neo4j-driver-core' +import { newError } from '../../error' +import { Logger } from '../logger' +import { ServerAddress } from '../server-address' + +type Release = (address: ServerAddress, resource: R) => Promise +type Create = (acquisitionContext: unknown, address: ServerAddress, release: Release) => Promise +type Destroy = (resource: R) => Promise +type ValidateOnAcquire = (acquisitionContext: unknown, resource: R) => (Promise | boolean) +type ValidateOnRelease = (resource: R) => (Promise | boolean) +type InstallObserver = (resource: R, observer: unknown) => void +type RemoveObserver = (resource: R) => void +interface AcquisitionConfig { requireNew?: boolean } + +interface ConstructorParam { + create?: Create + destroy?: Destroy + validateOnAcquire?: ValidateOnAcquire + validateOnRelease?: ValidateOnRelease + installIdleObserver?: InstallObserver + removeIdleObserver?: RemoveObserver + config?: PoolConfig + log?: Logger +} -const { - logger: { Logger } -} = internal +class Pool { + private readonly _create: Create + private readonly _destroy: Destroy + private readonly _validateOnAcquire: ValidateOnAcquire + private readonly _validateOnRelease: ValidateOnRelease + private readonly _installIdleObserver: InstallObserver + private readonly _removeIdleObserver: RemoveObserver + private readonly _maxSize: number + private readonly _acquisitionTimeout: number + private readonly _log: Logger + private readonly _pools: { [key: string]: SingleAddressPool } + private readonly _pendingCreates: { [key: string]: number } + private readonly _acquireRequests: { [key: string]: Array> } + private readonly _activeResourceCounts: { [key: string]: number } + private _closed: boolean -class Pool { /** * @param {function(acquisitionContext: object, address: ServerAddress, function(address: ServerAddress, resource: object): Promise): Promise} create * an allocation function that creates a promise with a new resource. It's given an address for which to @@ -44,15 +77,15 @@ class Pool { * @param {Logger} log the driver logger. */ constructor ({ - create = (acquisitionContext, address, release) => Promise.resolve(), - destroy = conn => Promise.resolve(), + create = async (acquisitionContext, address, release) => await Promise.reject(new Error('Not implemented')), + destroy = async conn => await Promise.resolve(), validateOnAcquire = (acquisitionContext, conn) => true, validateOnRelease = (conn) => true, installIdleObserver = (conn, observer) => {}, removeIdleObserver = conn => {}, config = PoolConfig.defaultConfig(), log = Logger.noOp() - } = {}) { + }: ConstructorParam) { this._create = create this._destroy = destroy this._validateOnAcquire = validateOnAcquire @@ -78,25 +111,23 @@ class Pool { * @param {boolean} config.requireNew Indicate it requires a new resource * @return {Promise} resource that is ready to use. */ - acquire (acquisitionContext, address, config) { + async acquire (acquisitionContext: unknown, address: ServerAddress, config?: AcquisitionConfig): Promise { const key = address.asKey() // We're out of resources and will try to acquire later on when an existing resource is released. const allRequests = this._acquireRequests const requests = allRequests[key] - if (!requests) { + if (requests == null) { allRequests[key] = [] } - return new Promise((resolve, reject) => { - let request = null - + return await new Promise((resolve, reject) => { const timeoutId = setTimeout(() => { // acquisition timeout fired // remove request from the queue of pending requests, if it's still there // request might've been taken out by the release operation const pendingRequests = allRequests[key] - if (pendingRequests) { + if (pendingRequests != null) { allRequests[key] = pendingRequests.filter(item => item !== request) } @@ -113,9 +144,14 @@ class Pool { ) } }, this._acquisitionTimeout) - typeof timeoutId === 'object' && timeoutId.unref() - request = new PendingRequest(key, acquisitionContext, config, resolve, reject, timeoutId, this._log) + if (typeof timeoutId === 'object') { + // eslint-disable-next-line + // @ts-ignore + timeoutId.unref() + } + + const request = new PendingRequest(key, acquisitionContext, config, resolve, reject, timeoutId, this._log) allRequests[key].push(request) this._processPendingAcquireRequests(address) }) @@ -126,11 +162,11 @@ class Pool { * @param {ServerAddress} address the address of the server to purge its pool. * @returns {Promise} A promise that is resolved when the resources are purged */ - purge (address) { - return this._purgeKey(address.asKey()) + async purge (address: ServerAddress): Promise { + return await this._purgeKey(address.asKey()) } - apply (address, resourceConsumer) { + apply (address: ServerAddress, resourceConsumer: (resource: R) => void): void { const key = address.asKey() if (key in this._pools) { @@ -142,12 +178,12 @@ class Pool { * Destroy all idle resources in this pool. * @returns {Promise} A promise that is resolved when the resources are purged */ - async close () { + async close (): Promise { this._closed = true /** * The lack of Promise consuming was making the driver do not close properly in the scenario * captured at result.test.js:it('should handle missing onCompleted'). The test was timing out - * because while wainting for the driver close. + * because while waiting for the driver close. * * Consuming the Promise.all or by calling then or by awaiting in the result inside this method solved * the issue somehow. @@ -156,20 +192,20 @@ class Pool { * seems to be need also. */ return await Promise.all( - Object.keys(this._pools).map(key => this._purgeKey(key)) - ) + Object.keys(this._pools).map(async key => await this._purgeKey(key)) + ).then() } /** * Keep the idle resources for the provided addresses and purge the rest. * @returns {Promise} A promise that is resolved when the other resources are purged */ - keepAll (addresses) { + async keepAll (addresses: ServerAddress[]): Promise { const keysToKeep = addresses.map(a => a.asKey()) const keysPresent = Object.keys(this._pools) - const keysToPurge = keysPresent.filter(k => keysToKeep.indexOf(k) === -1) + const keysToPurge = keysPresent.filter(k => !keysToKeep.includes(k)) - return Promise.all(keysToPurge.map(key => this._purgeKey(key))) + return await Promise.all(keysToPurge.map(async key => await this._purgeKey(key))).then() } /** @@ -177,7 +213,7 @@ class Pool { * @param {ServerAddress} address the address of the server to check. * @return {boolean} `true` when pool contains entries for the given key, false otherwise. */ - has (address) { + has (address: ServerAddress): boolean { return address.asKey() in this._pools } @@ -186,21 +222,21 @@ class Pool { * @param {ServerAddress} address the address of the server to check. * @return {number} count of resources acquired by clients. */ - activeResourceCount (address) { - return this._activeResourceCounts[address.asKey()] || 0 + activeResourceCount (address: ServerAddress): number { + return this._activeResourceCounts[address.asKey()] ?? 0 } - _getOrInitializePoolFor (key) { + _getOrInitializePoolFor (key: string): SingleAddressPool { let pool = this._pools[key] - if (!pool) { - pool = new SingleAddressPool() + if (pool == null) { + pool = new SingleAddressPool() this._pools[key] = pool this._pendingCreates[key] = 0 } return pool } - async _acquire (acquisitionContext, address, requireNew) { + async _acquire (acquisitionContext: unknown, address: ServerAddress, requireNew: boolean): Promise<{ resource: R | null, pool: SingleAddressPool }> { if (this._closed) { throw newError('Pool is closed, it is no more able to serve requests.') } @@ -208,10 +244,14 @@ class Pool { const key = address.asKey() const pool = this._getOrInitializePoolFor(key) if (!requireNew) { - while (pool.length) { + while (pool.length > 0) { const resource = pool.pop() - if (this._removeIdleObserver) { + if (resource == null) { + continue + } + + if (this._removeIdleObserver != null) { this._removeIdleObserver(resource) } @@ -219,6 +259,7 @@ class Pool { // idle resource is valid and can be acquired resourceAcquired(key, this._activeResourceCounts) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} acquired from the pool ${key}`) } return { resource, pool } @@ -249,18 +290,21 @@ class Pool { const numConnections = this.activeResourceCount(address) + pool.length if (numConnections >= this._maxSize && requireNew) { const resource = pool.pop() - if (this._removeIdleObserver) { - this._removeIdleObserver(resource) + if (resource != null) { + if (this._removeIdleObserver != null) { + this._removeIdleObserver(resource) + } + pool.removeInUse(resource) + await this._destroy(resource) } - pool.removeInUse(resource) - await this._destroy(resource) } // Invoke callback that creates actual connection - resource = await this._create(acquisitionContext, address, (address, resource) => this._release(address, resource, pool)) + resource = await this._create(acquisitionContext, address, async (address, resource) => await this._release(address, resource, pool)) pool.pushInUse(resource) resourceAcquired(key, this._activeResourceCounts) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} created for the pool ${key}`) } } finally { @@ -269,7 +313,7 @@ class Pool { return { resource, pool } } - async _release (address, resource, pool) { + async _release (address: ServerAddress, resource: R, pool: SingleAddressPool): Promise { const key = address.asKey() try { @@ -278,20 +322,22 @@ class Pool { if (!await this._validateOnRelease(resource)) { if (this._log.isDebugEnabled()) { this._log.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `${resource} destroyed and can't be released to the pool ${key} because it is not functional` ) } pool.removeInUse(resource) await this._destroy(resource) } else { - if (this._installIdleObserver) { + if (this._installIdleObserver != null) { this._installIdleObserver(resource, { - onError: error => { + onError: (error: Error) => { this._log.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `Idle connection ${resource} destroyed because of error: ${error}` ) const pool = this._pools[key] - if (pool) { + if (pool != null) { this._pools[key] = pool.filter(r => r !== resource) pool.removeInUse(resource) } @@ -304,6 +350,7 @@ class Pool { } pool.push(resource) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} released to the pool ${key}`) } } @@ -311,6 +358,7 @@ class Pool { // key has been purged, don't put it back, just destroy the resource if (this._log.isDebugEnabled()) { this._log.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `${resource} destroyed and can't be released to the pool ${key} because pool has been purged` ) } @@ -324,45 +372,58 @@ class Pool { } } - async _purgeKey (key) { + async _purgeKey (key: string): Promise { const pool = this._pools[key] const destructionList = [] - if (pool) { - while (pool.length) { + if (pool != null) { + while (pool.length > 0) { const resource = pool.pop() - if (this._removeIdleObserver) { + if (resource == null) { + continue + } + + if (this._removeIdleObserver != null) { this._removeIdleObserver(resource) } destructionList.push(this._destroy(resource)) } pool.close() + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete this._pools[key] await Promise.all(destructionList) } } - _processPendingAcquireRequests (address) { + _processPendingAcquireRequests (address: ServerAddress): void { const key = address.asKey() const requests = this._acquireRequests[key] - if (requests) { + if (requests != null) { const pendingRequest = requests.shift() // pop a pending acquire request - if (pendingRequest) { + if (pendingRequest != null) { this._acquire(pendingRequest.context, address, pendingRequest.requireNew) .catch(error => { // failed to acquire/create a new connection to resolve the pending acquire request // propagate the error by failing the pending request pendingRequest.reject(error) - return { resource: null } + return { resource: null, pool: null } }) .then(({ resource, pool }) => { - if (resource) { + // there is not situation where the pool resource is not null and the + // pool is null. + if (resource != null && pool != null) { // managed to acquire a valid resource from the pool if (pendingRequest.isCompleted()) { // request has been completed, most likely failed by a timeout // return the acquired resource back to the pool this._release(address, resource, pool) + .catch(error => { + if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + this._log.debug(`${resource} could not be release back to the pool. Cause: ${error}`) + } + }) } else { // request is still pending and can be resolved with the newly acquired resource pendingRequest.resolve(resource) // resolve the pending request with the acquired resource @@ -371,14 +432,15 @@ class Pool { // failed to acquire a valid resource from the pool // return the pending request back to the pool if (!pendingRequest.isCompleted()) { - if (!this._acquireRequests[key]) { + if (this._acquireRequests[key] == null) { this._acquireRequests[key] = [] } this._acquireRequests[key].unshift(pendingRequest) } } - }) + }).catch(error => pendingRequest.reject(error)) } else { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete this._acquireRequests[key] } } @@ -390,8 +452,8 @@ class Pool { * @param {string} key the resource group identifier (server address for connections). * @param {Object.} activeResourceCounts the object holding active counts per key. */ -function resourceAcquired (key, activeResourceCounts) { - const currentCount = activeResourceCounts[key] || 0 +function resourceAcquired (key: string, activeResourceCounts: { [key: string]: number }): void { + const currentCount = activeResourceCounts[key] ?? 0 activeResourceCounts[key] = currentCount + 1 } @@ -400,19 +462,29 @@ function resourceAcquired (key, activeResourceCounts) { * @param {string} key the resource group identifier (server address for connections). * @param {Object.} activeResourceCounts the object holding active counts per key. */ -function resourceReleased (key, activeResourceCounts) { - const currentCount = activeResourceCounts[key] || 0 +function resourceReleased (key: string, activeResourceCounts: { [key: string]: number }): void { + const currentCount = activeResourceCounts[key] ?? 0 const nextCount = currentCount - 1 if (nextCount > 0) { activeResourceCounts[key] = nextCount } else { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete activeResourceCounts[key] } } -class PendingRequest { - constructor (key, context, config, resolve, reject, timeoutId, log) { +class PendingRequest { + private readonly _key: string + private readonly _context: unknown + private readonly _config: AcquisitionConfig + private readonly _resolve: (resource: R) => void + private readonly _reject: (error: Error) => void + private readonly _timeoutId: any + private readonly _log: Logger + private _completed: boolean + + constructor (key: string, context: unknown, config: AcquisitionConfig | undefined, resolve: (resource: R) => void, reject: (error: Error) => void, timeoutId: any, log: Logger) { this._key = key this._context = context this._resolve = resolve @@ -420,22 +492,22 @@ class PendingRequest { this._timeoutId = timeoutId this._log = log this._completed = false - this._config = config || {} + this._config = config ?? {} } - get context () { + get context (): unknown { return this._context } - get requireNew () { - return this._config.requireNew || false + get requireNew (): boolean { + return this._config.requireNew ?? false } - isCompleted () { + isCompleted (): boolean { return this._completed } - resolve (resource) { + resolve (resource: R): void { if (this._completed) { return } @@ -443,12 +515,13 @@ class PendingRequest { clearTimeout(this._timeoutId) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} acquired from the pool ${this._key}`) } this._resolve(resource) } - reject (error) { + reject (error: Error): void { if (this._completed) { return } @@ -459,53 +532,59 @@ class PendingRequest { } } -class SingleAddressPool { +class SingleAddressPool { + private _active: boolean + private _elements: R[] + private _elementsInUse: Set + constructor () { this._active = true this._elements = [] this._elementsInUse = new Set() } - isActive () { + isActive (): boolean { return this._active } - close () { + close (): void { this._active = false this._elements = [] this._elementsInUse = new Set() } - filter (predicate) { + filter (predicate: (resource: R) => boolean): SingleAddressPool { this._elements = this._elements.filter(predicate) return this } - apply (resourceConsumer) { + apply (resourceConsumer: (resource: R) => void): void { this._elements.forEach(resourceConsumer) this._elementsInUse.forEach(resourceConsumer) } - get length () { + get length (): number { return this._elements.length } - pop () { + pop (): R | undefined { const element = this._elements.pop() - this._elementsInUse.add(element) + if (element != null) { + this._elementsInUse.add(element) + } return element } - push (element) { + push (element: R): number { this._elementsInUse.delete(element) return this._elements.push(element) } - pushInUse (element) { + pushInUse (element: R): void { this._elementsInUse.add(element) } - removeInUse (element) { + removeInUse (element: R): void { this._elementsInUse.delete(element) } } diff --git a/packages/bolt-connection/test/pool/pool.test.js b/packages/core/test/internal/pool/pool.test.ts similarity index 75% rename from packages/bolt-connection/test/pool/pool.test.js rename to packages/core/test/internal/pool/pool.test.ts index 581b7f608..60d6e01cb 100644 --- a/packages/bolt-connection/test/pool/pool.test.js +++ b/packages/core/test/internal/pool/pool.test.ts @@ -15,13 +15,10 @@ * limitations under the License. */ -import Pool from '../../src/pool/pool' -import PoolConfig from '../../src/pool/pool-config' -import { newError, error, internal } from 'neo4j-driver-core' - -const { - serverAddress: { ServerAddress } -} = internal +import Pool from '../../../src/internal/pool/pool' +import PoolConfig from '../../../src/internal/pool/pool-config' +import { ServerAddress } from '../../../src/internal/server-address' +import { newError, error } from '../../../src' const { SERVICE_UNAVAILABLE } = error @@ -30,9 +27,9 @@ describe('#unit Pool', () => { // Given let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)) + const pool = new Pool({ + create: async (_: unknown, server: ServerAddress, release: (address: ServerAddress, resource: Resource) => Promise) => + await Promise.resolve(new Resource(server, counter++, release)) }) // When @@ -49,9 +46,9 @@ describe('#unit Pool', () => { // Given a pool that allocates let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)) + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)) }) // When @@ -71,9 +68,9 @@ describe('#unit Pool', () => { let counter = 0 const address1 = ServerAddress.fromUrl('bolt://localhost:7687') const address2 = ServerAddress.fromUrl('bolt://localhost:7688') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)) + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)) }) // When @@ -97,14 +94,14 @@ describe('#unit Pool', () => { it('frees if validate returns false', async () => { // Given a pool that allocates let counter = 0 - const destroyed = [] + const destroyed: Resource[] = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.resolve() + return await Promise.resolve() }, validateOnRelease: res => false, config: new PoolConfig(1000, 60000) @@ -126,14 +123,14 @@ describe('#unit Pool', () => { it('should release resources and process acquisitions when destroy connection', async () => { // Given a pool that allocates let counter = 0 - const destroyed = [] + const destroyed: Resource[] = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.resolve() + return await Promise.resolve() }, validateOnRelease: res => false, config: new PoolConfig(2, 10000) @@ -172,14 +169,14 @@ describe('#unit Pool', () => { // Given a pool that allocates let counter = 0 const theMadeUpError = new Error('I made this error for testing') - const destroyed = [] + const destroyed: Resource[] = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.reject(theMadeUpError) + return await Promise.reject(theMadeUpError) }, validateOnRelease: res => false, config: new PoolConfig(2, 3000) @@ -218,14 +215,14 @@ describe('#unit Pool', () => { // Given a pool that allocates let counter = 0 const theMadeUpError = new Error('I made this error for testing') - const destroyed = [] + const destroyed: Resource[] = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.reject(theMadeUpError) + return await Promise.reject(theMadeUpError) }, validateOnRelease: res => true, config: new PoolConfig(2, 3000) @@ -266,16 +263,16 @@ describe('#unit Pool', () => { it('frees if validateOnRelease returns Promise.resolve(false)', async () => { // Given a pool that allocates let counter = 0 - const destroyed = [] + const destroyed: Resource[] = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.resolve() + return await Promise.resolve() }, - validateOnRelease: res => Promise.resolve(false), + validateOnRelease: async res => await Promise.resolve(false), config: new PoolConfig(1000, 60000) }) @@ -297,14 +294,14 @@ describe('#unit Pool', () => { let counter = 0 const destroyed = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.resolve() + return await Promise.resolve() }, - validateOnRelease: res => Promise.resolve(true), + validateOnRelease: async res => await Promise.resolve(true), config: new PoolConfig(1000, 60000) }) @@ -322,16 +319,16 @@ describe('#unit Pool', () => { it('frees if validateOnAcquire returns Promise.resolve(false)', async () => { // Given a pool that allocates let counter = 0 - const destroyed = [] + const destroyed: Resource[] = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.resolve() + return await Promise.resolve() }, - validateOnAcquire: res => Promise.resolve(false), + validateOnAcquire: async res => await Promise.resolve(false), config: new PoolConfig(1000, 60000) }) @@ -357,14 +354,14 @@ describe('#unit Pool', () => { let counter = 0 const destroyed = [] const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_acquisitionContext, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_acquisitionContext, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { destroyed.push(res) - return Promise.resolve() + return await Promise.resolve() }, - validateOnAcquire: res => Promise.resolve(true), + validateOnAcquire: async res => await Promise.resolve(true), config: new PoolConfig(1000, 60000) }) @@ -388,12 +385,12 @@ describe('#unit Pool', () => { let counter = 0 const address1 = ServerAddress.fromUrl('bolt://localhost:7687') const address2 = ServerAddress.fromUrl('bolt://localhost:7688') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -428,12 +425,12 @@ describe('#unit Pool', () => { let counter = 0 const address1 = ServerAddress.fromUrl('bolt://localhost:7687') const address2 = ServerAddress.fromUrl('bolt://localhost:7688') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -473,12 +470,12 @@ describe('#unit Pool', () => { it('destroys resource when key was purged', async () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -498,12 +495,12 @@ describe('#unit Pool', () => { it('destroys resource when pool is purged even if a new pool is created for the same address', async () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -540,12 +537,12 @@ describe('#unit Pool', () => { const address2 = ServerAddress.fromUrl('bolt://localhost:7688') const address3 = ServerAddress.fromUrl('bolt://localhost:7689') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -558,7 +555,7 @@ describe('#unit Pool', () => { pool.acquire({}, address3) ] const values = await Promise.all(acquiredResources) - await Promise.all(values.map(resource => resource.close())) + await Promise.all(values.map(async resource => await resource.close())) await pool.close() @@ -568,10 +565,10 @@ describe('#unit Pool', () => { it('should fail to acquire when closed', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 0, release)), - destroy: res => { - return Promise.resolve() + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 0, release)), + destroy: async res => { + return await Promise.resolve() } }) @@ -586,11 +583,11 @@ describe('#unit Pool', () => { it('should fail to acquire when closed with idle connections', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 0, release)), - destroy: res => { - return Promise.resolve() + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 0, release)), + destroy: async res => { + return await Promise.resolve() } }) @@ -613,12 +610,12 @@ describe('#unit Pool', () => { const address2 = ServerAddress.fromUrl('bolt://localhost:7688') const address3 = ServerAddress.fromUrl('bolt://localhost:7689') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -650,12 +647,12 @@ describe('#unit Pool', () => { const address2 = ServerAddress.fromUrl('bolt://localhost:7688') const address3 = ServerAddress.fromUrl('bolt://localhost:7689') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() } }) @@ -684,15 +681,15 @@ describe('#unit Pool', () => { let validated = true let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => { + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => { res.destroyed = true - return Promise.resolve() + return await Promise.resolve() }, - validateOnAcquire: (context, _res) => { - if (context.triggerValidation) { + validateOnAcquire: (context: any, _res) => { + if (context.triggerValidation === true) { validated = !validated return validated } @@ -712,8 +709,8 @@ describe('#unit Pool', () => { const absentAddress = ServerAddress.fromUrl('bolt://localhost:7688') const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 42, release)) + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 42, release)) }) await pool.acquire({}, existingAddress) @@ -725,8 +722,8 @@ describe('#unit Pool', () => { it('reports zero active resources when empty', () => { const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 42, release)) + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 42, release)) }) expect( @@ -743,8 +740,8 @@ describe('#unit Pool', () => { it('reports active resources', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 42, release)) + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 42, release)) }) const acquiredResources = [ @@ -761,16 +758,16 @@ describe('#unit Pool', () => { it('reports active resources when they are acquired', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 42, release)) + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 42, release)) }) // three new resources are created and returned to the pool const r0 = await pool.acquire({}, address) const r1 = await pool.acquire({}, address) const r2 = await pool.acquire({}, address) - await [r0, r1, r2].map(v => v.close()) + await [r0, r1, r2].map(async v => await v.close()) // three idle resources are acquired from the pool const acquiredResources = [ @@ -789,9 +786,9 @@ describe('#unit Pool', () => { it('does not report resources that are returned to the pool', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 42, release)) + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 42, release)) }) const r0 = await pool.acquire({}, address) @@ -819,10 +816,10 @@ describe('#unit Pool', () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), config: new PoolConfig(2, 5000) }) @@ -831,7 +828,7 @@ describe('#unit Pool', () => { setTimeout(() => { expectNumberOfAcquisitionRequests(pool, address, 1) - r1.close() + ignore(r1.close()) }, 1000) const r2 = await pool.acquire({}, address) @@ -843,9 +840,9 @@ describe('#unit Pool', () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), config: new PoolConfig(2, 1000) }) @@ -861,14 +858,14 @@ describe('#unit Pool', () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') it('should consider pending connects when evaluating max pool size', async () => { - const conns = [] + const conns: any[] = [] const pool = new Pool({ // Hook into connection creation to track when and what connections that are // created. - create: (_, server, release) => { + create: async (_, server, release) => { // Create a fake connection that makes it possible control when it's connected // and released from the outer scope. - const conn = { + const conn: any = { server, release } @@ -879,7 +876,7 @@ describe('#unit Pool', () => { // Put the connection in a list in outer scope even though there only should be // one when the test is succeeding. conns.push(conn) - return promise + return await promise }, // Setup pool to only allow one connection config: new PoolConfig(1, 100000) @@ -911,10 +908,10 @@ describe('#unit Pool', () => { it('should not time out if max pool size is not set', async () => { let counter = 0 - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve() + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve() }) await pool.acquire({}, address) @@ -930,10 +927,10 @@ describe('#unit Pool', () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), config: new PoolConfig(2, acquisitionTimeout) }) @@ -947,8 +944,8 @@ describe('#unit Pool', () => { // double-release used to cause deletion of acquire requests in the pool and failure of the timeout // such background failure made this test fail, not the existing assertions setTimeout(() => { - resource1.close() - resource2.close() + ignore(resource1.close()) + ignore(resource2.close()) }, acquisitionTimeout) // Remember that both code paths are ok with this test, either a success with a valid resource @@ -972,10 +969,10 @@ describe('#unit Pool', () => { const acquisitionTimeout = 1000 let counter = 0 - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), validateOnAcquire: (_, res) => resourceValidOnlyOnceValidationFunction(res), validateOnRelease: resourceValidOnlyOnceValidationFunction, config: new PoolConfig(1, acquisitionTimeout) @@ -988,7 +985,7 @@ describe('#unit Pool', () => { // release the resource before the acquisition timeout, it should be treated as invalid setTimeout(() => { expectNumberOfAcquisitionRequests(pool, address, 1) - resource1.close() + ignore(resource1.close()) }, acquisitionTimeout / 2) const resource2 = await pool.acquire({}, address) @@ -1002,10 +999,10 @@ describe('#unit Pool', () => { const acquisitionTimeout = 1000 let counter = 0 - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), validateOnAcquire: (_, res) => resourceValidOnlyOnceValidationFunction(res), validateOnRelease: resourceValidOnlyOnceValidationFunction, config: new PoolConfig(2, acquisitionTimeout) @@ -1022,8 +1019,8 @@ describe('#unit Pool', () => { // release both resources before the acquisition timeout, they should be treated as invalid setTimeout(() => { expectNumberOfAcquisitionRequests(pool, address, 1) - resource1.close() - resource2.close() + ignore(resource1.close()) + ignore(resource2.close()) }, acquisitionTimeout / 2) const resource3 = await pool.acquire({}, address) @@ -1038,10 +1035,10 @@ describe('#unit Pool', () => { let installIdleObserverCount = 0 let removeIdleObserverCount = 0 - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, resourceCount++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, resourceCount++, release)), + destroy: async res => await Promise.resolve(), installIdleObserver: (resource, observer) => { installIdleObserverCount++ }, @@ -1053,7 +1050,7 @@ describe('#unit Pool', () => { const r1 = await pool.acquire({}, address) const r2 = await pool.acquire({}, address) const r3 = await pool.acquire({}, address) - await [r1, r2, r3].map(r => r.close()) + await [r1, r2, r3].map(async r => await r.close()) expect(installIdleObserverCount).toEqual(3) expect(removeIdleObserverCount).toEqual(0) @@ -1070,10 +1067,10 @@ describe('#unit Pool', () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') let resourceCount = 0 - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, resourceCount++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, resourceCount++, release)), + destroy: async res => await Promise.resolve(), installIdleObserver: (resource, observer) => { resource.observer = observer }, @@ -1109,10 +1106,10 @@ describe('#unit Pool', () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') let resourceCount = 0 - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, resourceCount++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, resourceCount++, release)), + destroy: async res => await Promise.resolve(), installIdleObserver: (resource, observer) => { resource.observer = observer }, @@ -1137,12 +1134,12 @@ describe('#unit Pool', () => { const acquisitionTimeout = 1000 let counter = 0 - const pool = new Pool({ - create: (_, server, release) => - new Promise(resolve => setTimeout( + const pool = new Pool({ + create: async (_, server, release) => + await new Promise(resolve => setTimeout( () => resolve(new Resource(server, counter++, release)) , acquisitionTimeout + 10)), - destroy: res => Promise.resolve(), + destroy: async res => await Promise.resolve(), validateOnAcquire: (_, res) => resourceValidOnlyOnceValidationFunction(res), validateOnRelease: resourceValidOnlyOnceValidationFunction, config: new PoolConfig(1, acquisitionTimeout) @@ -1170,24 +1167,24 @@ describe('#unit Pool', () => { it('should purge resources in parallel', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') let resourceCount = 0 - const resourcesReleased = [] - let resolveRelease + const resourcesReleased: Resource[] = [] + let resolveRelease: (r: Resource) => void const releasePromise = new Promise((resolve) => { resolveRelease = resolve }) const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, resourceCount++, release)), - destroy: res => { + create: async (_, server, release) => + await Promise.resolve(new Resource(server, resourceCount++, release)), + destroy: async (res: Resource) => { resourcesReleased.push(res) resourceCount-- // Only destroy when the last resource // get destroyed if (resourceCount === 0) { - resolveRelease() + resolveRelease(res) } - return releasePromise + return await releasePromise.then() } }) @@ -1208,9 +1205,9 @@ describe('#unit Pool', () => { // Given let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)) + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)) }) // When @@ -1227,9 +1224,9 @@ describe('#unit Pool', () => { // Given a pool that allocates let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)) + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)) }) // When @@ -1247,10 +1244,10 @@ describe('#unit Pool', () => { it('should fail to acquire when closed', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 0, release)), - destroy: res => { - return Promise.resolve() + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 0, release)), + destroy: async res => { + return await Promise.resolve() } }) @@ -1265,11 +1262,11 @@ describe('#unit Pool', () => { it('should fail to acquire when closed with idle connections', async () => { const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, 0, release)), - destroy: res => { - return Promise.resolve() + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, 0, release)), + destroy: async res => { + return await Promise.resolve() } }) @@ -1289,10 +1286,10 @@ describe('#unit Pool', () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), config: new PoolConfig(2, 5000) }) @@ -1301,7 +1298,7 @@ describe('#unit Pool', () => { setTimeout(() => { expectNumberOfAcquisitionRequests(pool, address, 1) - r1.close() + ignore(r1.close()) }, 1000) expect(r1).not.toBe(r0) @@ -1313,10 +1310,10 @@ describe('#unit Pool', () => { let counter = 0 const address = ServerAddress.fromUrl('bolt://localhost:7687') - const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), - destroy: res => Promise.resolve(), + const pool = new Pool({ + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), + destroy: async res => await Promise.resolve(), config: new PoolConfig(2, 5000) }) @@ -1325,7 +1322,7 @@ describe('#unit Pool', () => { setTimeout(() => { expectNumberOfAcquisitionRequests(pool, address, 1) - r1.close() + ignore(r1.close()) }, 1000) expect(r1).not.toBe(r0) @@ -1336,12 +1333,12 @@ describe('#unit Pool', () => { it('should handle a sequence of request new and the regular request', async () => { let counter = 0 - const destroy = jest.fn(res => Promise.resolve()) + const destroy = jest.fn(async res => await Promise.resolve()) const removeIdleObserver = jest.fn(res => undefined) const address = ServerAddress.fromUrl('bolt://localhost:7687') const pool = new Pool({ - create: (_, server, release) => - Promise.resolve(new Resource(server, counter++, release)), + create: async (_, server, release) => + await Promise.resolve(new Resource(server, counter++, release)), destroy, removeIdleObserver, config: new PoolConfig(1, 5000) @@ -1394,9 +1391,10 @@ describe('#unit Pool', () => { }) }) -function expectNoPendingAcquisitionRequests (pool) { +function expectNoPendingAcquisitionRequests (pool: Pool): void { + // @ts-expect-error const acquireRequests = pool._acquireRequests - Object.values(acquireRequests).forEach(requests => { + Object.values(acquireRequests).forEach((requests: any) => { if (Array.isArray(requests) && requests.length === 0) { requests = undefined } @@ -1404,33 +1402,37 @@ function expectNoPendingAcquisitionRequests (pool) { }) } -function expectNoIdleResources (pool, address) { +function expectNoIdleResources (pool: Pool, address: ServerAddress): void { if (pool.has(address)) { + // @ts-expect-error expect(pool._pools[address.asKey()].length).toBe(0) } } -function idleResources (pool, address) { +function idleResources (pool: Pool, address: ServerAddress): number | undefined { if (pool.has(address)) { + // @ts-expect-error return pool._pools[address.asKey()].length } return undefined } -function resourceInUse (pool, address) { +function resourceInUse (pool: Pool, address: ServerAddress): number | undefined { if (pool.has(address)) { + // @ts-expect-error return pool._pools[address.asKey()]._elementsInUse.size } return undefined } -function expectNumberOfAcquisitionRequests (pool, address, expectedNumber) { +function expectNumberOfAcquisitionRequests (pool: Pool, address: ServerAddress, expectedNumber: number): void { + // @ts-expect-error expect(pool._acquireRequests[address.asKey()].length).toEqual(expectedNumber) } -function resourceValidOnlyOnceValidationFunction (resource) { +function resourceValidOnlyOnceValidationFunction (resource: Resource): boolean { // all resources are valid only once - if (resource.validatedOnce) { + if (resource.validatedOnce === true) { return false } else { resource.validatedOnce = true @@ -1438,15 +1440,24 @@ function resourceValidOnlyOnceValidationFunction (resource) { } } +function ignore (value: T | Promise): void { + Promise.resolve(value).catch(e => console.error('Error ignore, should not happen', e)) +} + class Resource { - constructor (key, id, release) { - this.id = id - this.key = key + public destroyed: boolean + public observer?: any + public validatedOnce?: boolean + + constructor ( + public key: ServerAddress, + public id: number, + public release: (key: ServerAddress, r: Resource) => (Promise | void)) { this.release = release this.destroyed = false } - close () { + close (): Promise | void { return this.release(this.key, this) } } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js b/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js index 04ca8bf66..53902b722 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/connection-provider/connection-provider-pooled.js @@ -16,8 +16,7 @@ */ import { createChannelConnection, ConnectionErrorHandler } from '../connection/index.js' -import Pool, { PoolConfig } from '../pool/index.js' -import { error, ConnectionProvider, ServerInfo, newError } from '../../core/index.ts' +import { error, ConnectionProvider, ServerInfo, newError, internal } from '../../core/index.ts' import AuthenticationProvider from './authentication-provider.js' import { object } from '../lang/index.js' import LivenessCheckProvider from './liveness-check-provider.js' @@ -31,6 +30,12 @@ const AUTHENTICATION_ERRORS = [ 'Neo.ClientError.Security.Unauthorized' ] +const { + pool: { + Pool, PoolConfig + } +} = internal + export default class PooledConnectionProvider extends ConnectionProvider { constructor ( { id, config, log, userAgent, boltAgent, authTokenManager, newPool = (...args) => new Pool(...args) }, diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/index.js b/packages/neo4j-driver-deno/lib/bolt-connection/index.js index 676118737..fbb9783a5 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/index.js +++ b/packages/neo4j-driver-deno/lib/bolt-connection/index.js @@ -20,6 +20,5 @@ export * as bolt from './bolt/index.js' export * as buf from './buf/index.js' export * as channel from './channel/index.js' export * as packstream from './packstream/index.js' -export * as pool from './pool/index.js' export * from './connection-provider/index.js' diff --git a/packages/neo4j-driver-deno/lib/core/internal/index.ts b/packages/neo4j-driver-deno/lib/core/internal/index.ts index 283de1eec..771082496 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/index.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/index.ts @@ -29,6 +29,7 @@ import * as serverAddress from './server-address.ts' import * as resolver from './resolver/index.ts' import * as objectUtil from './object-util.ts' import * as boltAgent from './bolt-agent/index.ts' +import * as pool from './pool/index.ts' export { util, @@ -44,5 +45,6 @@ export { serverAddress, resolver, objectUtil, - boltAgent + boltAgent, + pool } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/pool/index.js b/packages/neo4j-driver-deno/lib/core/internal/pool/index.ts similarity index 93% rename from packages/neo4j-driver-deno/lib/bolt-connection/pool/index.js rename to packages/neo4j-driver-deno/lib/core/internal/pool/index.ts index 3329abda8..b9d918cfd 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/pool/index.js +++ b/packages/neo4j-driver-deno/lib/core/internal/pool/index.ts @@ -18,8 +18,8 @@ import PoolConfig, { DEFAULT_ACQUISITION_TIMEOUT, DEFAULT_MAX_SIZE -} from './pool-config.js' -import Pool from './pool.js' +} from './pool-config.ts' +import Pool from './pool.ts' export default Pool export { Pool, PoolConfig, DEFAULT_ACQUISITION_TIMEOUT, DEFAULT_MAX_SIZE } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool-config.js b/packages/neo4j-driver-deno/lib/core/internal/pool/pool-config.ts similarity index 67% rename from packages/neo4j-driver-deno/lib/bolt-connection/pool/pool-config.js rename to packages/neo4j-driver-deno/lib/core/internal/pool/pool-config.ts index a31e72076..b1f5e5d94 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool-config.js +++ b/packages/neo4j-driver-deno/lib/core/internal/pool/pool-config.ts @@ -19,7 +19,10 @@ const DEFAULT_MAX_SIZE = 100 const DEFAULT_ACQUISITION_TIMEOUT = 60 * 1000 // 60 seconds export default class PoolConfig { - constructor (maxSize, acquisitionTimeout) { + public readonly maxSize: number + public readonly acquisitionTimeout: number + + constructor (maxSize: number, acquisitionTimeout: number) { this.maxSize = valueOrDefault(maxSize, DEFAULT_MAX_SIZE) this.acquisitionTimeout = valueOrDefault( acquisitionTimeout, @@ -27,19 +30,18 @@ export default class PoolConfig { ) } - static defaultConfig () { + static defaultConfig (): PoolConfig { return new PoolConfig(DEFAULT_MAX_SIZE, DEFAULT_ACQUISITION_TIMEOUT) } - static fromDriverConfig (config) { - const maxSizeConfigured = isConfigured(config.maxConnectionPoolSize) - const maxSize = maxSizeConfigured + static fromDriverConfig (config: { maxConnectionPoolSize?: number, connectionAcquisitionTimeout?: number }): PoolConfig { + const maxSize = isConfigured(config.maxConnectionPoolSize) ? config.maxConnectionPoolSize : DEFAULT_MAX_SIZE - const acquisitionTimeoutConfigured = isConfigured( + + const acquisitionTimeout = isConfigured( config.connectionAcquisitionTimeout ) - const acquisitionTimeout = acquisitionTimeoutConfigured ? config.connectionAcquisitionTimeout : DEFAULT_ACQUISITION_TIMEOUT @@ -47,12 +49,12 @@ export default class PoolConfig { } } -function valueOrDefault (value, defaultValue) { - return value === 0 || value ? value : defaultValue +function valueOrDefault (value: number | undefined, defaultValue: number): number { + return isConfigured(value) ? value : defaultValue } -function isConfigured (value) { - return value === 0 || value +function isConfigured (value?: number): value is number { + return value === 0 || value != null } export { DEFAULT_MAX_SIZE, DEFAULT_ACQUISITION_TIMEOUT } diff --git a/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js b/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts similarity index 65% rename from packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js rename to packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts index 072fc47e3..e657c26e4 100644 --- a/packages/neo4j-driver-deno/lib/bolt-connection/pool/pool.js +++ b/packages/neo4j-driver-deno/lib/core/internal/pool/pool.ts @@ -15,14 +15,47 @@ * limitations under the License. */ -import PoolConfig from './pool-config.js' -import { newError, internal } from '../../core/index.ts' +import PoolConfig from './pool-config.ts' +import { newError } from '../../error.ts' +import { Logger } from '../logger.ts' +import { ServerAddress } from '../server-address.ts' + +type Release = (address: ServerAddress, resource: R) => Promise +type Create = (acquisitionContext: unknown, address: ServerAddress, release: Release) => Promise +type Destroy = (resource: R) => Promise +type ValidateOnAcquire = (acquisitionContext: unknown, resource: R) => (Promise | boolean) +type ValidateOnRelease = (resource: R) => (Promise | boolean) +type InstallObserver = (resource: R, observer: unknown) => void +type RemoveObserver = (resource: R) => void +interface AcquisitionConfig { requireNew?: boolean } + +interface ConstructorParam { + create?: Create + destroy?: Destroy + validateOnAcquire?: ValidateOnAcquire + validateOnRelease?: ValidateOnRelease + installIdleObserver?: InstallObserver + removeIdleObserver?: RemoveObserver + config?: PoolConfig + log?: Logger +} -const { - logger: { Logger } -} = internal +class Pool { + private readonly _create: Create + private readonly _destroy: Destroy + private readonly _validateOnAcquire: ValidateOnAcquire + private readonly _validateOnRelease: ValidateOnRelease + private readonly _installIdleObserver: InstallObserver + private readonly _removeIdleObserver: RemoveObserver + private readonly _maxSize: number + private readonly _acquisitionTimeout: number + private readonly _log: Logger + private readonly _pools: { [key: string]: SingleAddressPool } + private readonly _pendingCreates: { [key: string]: number } + private readonly _acquireRequests: { [key: string]: Array> } + private readonly _activeResourceCounts: { [key: string]: number } + private _closed: boolean -class Pool { /** * @param {function(acquisitionContext: object, address: ServerAddress, function(address: ServerAddress, resource: object): Promise): Promise} create * an allocation function that creates a promise with a new resource. It's given an address for which to @@ -44,15 +77,15 @@ class Pool { * @param {Logger} log the driver logger. */ constructor ({ - create = (acquisitionContext, address, release) => Promise.resolve(), - destroy = conn => Promise.resolve(), + create = async (acquisitionContext, address, release) => await Promise.reject(new Error('Not implemented')), + destroy = async conn => await Promise.resolve(), validateOnAcquire = (acquisitionContext, conn) => true, validateOnRelease = (conn) => true, installIdleObserver = (conn, observer) => {}, removeIdleObserver = conn => {}, config = PoolConfig.defaultConfig(), log = Logger.noOp() - } = {}) { + }: ConstructorParam) { this._create = create this._destroy = destroy this._validateOnAcquire = validateOnAcquire @@ -78,25 +111,23 @@ class Pool { * @param {boolean} config.requireNew Indicate it requires a new resource * @return {Promise} resource that is ready to use. */ - acquire (acquisitionContext, address, config) { + async acquire (acquisitionContext: unknown, address: ServerAddress, config?: AcquisitionConfig): Promise { const key = address.asKey() // We're out of resources and will try to acquire later on when an existing resource is released. const allRequests = this._acquireRequests const requests = allRequests[key] - if (!requests) { + if (requests == null) { allRequests[key] = [] } - return new Promise((resolve, reject) => { - let request = null - + return await new Promise((resolve, reject) => { const timeoutId = setTimeout(() => { // acquisition timeout fired // remove request from the queue of pending requests, if it's still there // request might've been taken out by the release operation const pendingRequests = allRequests[key] - if (pendingRequests) { + if (pendingRequests != null) { allRequests[key] = pendingRequests.filter(item => item !== request) } @@ -113,9 +144,14 @@ class Pool { ) } }, this._acquisitionTimeout) - typeof timeoutId === 'object' && timeoutId.unref() - request = new PendingRequest(key, acquisitionContext, config, resolve, reject, timeoutId, this._log) + if (typeof timeoutId === 'object') { + // eslint-disable-next-line + // @ts-ignore + timeoutId.unref() + } + + const request = new PendingRequest(key, acquisitionContext, config, resolve, reject, timeoutId, this._log) allRequests[key].push(request) this._processPendingAcquireRequests(address) }) @@ -126,11 +162,11 @@ class Pool { * @param {ServerAddress} address the address of the server to purge its pool. * @returns {Promise} A promise that is resolved when the resources are purged */ - purge (address) { - return this._purgeKey(address.asKey()) + async purge (address: ServerAddress): Promise { + return await this._purgeKey(address.asKey()) } - apply (address, resourceConsumer) { + apply (address: ServerAddress, resourceConsumer: (resource: R) => void): void { const key = address.asKey() if (key in this._pools) { @@ -142,12 +178,12 @@ class Pool { * Destroy all idle resources in this pool. * @returns {Promise} A promise that is resolved when the resources are purged */ - async close () { + async close (): Promise { this._closed = true /** * The lack of Promise consuming was making the driver do not close properly in the scenario * captured at result.test.js:it('should handle missing onCompleted'). The test was timing out - * because while wainting for the driver close. + * because while waiting for the driver close. * * Consuming the Promise.all or by calling then or by awaiting in the result inside this method solved * the issue somehow. @@ -156,20 +192,20 @@ class Pool { * seems to be need also. */ return await Promise.all( - Object.keys(this._pools).map(key => this._purgeKey(key)) - ) + Object.keys(this._pools).map(async key => await this._purgeKey(key)) + ).then() } /** * Keep the idle resources for the provided addresses and purge the rest. * @returns {Promise} A promise that is resolved when the other resources are purged */ - keepAll (addresses) { + async keepAll (addresses: ServerAddress[]): Promise { const keysToKeep = addresses.map(a => a.asKey()) const keysPresent = Object.keys(this._pools) - const keysToPurge = keysPresent.filter(k => keysToKeep.indexOf(k) === -1) + const keysToPurge = keysPresent.filter(k => !keysToKeep.includes(k)) - return Promise.all(keysToPurge.map(key => this._purgeKey(key))) + return await Promise.all(keysToPurge.map(async key => await this._purgeKey(key))).then() } /** @@ -177,7 +213,7 @@ class Pool { * @param {ServerAddress} address the address of the server to check. * @return {boolean} `true` when pool contains entries for the given key, false otherwise. */ - has (address) { + has (address: ServerAddress): boolean { return address.asKey() in this._pools } @@ -186,21 +222,21 @@ class Pool { * @param {ServerAddress} address the address of the server to check. * @return {number} count of resources acquired by clients. */ - activeResourceCount (address) { - return this._activeResourceCounts[address.asKey()] || 0 + activeResourceCount (address: ServerAddress): number { + return this._activeResourceCounts[address.asKey()] ?? 0 } - _getOrInitializePoolFor (key) { + _getOrInitializePoolFor (key: string): SingleAddressPool { let pool = this._pools[key] - if (!pool) { - pool = new SingleAddressPool() + if (pool == null) { + pool = new SingleAddressPool() this._pools[key] = pool this._pendingCreates[key] = 0 } return pool } - async _acquire (acquisitionContext, address, requireNew) { + async _acquire (acquisitionContext: unknown, address: ServerAddress, requireNew: boolean): Promise<{ resource: R | null, pool: SingleAddressPool }> { if (this._closed) { throw newError('Pool is closed, it is no more able to serve requests.') } @@ -208,10 +244,14 @@ class Pool { const key = address.asKey() const pool = this._getOrInitializePoolFor(key) if (!requireNew) { - while (pool.length) { + while (pool.length > 0) { const resource = pool.pop() - if (this._removeIdleObserver) { + if (resource == null) { + continue + } + + if (this._removeIdleObserver != null) { this._removeIdleObserver(resource) } @@ -219,6 +259,7 @@ class Pool { // idle resource is valid and can be acquired resourceAcquired(key, this._activeResourceCounts) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} acquired from the pool ${key}`) } return { resource, pool } @@ -249,18 +290,21 @@ class Pool { const numConnections = this.activeResourceCount(address) + pool.length if (numConnections >= this._maxSize && requireNew) { const resource = pool.pop() - if (this._removeIdleObserver) { - this._removeIdleObserver(resource) + if (resource != null) { + if (this._removeIdleObserver != null) { + this._removeIdleObserver(resource) + } + pool.removeInUse(resource) + await this._destroy(resource) } - pool.removeInUse(resource) - await this._destroy(resource) } // Invoke callback that creates actual connection - resource = await this._create(acquisitionContext, address, (address, resource) => this._release(address, resource, pool)) + resource = await this._create(acquisitionContext, address, async (address, resource) => await this._release(address, resource, pool)) pool.pushInUse(resource) resourceAcquired(key, this._activeResourceCounts) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} created for the pool ${key}`) } } finally { @@ -269,7 +313,7 @@ class Pool { return { resource, pool } } - async _release (address, resource, pool) { + async _release (address: ServerAddress, resource: R, pool: SingleAddressPool): Promise { const key = address.asKey() try { @@ -278,20 +322,22 @@ class Pool { if (!await this._validateOnRelease(resource)) { if (this._log.isDebugEnabled()) { this._log.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `${resource} destroyed and can't be released to the pool ${key} because it is not functional` ) } pool.removeInUse(resource) await this._destroy(resource) } else { - if (this._installIdleObserver) { + if (this._installIdleObserver != null) { this._installIdleObserver(resource, { - onError: error => { + onError: (error: Error) => { this._log.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `Idle connection ${resource} destroyed because of error: ${error}` ) const pool = this._pools[key] - if (pool) { + if (pool != null) { this._pools[key] = pool.filter(r => r !== resource) pool.removeInUse(resource) } @@ -304,6 +350,7 @@ class Pool { } pool.push(resource) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} released to the pool ${key}`) } } @@ -311,6 +358,7 @@ class Pool { // key has been purged, don't put it back, just destroy the resource if (this._log.isDebugEnabled()) { this._log.debug( + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions `${resource} destroyed and can't be released to the pool ${key} because pool has been purged` ) } @@ -324,45 +372,58 @@ class Pool { } } - async _purgeKey (key) { + async _purgeKey (key: string): Promise { const pool = this._pools[key] const destructionList = [] - if (pool) { - while (pool.length) { + if (pool != null) { + while (pool.length > 0) { const resource = pool.pop() - if (this._removeIdleObserver) { + if (resource == null) { + continue + } + + if (this._removeIdleObserver != null) { this._removeIdleObserver(resource) } destructionList.push(this._destroy(resource)) } pool.close() + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete this._pools[key] await Promise.all(destructionList) } } - _processPendingAcquireRequests (address) { + _processPendingAcquireRequests (address: ServerAddress): void { const key = address.asKey() const requests = this._acquireRequests[key] - if (requests) { + if (requests != null) { const pendingRequest = requests.shift() // pop a pending acquire request - if (pendingRequest) { + if (pendingRequest != null) { this._acquire(pendingRequest.context, address, pendingRequest.requireNew) .catch(error => { // failed to acquire/create a new connection to resolve the pending acquire request // propagate the error by failing the pending request pendingRequest.reject(error) - return { resource: null } + return { resource: null, pool: null } }) .then(({ resource, pool }) => { - if (resource) { + // there is not situation where the pool resource is not null and the + // pool is null. + if (resource != null && pool != null) { // managed to acquire a valid resource from the pool if (pendingRequest.isCompleted()) { // request has been completed, most likely failed by a timeout // return the acquired resource back to the pool this._release(address, resource, pool) + .catch(error => { + if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions + this._log.debug(`${resource} could not be release back to the pool. Cause: ${error}`) + } + }) } else { // request is still pending and can be resolved with the newly acquired resource pendingRequest.resolve(resource) // resolve the pending request with the acquired resource @@ -371,14 +432,15 @@ class Pool { // failed to acquire a valid resource from the pool // return the pending request back to the pool if (!pendingRequest.isCompleted()) { - if (!this._acquireRequests[key]) { + if (this._acquireRequests[key] == null) { this._acquireRequests[key] = [] } this._acquireRequests[key].unshift(pendingRequest) } } - }) + }).catch(error => pendingRequest.reject(error)) } else { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete this._acquireRequests[key] } } @@ -390,8 +452,8 @@ class Pool { * @param {string} key the resource group identifier (server address for connections). * @param {Object.} activeResourceCounts the object holding active counts per key. */ -function resourceAcquired (key, activeResourceCounts) { - const currentCount = activeResourceCounts[key] || 0 +function resourceAcquired (key: string, activeResourceCounts: { [key: string]: number }): void { + const currentCount = activeResourceCounts[key] ?? 0 activeResourceCounts[key] = currentCount + 1 } @@ -400,19 +462,29 @@ function resourceAcquired (key, activeResourceCounts) { * @param {string} key the resource group identifier (server address for connections). * @param {Object.} activeResourceCounts the object holding active counts per key. */ -function resourceReleased (key, activeResourceCounts) { - const currentCount = activeResourceCounts[key] || 0 +function resourceReleased (key: string, activeResourceCounts: { [key: string]: number }): void { + const currentCount = activeResourceCounts[key] ?? 0 const nextCount = currentCount - 1 if (nextCount > 0) { activeResourceCounts[key] = nextCount } else { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete delete activeResourceCounts[key] } } -class PendingRequest { - constructor (key, context, config, resolve, reject, timeoutId, log) { +class PendingRequest { + private readonly _key: string + private readonly _context: unknown + private readonly _config: AcquisitionConfig + private readonly _resolve: (resource: R) => void + private readonly _reject: (error: Error) => void + private readonly _timeoutId: any + private readonly _log: Logger + private _completed: boolean + + constructor (key: string, context: unknown, config: AcquisitionConfig | undefined, resolve: (resource: R) => void, reject: (error: Error) => void, timeoutId: any, log: Logger) { this._key = key this._context = context this._resolve = resolve @@ -420,22 +492,22 @@ class PendingRequest { this._timeoutId = timeoutId this._log = log this._completed = false - this._config = config || {} + this._config = config ?? {} } - get context () { + get context (): unknown { return this._context } - get requireNew () { - return this._config.requireNew || false + get requireNew (): boolean { + return this._config.requireNew ?? false } - isCompleted () { + isCompleted (): boolean { return this._completed } - resolve (resource) { + resolve (resource: R): void { if (this._completed) { return } @@ -443,12 +515,13 @@ class PendingRequest { clearTimeout(this._timeoutId) if (this._log.isDebugEnabled()) { + // eslint-disable-next-line @typescript-eslint/restrict-template-expressions this._log.debug(`${resource} acquired from the pool ${this._key}`) } this._resolve(resource) } - reject (error) { + reject (error: Error): void { if (this._completed) { return } @@ -459,53 +532,59 @@ class PendingRequest { } } -class SingleAddressPool { +class SingleAddressPool { + private _active: boolean + private _elements: R[] + private _elementsInUse: Set + constructor () { this._active = true this._elements = [] this._elementsInUse = new Set() } - isActive () { + isActive (): boolean { return this._active } - close () { + close (): void { this._active = false this._elements = [] this._elementsInUse = new Set() } - filter (predicate) { + filter (predicate: (resource: R) => boolean): SingleAddressPool { this._elements = this._elements.filter(predicate) return this } - apply (resourceConsumer) { + apply (resourceConsumer: (resource: R) => void): void { this._elements.forEach(resourceConsumer) this._elementsInUse.forEach(resourceConsumer) } - get length () { + get length (): number { return this._elements.length } - pop () { + pop (): R | undefined { const element = this._elements.pop() - this._elementsInUse.add(element) + if (element != null) { + this._elementsInUse.add(element) + } return element } - push (element) { + push (element: R): number { this._elementsInUse.delete(element) return this._elements.push(element) } - pushInUse (element) { + pushInUse (element: R): void { this._elementsInUse.add(element) } - removeInUse (element) { + removeInUse (element: R): void { this._elementsInUse.delete(element) } } diff --git a/packages/neo4j-driver/test/driver.test.js b/packages/neo4j-driver/test/driver.test.js index 1be602cba..224086a56 100644 --- a/packages/neo4j-driver/test/driver.test.js +++ b/packages/neo4j-driver/test/driver.test.js @@ -18,15 +18,12 @@ import neo4j from '../src' import sharedNeo4j from './internal/shared-neo4j' import lolex from 'lolex' -import { - DEFAULT_ACQUISITION_TIMEOUT, - DEFAULT_MAX_SIZE -} from '../../bolt-connection/lib/pool/pool-config' import testUtils from './internal/test-utils' import { json, internal, bookmarkManager } from 'neo4j-driver-core' const { - bookmarks: { Bookmarks } + bookmarks: { Bookmarks }, + pool: { DEFAULT_ACQUISITION_TIMEOUT, DEFAULT_MAX_SIZE } } = internal // As long as driver creation doesn't touch the network it's fine to run diff --git a/packages/neo4j-driver/test/internal/least-connected-load-balancing-strategy.test.js b/packages/neo4j-driver/test/internal/least-connected-load-balancing-strategy.test.js index 75362f288..882dd0faf 100644 --- a/packages/neo4j-driver/test/internal/least-connected-load-balancing-strategy.test.js +++ b/packages/neo4j-driver/test/internal/least-connected-load-balancing-strategy.test.js @@ -14,10 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import Pool from '../../../bolt-connection/lib/pool/pool' +import { internal } from 'neo4j-driver-core' import { loadBalancing } from 'neo4j-driver-bolt-connection' const { LeastConnectedLoadBalancingStrategy } = loadBalancing +const { + pool: { Pool } +} = internal + describe('#unit LeastConnectedLoadBalancingStrategy', () => { it('should return null when no readers', () => { const knownReaders = [] diff --git a/packages/neo4j-driver/test/internal/pool-config.test.js b/packages/neo4j-driver/test/internal/pool-config.test.js index 43218098e..7361b250e 100644 --- a/packages/neo4j-driver/test/internal/pool-config.test.js +++ b/packages/neo4j-driver/test/internal/pool-config.test.js @@ -15,10 +15,11 @@ * limitations under the License. */ -import PoolConfig, { - DEFAULT_ACQUISITION_TIMEOUT, - DEFAULT_MAX_SIZE -} from '../../../bolt-connection/lib/pool/pool-config' +import { internal } from 'neo4j-driver-core' + +const { + pool: { DEFAULT_ACQUISITION_TIMEOUT, DEFAULT_MAX_SIZE, PoolConfig } +} = internal describe('#unit PoolConfig', () => { let originalConsoleWarn