Skip to content

Commit 3a25656

Browse files
authored
259 As a developer, I want the connection pool as an instance (#263)
* 259 As a developer, I want the connection pool as an instance * fixed name
1 parent 910b8c5 commit 3a25656

File tree

5 files changed

+25
-28
lines changed

5 files changed

+25
-28
lines changed

example/package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/client.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ export class Client {
6868
private publishers = new Map<string, PublisherMappedValue>()
6969
private compressions = new Map<CompressionType, Compression>()
7070
private connection: Connection
71+
private pool: ConnectionPool = new ConnectionPool()
7172

7273
private constructor(
7374
private readonly logger: Logger,
@@ -115,7 +116,7 @@ export class Client {
115116

116117
private async closeConnectionIfUnused(connection: Connection, params: ClosingParams) {
117118
if (connection.refCount <= 0) {
118-
ConnectionPool.removeCachedConnection(this.connection)
119+
this.pool.removeCachedConnection(this.connection)
119120
await this.connection.close({ ...params, manuallyClose: true })
120121
}
121122
}
@@ -162,7 +163,7 @@ export class Client {
162163
publisherRef: streamPublisherParams.publisherRef,
163164
})
164165
}
165-
const publisher = new StreamPublisher(streamPublisherParams, lastPublishingId, filter)
166+
const publisher = new StreamPublisher(this.pool, streamPublisherParams, lastPublishingId, filter)
166167
connection.registerForClosePublisher(publisher.extendedId, params.stream, async () => {
167168
await publisher.close(false)
168169
this.publishers.delete(publisher.extendedId)
@@ -203,6 +204,7 @@ export class Client {
203204
}
204205

205206
const consumer = new StreamConsumer(
207+
this.pool,
206208
handle,
207209
{
208210
connection,
@@ -631,12 +633,7 @@ export class Client {
631633
if (!chosenNode) {
632634
throw new Error(`Stream was not found on any node`)
633635
}
634-
const cachedConnection = ConnectionPool.getUsableCachedConnection(
635-
purpose,
636-
streamName,
637-
this.connection.vhost,
638-
chosenNode.host
639-
)
636+
const cachedConnection = this.pool.getCachedConnection(purpose, streamName, this.connection.vhost, chosenNode.host)
640637
if (cachedConnection) return cachedConnection
641638

642639
const newConnection = await this.getConnectionOnChosenNode(
@@ -647,7 +644,7 @@ export class Client {
647644
connectionClosedListener
648645
)
649646

650-
ConnectionPool.cacheConnection(purpose, streamName, this.connection.vhost, newConnection.hostname, newConnection)
647+
this.pool.cacheConnection(purpose, streamName, this.connection.vhost, newConnection.hostname, newConnection)
651648
return newConnection
652649
}
653650

src/connection_pool.ts

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,55 +5,53 @@ type InstanceKey = string
55
export type ConnectionPurpose = "consumer" | "publisher"
66

77
export class ConnectionPool {
8-
private static consumerConnectionProxies = new Map<InstanceKey, Connection[]>()
9-
private static publisherConnectionProxies = new Map<InstanceKey, Connection[]>()
8+
private consumerConnectionProxies: Map<InstanceKey, Connection[]> = new Map<InstanceKey, Connection[]>()
9+
private publisherConnectionProxies: Map<InstanceKey, Connection[]> = new Map<InstanceKey, Connection[]>()
1010

11-
public static getUsableCachedConnection(purpose: ConnectionPurpose, streamName: string, vhost: string, host: string) {
12-
const map =
13-
purpose === "publisher" ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies
14-
const key = ConnectionPool.getCacheKey(streamName, vhost, host)
11+
public getCachedConnection(purpose: ConnectionPurpose, streamName: string, vhost: string, host: string) {
12+
const map = purpose === "publisher" ? this.publisherConnectionProxies : this.consumerConnectionProxies
13+
const key = this.getCacheKey(streamName, vhost, host)
1514
const proxies = map.get(key) || []
1615
const connection = proxies.at(-1)
1716
const refCount = connection?.refCount
1817
return refCount !== undefined && refCount < getMaxSharedConnectionInstances() ? connection : undefined
1918
}
2019

21-
public static cacheConnection(
20+
public cacheConnection(
2221
purpose: ConnectionPurpose,
2322
streamName: string,
2423
vhost: string,
2524
host: string,
2625
client: Connection
2726
) {
28-
const map =
29-
purpose === "publisher" ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies
30-
const key = ConnectionPool.getCacheKey(streamName, vhost, host)
27+
const map = purpose === "publisher" ? this.publisherConnectionProxies : this.consumerConnectionProxies
28+
const key = this.getCacheKey(streamName, vhost, host)
3129
const currentlyCached = map.get(key) || []
3230
currentlyCached.push(client)
3331
map.set(key, currentlyCached)
3432
}
3533

36-
public static removeIfUnused(connection: Connection) {
34+
public removeIfUnused(connection: Connection) {
3735
if (connection.refCount <= 0) {
38-
ConnectionPool.removeCachedConnection(connection)
36+
this.removeCachedConnection(connection)
3937
return true
4038
}
4139
return false
4240
}
4341

44-
public static removeCachedConnection(connection: Connection) {
42+
public removeCachedConnection(connection: Connection) {
4543
const { leader, streamName, hostname: host, vhost } = connection
4644
if (streamName === undefined) return
47-
const m = leader ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies
48-
const k = ConnectionPool.getCacheKey(streamName, vhost, host)
45+
const m = leader ? this.publisherConnectionProxies : this.consumerConnectionProxies
46+
const k = this.getCacheKey(streamName, vhost, host)
4947
const mappedClientList = m.get(k)
5048
if (mappedClientList) {
5149
const filtered = mappedClientList.filter((c) => c !== connection)
5250
m.set(k, filtered)
5351
}
5452
}
5553

56-
private static getCacheKey(streamName: string, vhost: string, host: string) {
54+
private getCacheKey(streamName: string, vhost: string, host: string) {
5755
return `${streamName}@${vhost}@${host}`
5856
}
5957
}

src/consumer.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ export class StreamConsumer implements Consumer {
6868
private singleActive: boolean = false
6969

7070
constructor(
71+
private pool: ConnectionPool,
7172
handle: ConsumerFunc,
7273
params: {
7374
connection: Connection
@@ -99,7 +100,7 @@ export class StreamConsumer implements Consumer {
99100
async close(manuallyClose: boolean): Promise<void> {
100101
this.closed = true
101102
this.connection.decrRefCount()
102-
if (ConnectionPool.removeIfUnused(this.connection)) {
103+
if (this.pool.removeIfUnused(this.connection)) {
103104
await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose })
104105
}
105106
}

src/publisher.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ export class StreamPublisher implements Publisher {
173173
private _closed = false
174174

175175
constructor(
176+
private pool: ConnectionPool,
176177
params: {
177178
connection: Connection
178179
stream: string
@@ -281,7 +282,7 @@ export class StreamPublisher implements Publisher {
281282
if (!this.closed) {
282283
await this.flush()
283284
this.connection.decrRefCount()
284-
if (ConnectionPool.removeIfUnused(this.connection)) {
285+
if (this.pool.removeIfUnused(this.connection)) {
285286
await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose })
286287
}
287288
}

0 commit comments

Comments
 (0)