Skip to content

Commit 1bc5d9e

Browse files
committed
for "redirect" client, don't treat it as part of pool
this enables a lot of cleanup
1 parent 5418cc4 commit 1bc5d9e

File tree

3 files changed

+67
-64
lines changed

3 files changed

+67
-64
lines changed

packages/client/lib/client/cache.ts

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ export abstract class ClientSideCacheProvider {
8787
abstract clear(): void;
8888
abstract cacheHits(): number;
8989
abstract cacheMisses(): number;
90+
abstract clearOnReconnect(): boolean;
9091
}
9192

9293
export class BasicClientSideCache extends ClientSideCacheProvider {
@@ -108,6 +109,10 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
108109
this.#lru = lru;
109110
}
110111

112+
override clearOnReconnect(): boolean {
113+
return true;
114+
}
115+
111116
/* logic of how caching works:
112117
113118
1. every command that's cachable provide a getCacheInfo(args) function
@@ -399,17 +404,17 @@ export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache
399404
return new PooledClientSideCacheEntryPromise(this.ttl, creator, sendCommandPromise);
400405
}
401406

402-
// doesn't provide any mechanism for clearing cache
403-
override clear(): void {
404-
return;
407+
override clearOnReconnect(): boolean {
408+
return false;
405409
}
406410
}
407411

408412
// Only clears cache on "management"/"redirect" client disconnect
409413
export class PooledRedirectClientSideCache extends PooledClientSideCacheProvider {
410414
#id?: number;
411-
#clients: Set<RedisClient<any, any, any, any, any>> = new Set();
415+
#clients: Set<CachingClient> = new Set();
412416
#disabled = true;
417+
#redirectClient?: CachingClient;
413418

414419
constructor(ttl: number, maxEntries: number, lru: boolean) {
415420
super(ttl, maxEntries, lru);
@@ -450,7 +455,7 @@ export class PooledRedirectClientSideCache extends PooledClientSideCacheProvider
450455
override updateRedirect(id: number) {
451456
this.#id = id;
452457
for (const client of this.#clients) {
453-
client.sendCommand(this.trackingOn())
458+
client.sendCommand(this.trackingOn()).catch(() => {});
454459
}
455460
}
456461

@@ -462,13 +467,27 @@ export class PooledRedirectClientSideCache extends PooledClientSideCacheProvider
462467
this.#clients.delete(client);
463468
}
464469

465-
// only clear via redirect client, as this will be on the regular client emits
466-
override clear() {
467-
return;
470+
override clearOnReconnect(): boolean {
471+
return false;
472+
}
473+
474+
setRedirectClient(client: CachingClient) {
475+
this.#redirectClient = client;
468476
}
469477

470-
// this will be attached to the redirect client emits.
471-
clearPool() {
472-
super.clear();
478+
destroy() {
479+
this.clear();
480+
481+
if (this.#redirectClient) {
482+
this.#redirectClient.destroy();
483+
}
484+
}
485+
486+
async close() {
487+
this.clear();
488+
489+
if (this.#redirectClient) {
490+
return this.#redirectClient.close();
491+
}
473492
}
474493
}

packages/client/lib/client/index.ts

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,9 @@ export default class RedisClient<
316316
#monitorCallback?: MonitorCallback<TYPE_MAPPING>;
317317
private _self = this;
318318
private _commandOptions?: CommandOptions<TYPE_MAPPING>;
319-
#annotations = new Map<string, string>();
320-
319+
321320
#clientSideCache?: ClientSideCacheProvider;
321+
#clearOnReconnect = false;
322322

323323
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
324324
return this._self.#options;
@@ -347,11 +347,14 @@ export default class RedisClient<
347347
this.#queue = this.#initiateQueue();
348348
this.#socket = this.#initiateSocket();
349349

350-
if (options?.clientSideCache instanceof ClientSideCacheProvider) {
351-
this.#clientSideCache = options.clientSideCache;
352-
} else if (options?.clientSideCache !== undefined) {
353-
const cscConfig = options.clientSideCache;
354-
this.#clientSideCache = new BasicClientSideCache(cscConfig.ttl, cscConfig.maxEntries, cscConfig.lru);
350+
if (options?.clientSideCache) {
351+
if (options.clientSideCache instanceof ClientSideCacheProvider) {
352+
this.#clientSideCache = options.clientSideCache;
353+
} else {
354+
const cscConfig = options.clientSideCache;
355+
this.#clientSideCache = new BasicClientSideCache(cscConfig.ttl, cscConfig.maxEntries, cscConfig.lru);
356+
}
357+
this.#clearOnReconnect = this.#clientSideCache.clearOnReconnect();
355358
}
356359
}
357360

@@ -483,7 +486,9 @@ export default class RedisClient<
483486
})
484487
.on('error', err => {
485488
this.emit('error', err);
486-
this.#clientSideCache?.clear(); // clear at failure time
489+
if (this.#clearOnReconnect) {
490+
this.#clientSideCache!.clear();
491+
}
487492
if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) {
488493
this.#queue.flushWaitingForReply(err);
489494
} else {
@@ -641,26 +646,6 @@ export default class RedisClient<
641646
}
642647
}
643648

644-
setAnnotation(key: string, val: string) {
645-
this._self.#annotations.set(key, val);
646-
}
647-
648-
getAnnotation(key: string) {
649-
return this._self.#annotations.get(key);
650-
}
651-
652-
clearAnnotations() {
653-
return this._self.#annotations.clear();
654-
}
655-
656-
hasAnnotation(key: string) {
657-
return this._self.#annotations.has(key);
658-
}
659-
660-
deleteAnnotation(key: string) {
661-
return this._self.#annotations.delete(key);
662-
}
663-
664649
async SELECT(db: number): Promise<void> {
665650
await this.sendCommand(['SELECT', db.toString()]);
666651
this._self.#selectedDB = db;

packages/client/lib/client/pool.ts

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -311,22 +311,29 @@ export class RedisClientPool<
311311
if (this._self.#isOpen) return; // TODO: throw error?
312312
this._self.#isOpen = true;
313313

314-
let count = 0;
315314
if (this._self.#clientSideCache instanceof PooledRedirectClientSideCache) {
316-
if (this._self.#options.minimum < 1) {
317-
this._self.#options.minimum = 1;
318-
}
319-
count = 1;
315+
const client = this._self.#clientFactory();
316+
const cache = this._self.#clientSideCache;
317+
client.on("error", () => {
318+
cache.disable();
319+
cache.clear();
320+
}).on("ready", async () => {
321+
const clientId = await client.clientId() as number;
322+
cache.updateRedirect(clientId);
323+
cache.enable();
324+
})
325+
320326
try {
321-
await this._self.#create(true)
327+
await client.connect();
328+
cache.setRedirectClient(client);
322329
} catch (err) {
323330
this.destroy();
324331
throw err;
325332
}
326333
}
327334

328335
const promises = [];
329-
while (promises.length + count < this._self.#options.minimum) {
336+
while (promises.length < this._self.#options.minimum) {
330337
promises.push(this._self.#create());
331338
}
332339

@@ -348,18 +355,7 @@ export class RedisClientPool<
348355

349356
try {
350357
const client = node.value;
351-
const cache = this._self.#clientSideCache;
352-
if (redirect && cache instanceof PooledRedirectClientSideCache) {
353-
client.setAnnotation("pool-master", "true");
354-
client.on("error", () => {
355-
cache.disable();
356-
}).on("ready", async () => {
357-
const clientId = await client.clientId() as number;
358-
cache.updateRedirect(clientId);
359-
cache.clearPool();
360-
cache.enable();
361-
})
362-
} else if (this._self.#clientSideCache) {
358+
if (this._self.#clientSideCache) {
363359
this._self.#clientSideCache.addClient(node.value);
364360
}
365361

@@ -453,12 +449,6 @@ export class RedisClientPool<
453449
for (let i = 0; i < toDestroy; i++) {
454450
// TODO: shift vs pop
455451
const client = this.#idleClients.shift()!
456-
if (client.hasAnnotation("pool-master")) {
457-
console.log("wanted to remove \"master\"/\"redirect\" client");
458-
this._self.#idleClients.push(client);
459-
continue;
460-
}
461-
462452
this.#clientSideCache?.removeClient(client);
463453
client.destroy();
464454
}
@@ -505,6 +495,10 @@ export class RedisClientPool<
505495
for (const client of this._self.#clientsInUse) {
506496
promises.push(client.close());
507497
}
498+
499+
if (this.#clientSideCache instanceof PooledRedirectClientSideCache) {
500+
promises.push(this.#clientSideCache.close());
501+
}
508502

509503
await Promise.all(promises);
510504

@@ -526,6 +520,11 @@ export class RedisClientPool<
526520
for (const client of this._self.#clientsInUse) {
527521
client.destroy();
528522
}
523+
524+
if (this.#clientSideCache instanceof PooledRedirectClientSideCache) {
525+
this.#clientSideCache.destroy();
526+
}
527+
529528
this._self.#clientsInUse.reset();
530529

531530
this._self.#isOpen = false;

0 commit comments

Comments
 (0)