From a94034c2caf7876d1003a3fe7411df6f21d869b5 Mon Sep 17 00:00:00 2001 From: Andrea Pietroni Date: Thu, 12 Jun 2025 17:27:54 +0200 Subject: [PATCH 1/2] 259 As a developer, I want the connection pool as an instance --- example/package-lock.json | 2 +- src/client.ts | 10 ++++++---- src/connection_pool.ts | 30 ++++++++++++++---------------- src/consumer.ts | 3 ++- src/publisher.ts | 3 ++- 5 files changed, 25 insertions(+), 23 deletions(-) diff --git a/example/package-lock.json b/example/package-lock.json index 2d55046..b7777d6 100644 --- a/example/package-lock.json +++ b/example/package-lock.json @@ -23,7 +23,7 @@ "extraneous": true }, "..": { - "version": "0.6.0", + "version": "0.6.1", "license": "ISC", "dependencies": { "semver": "^7.5.4" diff --git a/src/client.ts b/src/client.ts index 84e5252..751d9bf 100644 --- a/src/client.ts +++ b/src/client.ts @@ -68,6 +68,7 @@ export class Client { private publishers = new Map() private compressions = new Map() private connection: Connection + private pool: ConnectionPool = new ConnectionPool() private constructor( private readonly logger: Logger, @@ -115,7 +116,7 @@ export class Client { private async closeConnectionIfUnused(connection: Connection, params: ClosingParams) { if (connection.refCount <= 0) { - ConnectionPool.removeCachedConnection(this.connection) + this.pool.removeCachedConnection(this.connection) await this.connection.close({ ...params, manuallyClose: true }) } } @@ -162,7 +163,7 @@ export class Client { publisherRef: streamPublisherParams.publisherRef, }) } - const publisher = new StreamPublisher(streamPublisherParams, lastPublishingId, filter) + const publisher = new StreamPublisher(this.pool, streamPublisherParams, lastPublishingId, filter) connection.registerForClosePublisher(publisher.extendedId, params.stream, async () => { await publisher.close(false) this.publishers.delete(publisher.extendedId) @@ -203,6 +204,7 @@ export class Client { } const consumer = new StreamConsumer( + this.pool, handle, { connection, @@ -631,7 +633,7 @@ export class Client { if (!chosenNode) { throw new Error(`Stream was not found on any node`) } - const cachedConnection = ConnectionPool.getUsableCachedConnection( + const cachedConnection = this.pool.getUsableCachedConnection( purpose, streamName, this.connection.vhost, @@ -647,7 +649,7 @@ export class Client { connectionClosedListener ) - ConnectionPool.cacheConnection(purpose, streamName, this.connection.vhost, newConnection.hostname, newConnection) + this.pool.cacheConnection(purpose, streamName, this.connection.vhost, newConnection.hostname, newConnection) return newConnection } diff --git a/src/connection_pool.ts b/src/connection_pool.ts index aa0125d..6656cc8 100644 --- a/src/connection_pool.ts +++ b/src/connection_pool.ts @@ -5,47 +5,45 @@ type InstanceKey = string export type ConnectionPurpose = "consumer" | "publisher" export class ConnectionPool { - private static consumerConnectionProxies = new Map() - private static publisherConnectionProxies = new Map() + private consumerConnectionProxies: Map = new Map() + private publisherConnectionProxies: Map = new Map() - public static getUsableCachedConnection(purpose: ConnectionPurpose, streamName: string, vhost: string, host: string) { - const map = - purpose === "publisher" ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies - const key = ConnectionPool.getCacheKey(streamName, vhost, host) + public getUsableCachedConnection(purpose: ConnectionPurpose, streamName: string, vhost: string, host: string) { + const map = purpose === "publisher" ? this.publisherConnectionProxies : this.consumerConnectionProxies + const key = this.getCacheKey(streamName, vhost, host) const proxies = map.get(key) || [] const connection = proxies.at(-1) const refCount = connection?.refCount return refCount !== undefined && refCount < getMaxSharedConnectionInstances() ? connection : undefined } - public static cacheConnection( + public cacheConnection( purpose: ConnectionPurpose, streamName: string, vhost: string, host: string, client: Connection ) { - const map = - purpose === "publisher" ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies - const key = ConnectionPool.getCacheKey(streamName, vhost, host) + const map = purpose === "publisher" ? this.publisherConnectionProxies : this.consumerConnectionProxies + const key = this.getCacheKey(streamName, vhost, host) const currentlyCached = map.get(key) || [] currentlyCached.push(client) map.set(key, currentlyCached) } - public static removeIfUnused(connection: Connection) { + public removeIfUnused(connection: Connection) { if (connection.refCount <= 0) { - ConnectionPool.removeCachedConnection(connection) + this.removeCachedConnection(connection) return true } return false } - public static removeCachedConnection(connection: Connection) { + public removeCachedConnection(connection: Connection) { const { leader, streamName, hostname: host, vhost } = connection if (streamName === undefined) return - const m = leader ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies - const k = ConnectionPool.getCacheKey(streamName, vhost, host) + const m = leader ? this.publisherConnectionProxies : this.consumerConnectionProxies + const k = this.getCacheKey(streamName, vhost, host) const mappedClientList = m.get(k) if (mappedClientList) { const filtered = mappedClientList.filter((c) => c !== connection) @@ -53,7 +51,7 @@ export class ConnectionPool { } } - private static getCacheKey(streamName: string, vhost: string, host: string) { + private getCacheKey(streamName: string, vhost: string, host: string) { return `${streamName}@${vhost}@${host}` } } diff --git a/src/consumer.ts b/src/consumer.ts index 6bbfd16..5058a96 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -68,6 +68,7 @@ export class StreamConsumer implements Consumer { private singleActive: boolean = false constructor( + private pool: ConnectionPool, handle: ConsumerFunc, params: { connection: Connection @@ -99,7 +100,7 @@ export class StreamConsumer implements Consumer { async close(manuallyClose: boolean): Promise { this.closed = true this.connection.decrRefCount() - if (ConnectionPool.removeIfUnused(this.connection)) { + if (this.pool.removeIfUnused(this.connection)) { await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose }) } } diff --git a/src/publisher.ts b/src/publisher.ts index 66665ff..9987b40 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -173,6 +173,7 @@ export class StreamPublisher implements Publisher { private _closed = false constructor( + private pool: ConnectionPool, params: { connection: Connection stream: string @@ -281,7 +282,7 @@ export class StreamPublisher implements Publisher { if (!this.closed) { await this.flush() this.connection.decrRefCount() - if (ConnectionPool.removeIfUnused(this.connection)) { + if (this.pool.removeIfUnused(this.connection)) { await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose }) } } From f4a0a458e0039fa98b415abedb2bace3003918f4 Mon Sep 17 00:00:00 2001 From: Andrea Pietroni Date: Thu, 12 Jun 2025 17:37:58 +0200 Subject: [PATCH 2/2] fixed name --- src/client.ts | 7 +------ src/connection_pool.ts | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/client.ts b/src/client.ts index 751d9bf..c6bb395 100644 --- a/src/client.ts +++ b/src/client.ts @@ -633,12 +633,7 @@ export class Client { if (!chosenNode) { throw new Error(`Stream was not found on any node`) } - const cachedConnection = this.pool.getUsableCachedConnection( - purpose, - streamName, - this.connection.vhost, - chosenNode.host - ) + const cachedConnection = this.pool.getCachedConnection(purpose, streamName, this.connection.vhost, chosenNode.host) if (cachedConnection) return cachedConnection const newConnection = await this.getConnectionOnChosenNode( diff --git a/src/connection_pool.ts b/src/connection_pool.ts index 6656cc8..8910ec1 100644 --- a/src/connection_pool.ts +++ b/src/connection_pool.ts @@ -8,7 +8,7 @@ export class ConnectionPool { private consumerConnectionProxies: Map = new Map() private publisherConnectionProxies: Map = new Map() - public getUsableCachedConnection(purpose: ConnectionPurpose, streamName: string, vhost: string, host: string) { + public getCachedConnection(purpose: ConnectionPurpose, streamName: string, vhost: string, host: string) { const map = purpose === "publisher" ? this.publisherConnectionProxies : this.consumerConnectionProxies const key = this.getCacheKey(streamName, vhost, host) const proxies = map.get(key) || []