Skip to content

Commit 8edc753

Browse files
committed
wip
1 parent 0e05b79 commit 8edc753

File tree

12 files changed

+1265
-1999
lines changed

12 files changed

+1265
-1999
lines changed

package-lock.json

Lines changed: 754 additions & 1757 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/client/lib/client/cache.spec.ts

Lines changed: 450 additions & 174 deletions
Large diffs are not rendered by default.

packages/client/lib/client/cache.ts

Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ type CachingClientType = RedisClientType<any, any, any, any, any>;
88
type CmdFunc = () => Promise<ReplyUnion>;
99

1010
export interface ClientSideCacheConfig {
11-
ttl: number;
12-
maxEntries: number;
13-
lru: boolean;
11+
ttl?: number;
12+
maxEntries?: number;
13+
lru?: boolean;
1414
}
1515

1616
type CacheCreator = {
@@ -48,7 +48,7 @@ export class ClientSideCacheEntryValue extends ClientSideCacheEntryBase {
4848
readonly #value: any;
4949

5050
get value() {
51-
return structuredClone(this.#value);
51+
return this.#value;
5252
}
5353

5454
constructor(ttl: number, value: any) {
@@ -79,7 +79,6 @@ export abstract class ClientSideCacheProvider extends EventEmitter {
7979
abstract cacheMisses(): number;
8080
abstract onError(): void;
8181
abstract onClose(): void;
82-
abstract onDestroy(): void;
8382
}
8483

8584
export class BasicClientSideCache extends ClientSideCacheProvider {
@@ -98,7 +97,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
9897
this.#keyToCacheKeySetMap = new Map<string, Set<string>>();
9998
this.ttl = config?.ttl ?? 0;
10099
this.#maxEntries = config?.maxEntries ?? 0;
101-
this.#lru = config?.lru ?? false;
100+
this.#lru = config?.lru ?? true;
102101
}
103102

104103
/* logic of how caching works:
@@ -123,7 +122,6 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
123122
transformReply: TransformReply | undefined,
124123
typeMapping: TypeMapping | undefined
125124
) {
126-
console.log("handleCache: enter");
127125
let reply: ReplyUnion;
128126

129127
const cacheKey = parser.cacheKey;
@@ -133,10 +131,9 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
133131
if (cacheEntry) {
134132
// If instanceof is "too slow", can add a "type" and then use an "as" cast to call proper getters.
135133
if (cacheEntry instanceof ClientSideCacheEntryValue) { // "2b1"
136-
console.log("returning value from cache");
137134
this.#cacheHit();
138135

139-
return cacheEntry.value;
136+
return structuredClone(cacheEntry.value);
140137
} else if (cacheEntry instanceof ClientSideCacheEntryPromise) { // 2b2
141138
// unsure if this should be considered a cache hit, a miss, or neither?
142139
reply = await cacheEntry.promise;
@@ -171,15 +168,14 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
171168
// 3b
172169
if (cacheEntry.validate()) { // revalidating promise entry (dont save value, if promise entry has been invalidated)
173170
// 3c
174-
console.log("saving value to cache");
175171
cacheEntry = this.createValueEntry(client, val);
176172
this.set(cacheKey, cacheEntry, parser.keys);
177173
this.emit("cached-key", cacheKey);
178174
} else {
179-
console.log("cache entry for key got invalidated between execution and saving, so not saving");
175+
// console.log("cache entry for key got invalidated between execution and saving, so not saving");
180176
}
181177

182-
return val;
178+
return structuredClone(val);
183179
}
184180

185181
override trackingOn() {
@@ -189,21 +185,24 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
189185
override invalidate(key: RedisArgument | null) {
190186
if (key === null) {
191187
this.clear(false);
188+
this.emit("invalidate", key);
189+
192190
return;
193191
}
194-
const set = this.#keyToCacheKeySetMap.get(key.toString());
195-
if (set) {
196-
for (const cacheKey of set) {
197-
console.log(`invalidate: got ${cacheKey} from key ${key.toString()} set`);
192+
193+
const keySet = this.#keyToCacheKeySetMap.get(key.toString());
194+
if (keySet) {
195+
for (const cacheKey of keySet) {
198196
const entry = this.#cacheKeyToEntryMap.get(cacheKey);
199197
if (entry) {
200198
entry.invalidate();
201199
}
202200
this.#cacheKeyToEntryMap.delete(cacheKey);
203-
this.emit("invalidate", cacheKey);
204201
}
205202
this.#keyToCacheKeySetMap.delete(key.toString());
206203
}
204+
205+
this.emit('invalidate', key);
207206
}
208207

209208
override clear(reset = true) {
@@ -305,9 +304,9 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
305304
this.clear();
306305
}
307306

308-
override onClose() { }
309-
310-
override onDestroy() { }
307+
override onClose() {
308+
this.clear();
309+
}
311310

312311
/**
313312
* @internal
@@ -366,7 +365,11 @@ export abstract class PooledClientSideCacheProvider extends BasicClientSideCache
366365
return super.has(cacheKey);
367366
}
368367

369-
onConnect(factory: () => CachingClientType) {};
368+
onPoolConnect(factory: () => CachingClientType) {};
369+
370+
onPoolClose() {
371+
this.clear();
372+
};
370373
}
371374

372375
// doesn't do anything special in pooling, clears cache on every client disconnect
@@ -382,6 +385,14 @@ export class BasicPooledClientSideCache extends PooledClientSideCacheProvider {
382385
override removeClient(client: CachingClientType): void {
383386
return;
384387
}
388+
389+
override onError() {
390+
this.clear(false);
391+
}
392+
393+
override onClose() {
394+
this.clear(false);
395+
}
385396
}
386397

387398
class PooledClientSideCacheEntryValue extends ClientSideCacheEntryValue {
@@ -443,7 +454,9 @@ export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache
443454
}
444455

445456
// don't clear cache on error here
446-
override onError(): void {}
457+
override onError() {}
458+
459+
override onClose() {}
447460
}
448461

449462
// Only clears cache on "management"/"redirect" client disconnect
@@ -482,7 +495,7 @@ export class PooledRedirectClientSideCache extends PooledClientSideCacheProvider
482495

483496
override onError(): void {};
484497

485-
override async onConnect(factory: () => CachingClientType) {
498+
override async onPoolConnect(factory: () => CachingClientType) {
486499
const client = factory();
487500
this.#redirectClient = client;
488501

@@ -502,20 +515,10 @@ export class PooledRedirectClientSideCache extends PooledClientSideCacheProvider
502515
}
503516
}
504517

505-
onDestroy() {
506-
this.clear();
518+
override onClose() {};
507519

508-
if (this.#redirectClient) {
509-
this.#id = undefined;
510-
const client = this.#redirectClient;
511-
this.#redirectClient = undefined;
512-
513-
client.destroy();
514-
}
515-
}
516-
517-
override async onClose() {
518-
this.clear();
520+
override onPoolClose() {
521+
super.onPoolClose();
519522

520523
if (this.#redirectClient) {
521524
this.#id = undefined;

packages/client/lib/client/commands-queue.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ export default class RedisCommandsQueue {
113113
if (!this.#onPush(push)) {
114114
switch (push[0].toString()) {
115115
case "invalidate": {
116-
console.log("invalidate push message");
117116
if (this.#invalidateCallback) {
118117
if (push[1] !== null) {
119118
for (const key of push[1]) {

packages/client/lib/client/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1110,6 +1110,7 @@ export default class RedisClient<
11101110
return new Promise<void>(resolve => {
11111111
clearTimeout(this._self.#pingTimer);
11121112
this._self.#socket.close();
1113+
this._self.#clientSideCache?.onClose();
11131114

11141115
if (this._self.#queue.isEmpty()) {
11151116
this._self.#socket.destroySocket();
@@ -1134,6 +1135,7 @@ export default class RedisClient<
11341135
clearTimeout(this._self.#pingTimer);
11351136
this._self.#queue.flushAll(new DisconnectsClientError());
11361137
this._self.#socket.destroy();
1138+
this._self.#clientSideCache?.onClose();
11371139
}
11381140

11391141
ref() {

packages/client/lib/client/pool.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ export class RedisClientPool<
313313
this._self.#isOpen = true;
314314

315315
try {
316-
this._self.#clientSideCache?.onConnect(this._self.#clientFactory);
316+
this._self.#clientSideCache?.onPoolConnect(this._self.#clientFactory);
317317
} catch (err) {
318318
this.destroy();
319319
throw err;
@@ -476,9 +476,9 @@ export class RedisClientPool<
476476
promises.push(client.close());
477477
}
478478

479-
promises.push(this._self.#clientSideCache?.onClose());
480-
481479
await Promise.all(promises);
480+
481+
this.#clientSideCache?.onPoolClose();
482482

483483
this._self.#idleClients.reset();
484484
this._self.#clientsInUse.reset();
@@ -499,7 +499,7 @@ export class RedisClientPool<
499499
client.destroy();
500500
}
501501

502-
this._self.#clientSideCache?.onDestroy();
502+
this._self.#clientSideCache?.onPoolClose();
503503

504504
this._self.#clientsInUse.reset();
505505

packages/client/lib/cluster/cluster-slots.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ export default class RedisClusterSlots<
328328
return this.#clientFactory(
329329
this.#clientOptionsDefaults({
330330
clientSideCache: this.clientSideCache,
331+
RESP: this.#options.RESP,
331332
socket: this.#getNodeAddress(node.address) ?? {
332333
host: node.host,
333334
port: node.port

packages/client/lib/cluster/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,10 +595,12 @@ export default class RedisCluster<
595595
}
596596

597597
close() {
598+
this.#slots.clientSideCache?.onPoolClose();
598599
return this._self.#slots.close();
599600
}
600601

601602
destroy() {
603+
this.#slots.clientSideCache?.onPoolClose();
602604
return this._self.#slots.destroy();
603605
}
604606

packages/client/lib/commands/GEOSEARCH.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,7 @@ export function parseGeoSearchArguments(
2929
from: GeoSearchFrom,
3030
by: GeoSearchBy,
3131
options?: GeoSearchOptions,
32-
store?: RedisArgument
3332
) {
34-
if (store !== undefined) {
35-
parser.pushKey(store);
36-
}
37-
3833
parser.pushKey(key);
3934

4035
if (typeof from === 'string' || from instanceof Buffer) {

packages/client/lib/commands/GEOSEARCHSTORE.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@ export default {
1717
options?: GeoSearchStoreOptions
1818
) {
1919
parser.push('GEOSEARCHSTORE');
20-
parseGeoSearchArguments(parser, source, from, by, options, destination);
20+
21+
if (destination !== undefined) {
22+
parser.pushKey(destination);
23+
}
24+
25+
parseGeoSearchArguments(parser, source, from, by, options);
2126

2227
if (options?.STOREDIST) {
2328
parser.push('STOREDIST');

packages/client/lib/sentinel/index.spec.ts

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import RedisBloomModules from '@redis/bloom';
1515
import { BasicPooledClientSideCache } from '../client/cache';
1616
import { RedisTcpSocketOptions } from '../client/socket';
1717
import { SQUARE_SCRIPT } from '../client/index.spec';
18+
import { once } from 'node:events';
1819

1920
const execAsync = promisify(exec);
2021

@@ -78,7 +79,7 @@ async function steadyState(frame: SentinelFramework) {
7879
}
7980

8081
["redis-sentinel-test-password", undefined].forEach(function (password) {
81-
describe.skip(`Sentinel - password = ${password}`, () => {
82+
describe(`Sentinel - password = ${password}`, () => {
8283
const config: RedisSentinelConfig = { sentinelName: "test", numberOfNodes: 3, password: password };
8384
const frame = new SentinelFramework(config);
8485
let tracer = new Array<string>();
@@ -1137,22 +1138,10 @@ async function steadyState(frame: SentinelFramework) {
11371138
await nodeAddedPromise;
11381139
})
11391140

1140-
it('with client side caching', async function() {
1141-
let invalidateResolve;
1142-
const invalidatePromise = new Promise((res, rej) => {
1143-
invalidateResolve = res;
1144-
})
1145-
1141+
it('with client side caching', async function() {
11461142
this.timeout(30000);
11471143
const csc = new BasicPooledClientSideCache();
11481144

1149-
csc.on("invalidate", (cacheKey: string) => {
1150-
if (cacheKey === "GET_x") {
1151-
console.log(`invalidating ${cacheKey}`);
1152-
invalidateResolve(true);
1153-
}
1154-
})
1155-
11561145
sentinel = frame.getSentinelClient({nodeClientOptions: {RESP: 3}, clientSideCache: csc, masterPoolSize: 5});
11571146
await sentinel.connect();
11581147

@@ -1165,6 +1154,7 @@ async function steadyState(frame: SentinelFramework) {
11651154
assert.equal(1, csc.cacheMisses());
11661155
assert.equal(3, csc.cacheHits());
11671156

1157+
const invalidatePromise = once(csc, 'invalidate');
11681158
await sentinel.set('x', 2);
11691159
await invalidatePromise;
11701160
await sentinel.get('x');

packages/client/lib/sentinel/index.ts

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -273,10 +273,6 @@ export default class RedisSentinel<
273273

274274
this.#options = options;
275275

276-
if (options.replicaPoolSize != 0 && options.clientSideCache) {
277-
throw new Error("cannot use replica reads and client side cache together");
278-
}
279-
280276
if (options.commandOptions) {
281277
this.#commandOptions = options.commandOptions;
282278
}
@@ -848,9 +844,7 @@ class RedisSentinelInternal<
848844

849845
this.#isReady = false;
850846

851-
if (this.#clientSideCache) {
852-
this.#clientSideCache.onClose();
853-
}
847+
this.#clientSideCache?.onPoolClose();
854848

855849
if (this.#scanTimer) {
856850
clearInterval(this.#scanTimer);
@@ -900,6 +894,8 @@ class RedisSentinelInternal<
900894

901895
this.#isReady = false;
902896

897+
this.#clientSideCache?.onPoolClose();
898+
903899
if (this.#scanTimer) {
904900
clearInterval(this.#scanTimer);
905901
this.#scanTimer = undefined;

0 commit comments

Comments
 (0)