From b88e6d9ad3353350df6d76ae99e81aa896534525 Mon Sep 17 00:00:00 2001 From: sync-common-bot Date: Thu, 8 Feb 2024 05:06:10 +1300 Subject: [PATCH 1/3] fix #2141 - cluster redirect if client closed --- packages/client/lib/cluster/index.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index 49ac293d6cf..b434b770472 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -7,7 +7,7 @@ import { EventEmitter } from 'events'; import RedisClusterMultiCommand, { InstantiableRedisClusterMultiCommandType, RedisClusterMultiCommandType } from './multi-command'; import { RedisMultiQueuedCommand } from '../multi-command'; import { PubSubListener } from '../client/pub-sub'; -import { ErrorReply } from '../errors'; +import { ErrorReply, ClientClosedError } from '../errors'; export type RedisClusterClientOptions = Omit< RedisClientOptions, @@ -249,6 +249,13 @@ export default class RedisCluster< try { return await executor(client); } catch (err) { + if (err instanceof ClientClosedError) { + // client was closed, try again with a random client + client = await this.#slots.getClient(undefined, isReadonly); + i-- // don't count this attempt as a redirection + continue; + } + if (++i > maxCommandRedirections || !(err instanceof ErrorReply)) { throw err; } From 0577661154539d9dc109d0bbe66b6277b68dfc46 Mon Sep 17 00:00:00 2001 From: sync-common-bot Date: Thu, 8 Feb 2024 20:33:23 +1300 Subject: [PATCH 2/3] Add auto discover --- packages/client/lib/cluster/cluster-slots.ts | 110 ++++++++++++------- packages/client/lib/cluster/index.ts | 30 +++-- 2 files changed, 91 insertions(+), 49 deletions(-) diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index b1cc49b4c82..3911e8f2e97 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -90,6 +90,18 @@ export type OnShardedChannelMovedError = ( listeners?: ChannelListeners ) => void; +type RedisClusterSlotsData< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts +> = { + slots: Array>; + shards: Array>; + masters: Array>; + replicas: Array>; + randomNodeIterator?: IterableIterator>; +} + export default class RedisClusterSlots< M extends RedisModules, F extends RedisFunctions, @@ -100,13 +112,20 @@ export default class RedisClusterSlots< readonly #options: RedisClusterOptions; readonly #Client: InstantiableRedisClient; readonly #emit: EventEmitter['emit']; - slots = new Array>(RedisClusterSlots.#SLOTS); - shards = new Array>(); - masters = new Array>(); - replicas = new Array>(); + + data: RedisClusterSlotsData = { + slots: new Array(RedisClusterSlots.#SLOTS), + shards: [], + masters: [], + replicas: [], + randomNodeIterator: undefined + } + readonly nodeByAddress = new Map | ShardNode>(); pubSubNode?: PubSubNode; + lastDiscovered = Date.now(); + #isOpen = false; get isOpen() { @@ -150,15 +169,24 @@ export default class RedisClusterSlots< } #resetSlots() { - this.slots = new Array(RedisClusterSlots.#SLOTS); - this.shards = []; - this.masters = []; - this.replicas = []; - this.#randomNodeIterator = undefined; + this.data = { + slots: new Array(RedisClusterSlots.#SLOTS), + shards: [], + masters: [], + replicas: [], + randomNodeIterator: undefined + } } async #discover(rootNode?: RedisClusterClientOptions) { - this.#resetSlots(); + this.lastDiscovered = Date.now(); + const newData: RedisClusterSlotsData = { + slots: new Array(RedisClusterSlots.#SLOTS), + shards: [], + masters: [], + replicas: [], + randomNodeIterator: undefined, + } const addressesInUse = new Set(); try { @@ -167,19 +195,19 @@ export default class RedisClusterSlots< eagerConnect = this.#options.minimizeConnections !== true; for (const { from, to, master, replicas } of shards) { const shard: Shard = { - master: this.#initiateSlotNode(master, false, eagerConnect, addressesInUse, promises) + master: this.#initiateSlotNode(newData, master, false, eagerConnect, addressesInUse, promises) }; if (this.#options.useReplicas) { shard.replicas = replicas.map(replica => - this.#initiateSlotNode(replica, true, eagerConnect, addressesInUse, promises) + this.#initiateSlotNode(newData,replica, true, eagerConnect, addressesInUse, promises) ); } - this.shards.push(shard); + newData.shards.push(shard); for (let i = from; i <= to; i++) { - this.slots[i] = shard; + newData.slots[i] = shard; } } @@ -197,7 +225,7 @@ export default class RedisClusterSlots< if (channelsListeners.size || patternsListeners.size) { promises.push( - this.#initiatePubSubClient({ + this.#initiatePubSubClient(newData, { [PubSubType.CHANNELS]: channelsListeners, [PubSubType.PATTERNS]: patternsListeners }) @@ -227,6 +255,7 @@ export default class RedisClusterSlots< await Promise.all(promises); + this.data = newData; return true; } catch (err) { this.#emit('error', err); @@ -296,6 +325,7 @@ export default class RedisClusterSlots< } #initiateSlotNode( + data: RedisClusterSlotsData, { id, ip, port }: ClusterSlotsNode, readonly: boolean, eagerConnent: boolean, @@ -323,7 +353,7 @@ export default class RedisClusterSlots< this.nodeByAddress.set(address, node); } - (readonly ? this.replicas : this.masters).push(node); + (readonly ? data.replicas : data.masters).push(node); return node; } @@ -392,7 +422,7 @@ export default class RedisClusterSlots< this.#isOpen = false; const promises = []; - for (const { master, replicas } of this.shards) { + for (const { master, replicas } of this.data.shards) { if (master.client) { promises.push( this.#execOnNodeClient(master.client, fn) @@ -446,45 +476,43 @@ export default class RedisClusterSlots< const slotNumber = calculateSlot(firstKey); if (!isReadonly) { - return this.nodeClient(this.slots[slotNumber].master); + return this.nodeClient(this.data.slots[slotNumber].master); } return this.nodeClient(this.getSlotRandomNode(slotNumber)); } *#iterateAllNodes() { - let i = Math.floor(Math.random() * (this.masters.length + this.replicas.length)); - if (i < this.masters.length) { + let i = Math.floor(Math.random() * (this.data.masters.length + this.data.replicas.length)); + if (i < this.data.masters.length) { do { - yield this.masters[i]; - } while (++i < this.masters.length); + yield this.data.masters[i]; + } while (++i < this.data.masters.length); - for (const replica of this.replicas) { + for (const replica of this.data.replicas) { yield replica; } } else { - i -= this.masters.length; + i -= this.data.masters.length; do { - yield this.replicas[i]; - } while (++i < this.replicas.length); + yield this.data.replicas[i]; + } while (++i < this.data.replicas.length); } while (true) { - for (const master of this.masters) { + for (const master of this.data.masters) { yield master; } - for (const replica of this.replicas) { + for (const replica of this.data.replicas) { yield replica; } } } - #randomNodeIterator?: IterableIterator>; - getRandomNode() { - this.#randomNodeIterator ??= this.#iterateAllNodes(); - return this.#randomNodeIterator.next().value as ShardNode; + this.data.randomNodeIterator ??= this.#iterateAllNodes(); + return this.data.randomNodeIterator.next().value as ShardNode; } *#slotNodesIterator(slot: ShardWithReplicas) { @@ -505,7 +533,7 @@ export default class RedisClusterSlots< } getSlotRandomNode(slotNumber: number) { - const slot = this.slots[slotNumber]; + const slot = this.data.slots[slotNumber]; if (!slot.replicas?.length) { return slot.master; } @@ -524,14 +552,14 @@ export default class RedisClusterSlots< getPubSubClient() { return this.pubSubNode ? this.pubSubNode.client : - this.#initiatePubSubClient(); + this.#initiatePubSubClient(this.data); } - async #initiatePubSubClient(toResubscribe?: PubSubToResubscribe) { - const index = Math.floor(Math.random() * (this.masters.length + this.replicas.length)), - node = index < this.masters.length ? - this.masters[index] : - this.replicas[index - this.masters.length]; + async #initiatePubSubClient(data: RedisClusterSlotsData, toResubscribe?: PubSubToResubscribe) { + const index = Math.floor(Math.random() * (data.masters.length + data.replicas.length)), + node = index < data.masters.length ? + data.masters[index] : + data.replicas[index - data.masters.length]; this.pubSubNode = { address: node.address, @@ -569,7 +597,7 @@ export default class RedisClusterSlots< } getShardedPubSubClient(channel: string) { - const { master } = this.slots[calculateSlot(channel)]; + const { master } = this.data.slots[calculateSlot(channel)]; return master.pubSubClient ?? this.#initiateShardedPubSubClient(master); } @@ -607,7 +635,7 @@ export default class RedisClusterSlots< channel: string, unsubscribe: (client: RedisClientType) => Promise ): Promise { - const { master } = this.slots[calculateSlot(channel)]; + const { master } = this.data.slots[calculateSlot(channel)]; if (!master.pubSubClient) return Promise.resolve(); const client = await master.pubSubClient; diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index b434b770472..584cf76db3a 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -7,7 +7,7 @@ import { EventEmitter } from 'events'; import RedisClusterMultiCommand, { InstantiableRedisClusterMultiCommandType, RedisClusterMultiCommandType } from './multi-command'; import { RedisMultiQueuedCommand } from '../multi-command'; import { PubSubListener } from '../client/pub-sub'; -import { ErrorReply, ClientClosedError } from '../errors'; +import { ClientClosedError, ErrorReply, ConnectionTimeoutError } from '../errors'; export type RedisClusterClientOptions = Omit< RedisClientOptions, @@ -100,19 +100,19 @@ export default class RedisCluster< readonly #slots: RedisClusterSlots; get slots() { - return this.#slots.slots; + return this.#slots.data.slots; } get shards() { - return this.#slots.shards; + return this.#slots.data.shards; } get masters() { - return this.#slots.masters; + return this.#slots.data.masters; } get replicas() { - return this.#slots.replicas; + return this.#slots.data.replicas; } get nodeByAddress() { @@ -243,6 +243,13 @@ export default class RedisCluster< isReadonly: boolean | undefined, executor: (client: RedisClientType) => Promise ): Promise { + if (this.#slots.lastDiscovered + 10000 < Date.now()) { + const discover = async () => { + let client = await this.#slots.nodeClient(this.#slots.getRandomNode()); + this.#slots.rediscover(client).catch(() => { /* ignore */ }) + } + discover(); // don't wait for it + } const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16; let client = await this.#slots.getClient(firstKey, isReadonly); for (let i = 0;; i++) { @@ -250,9 +257,16 @@ export default class RedisCluster< return await executor(client); } catch (err) { if (err instanceof ClientClosedError) { - // client was closed, try again with a random client - client = await this.#slots.getClient(undefined, isReadonly); - i-- // don't count this attempt as a redirection + i-- // don't count this attempt + + // attempt to reconnect closed client + client.connect().catch(() => { /* ignore */ }); + + // try again with a different client + let oldClient = client + while (client === oldClient) { + client = await this.#slots.getClient(undefined, isReadonly) + } continue; } From 61bf367a623b3faa210d385307d253925598e5b3 Mon Sep 17 00:00:00 2001 From: sync-common-bot Date: Thu, 8 Feb 2024 20:58:57 +1300 Subject: [PATCH 3/3] rename to state --- packages/client/lib/cluster/cluster-slots.ts | 70 ++++++++++---------- packages/client/lib/cluster/index.ts | 8 +-- 2 files changed, 39 insertions(+), 39 deletions(-) diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index 3911e8f2e97..3ab0ee2ba82 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -90,7 +90,7 @@ export type OnShardedChannelMovedError = ( listeners?: ChannelListeners ) => void; -type RedisClusterSlotsData< +type RedisClusterSlotsState< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts @@ -113,7 +113,7 @@ export default class RedisClusterSlots< readonly #Client: InstantiableRedisClient; readonly #emit: EventEmitter['emit']; - data: RedisClusterSlotsData = { + state: RedisClusterSlotsState = { slots: new Array(RedisClusterSlots.#SLOTS), shards: [], masters: [], @@ -169,7 +169,7 @@ export default class RedisClusterSlots< } #resetSlots() { - this.data = { + this.state = { slots: new Array(RedisClusterSlots.#SLOTS), shards: [], masters: [], @@ -180,7 +180,7 @@ export default class RedisClusterSlots< async #discover(rootNode?: RedisClusterClientOptions) { this.lastDiscovered = Date.now(); - const newData: RedisClusterSlotsData = { + const newState: RedisClusterSlotsState = { slots: new Array(RedisClusterSlots.#SLOTS), shards: [], masters: [], @@ -195,19 +195,19 @@ export default class RedisClusterSlots< eagerConnect = this.#options.minimizeConnections !== true; for (const { from, to, master, replicas } of shards) { const shard: Shard = { - master: this.#initiateSlotNode(newData, master, false, eagerConnect, addressesInUse, promises) + master: this.#initiateSlotNode(newState, master, false, eagerConnect, addressesInUse, promises) }; if (this.#options.useReplicas) { shard.replicas = replicas.map(replica => - this.#initiateSlotNode(newData,replica, true, eagerConnect, addressesInUse, promises) + this.#initiateSlotNode(newState,replica, true, eagerConnect, addressesInUse, promises) ); } - newData.shards.push(shard); + newState.shards.push(shard); for (let i = from; i <= to; i++) { - newData.slots[i] = shard; + newState.slots[i] = shard; } } @@ -225,7 +225,7 @@ export default class RedisClusterSlots< if (channelsListeners.size || patternsListeners.size) { promises.push( - this.#initiatePubSubClient(newData, { + this.#initiatePubSubClient(newState, { [PubSubType.CHANNELS]: channelsListeners, [PubSubType.PATTERNS]: patternsListeners }) @@ -255,7 +255,7 @@ export default class RedisClusterSlots< await Promise.all(promises); - this.data = newData; + this.state = newState; return true; } catch (err) { this.#emit('error', err); @@ -325,7 +325,7 @@ export default class RedisClusterSlots< } #initiateSlotNode( - data: RedisClusterSlotsData, + state: RedisClusterSlotsState, { id, ip, port }: ClusterSlotsNode, readonly: boolean, eagerConnent: boolean, @@ -353,7 +353,7 @@ export default class RedisClusterSlots< this.nodeByAddress.set(address, node); } - (readonly ? data.replicas : data.masters).push(node); + (readonly ? state.replicas : state.masters).push(node); return node; } @@ -422,7 +422,7 @@ export default class RedisClusterSlots< this.#isOpen = false; const promises = []; - for (const { master, replicas } of this.data.shards) { + for (const { master, replicas } of this.state.shards) { if (master.client) { promises.push( this.#execOnNodeClient(master.client, fn) @@ -476,43 +476,43 @@ export default class RedisClusterSlots< const slotNumber = calculateSlot(firstKey); if (!isReadonly) { - return this.nodeClient(this.data.slots[slotNumber].master); + return this.nodeClient(this.state.slots[slotNumber].master); } return this.nodeClient(this.getSlotRandomNode(slotNumber)); } *#iterateAllNodes() { - let i = Math.floor(Math.random() * (this.data.masters.length + this.data.replicas.length)); - if (i < this.data.masters.length) { + let i = Math.floor(Math.random() * (this.state.masters.length + this.state.replicas.length)); + if (i < this.state.masters.length) { do { - yield this.data.masters[i]; - } while (++i < this.data.masters.length); + yield this.state.masters[i]; + } while (++i < this.state.masters.length); - for (const replica of this.data.replicas) { + for (const replica of this.state.replicas) { yield replica; } } else { - i -= this.data.masters.length; + i -= this.state.masters.length; do { - yield this.data.replicas[i]; - } while (++i < this.data.replicas.length); + yield this.state.replicas[i]; + } while (++i < this.state.replicas.length); } while (true) { - for (const master of this.data.masters) { + for (const master of this.state.masters) { yield master; } - for (const replica of this.data.replicas) { + for (const replica of this.state.replicas) { yield replica; } } } getRandomNode() { - this.data.randomNodeIterator ??= this.#iterateAllNodes(); - return this.data.randomNodeIterator.next().value as ShardNode; + this.state.randomNodeIterator ??= this.#iterateAllNodes(); + return this.state.randomNodeIterator.next().value as ShardNode; } *#slotNodesIterator(slot: ShardWithReplicas) { @@ -533,7 +533,7 @@ export default class RedisClusterSlots< } getSlotRandomNode(slotNumber: number) { - const slot = this.data.slots[slotNumber]; + const slot = this.state.slots[slotNumber]; if (!slot.replicas?.length) { return slot.master; } @@ -552,14 +552,14 @@ export default class RedisClusterSlots< getPubSubClient() { return this.pubSubNode ? this.pubSubNode.client : - this.#initiatePubSubClient(this.data); + this.#initiatePubSubClient(this.state); } - async #initiatePubSubClient(data: RedisClusterSlotsData, toResubscribe?: PubSubToResubscribe) { - const index = Math.floor(Math.random() * (data.masters.length + data.replicas.length)), - node = index < data.masters.length ? - data.masters[index] : - data.replicas[index - data.masters.length]; + async #initiatePubSubClient(state: RedisClusterSlotsState, toResubscribe?: PubSubToResubscribe) { + const index = Math.floor(Math.random() * (state.masters.length + state.replicas.length)), + node = index < state.masters.length ? + state.masters[index] : + state.replicas[index - state.masters.length]; this.pubSubNode = { address: node.address, @@ -597,7 +597,7 @@ export default class RedisClusterSlots< } getShardedPubSubClient(channel: string) { - const { master } = this.data.slots[calculateSlot(channel)]; + const { master } = this.state.slots[calculateSlot(channel)]; return master.pubSubClient ?? this.#initiateShardedPubSubClient(master); } @@ -635,7 +635,7 @@ export default class RedisClusterSlots< channel: string, unsubscribe: (client: RedisClientType) => Promise ): Promise { - const { master } = this.data.slots[calculateSlot(channel)]; + const { master } = this.state.slots[calculateSlot(channel)]; if (!master.pubSubClient) return Promise.resolve(); const client = await master.pubSubClient; diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index 584cf76db3a..09fca705dd6 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -100,19 +100,19 @@ export default class RedisCluster< readonly #slots: RedisClusterSlots; get slots() { - return this.#slots.data.slots; + return this.#slots.state.slots; } get shards() { - return this.#slots.data.shards; + return this.#slots.state.shards; } get masters() { - return this.#slots.data.masters; + return this.#slots.state.masters; } get replicas() { - return this.#slots.data.replicas; + return this.#slots.state.replicas; } get nodeByAddress() {