Skip to content

Commit b82fcf5

Browse files
committed
handle server sunsubscribe
1 parent 52465d5 commit b82fcf5

File tree

4 files changed

+112
-42
lines changed

4 files changed

+112
-42
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { AbortError, ErrorReply } from '../errors';
33
import { RedisCommandArguments, RedisCommandRawReply } from '../commands';
44
import RESP2Decoder from './RESP2/decoder';
55
import encodeCommand from './RESP2/encoder';
6-
import { PubSub, PubSubListener, PubSubType } from './pub-sub';
6+
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType } from './pub-sub';
77

88
export interface QueueCommandOptions {
99
asap?: boolean;
@@ -28,10 +28,10 @@ interface CommandWaitingForReply {
2828
returnBuffers?: boolean;
2929
}
3030

31-
type PubSubCommand = ReturnType<typeof PubSub.prototype.subscribe | typeof PubSub.prototype.unsubscribe>;
32-
3331
const PONG = Buffer.from('pong');
3432

33+
type OnServerSUnsubscribe = (channel: string, listeners?: ChannelListeners) => void;
34+
3535
export default class RedisCommandsQueue {
3636
static #flushQueue<T extends CommandWaitingForReply>(queue: LinkedList<T>, err: Error): void {
3737
while (queue.length) {
@@ -42,6 +42,7 @@ export default class RedisCommandsQueue {
4242
readonly #maxLength: number | null | undefined;
4343
readonly #waitingToBeSent = new LinkedList<CommandWaitingToBeSent>();
4444
readonly #waitingForReply = new LinkedList<CommandWaitingForReply>();
45+
readonly #onServerSUnsubscribe: OnServerSUnsubscribe;
4546

4647
readonly #pubSub = new PubSub();
4748

@@ -55,7 +56,16 @@ export default class RedisCommandsQueue {
5556
onReply: reply => {
5657
if (this.#pubSub.isActive && Array.isArray(reply)) {
5758
if (this.#pubSub.handleMessageReply(reply as Array<Buffer>)) return;
58-
if (this.#pubSub.isStatusReply(reply as Array<Buffer>)) {
59+
60+
const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(reply as Array<Buffer>);
61+
if (isShardedUnsubscribe && !this.#waitingForReply.length) {
62+
const channel = (reply[1] as Buffer).toString();
63+
this.#onServerSUnsubscribe(
64+
channel,
65+
this.#pubSub.removeShardedListeners(channel)
66+
);
67+
return;
68+
} else if (isShardedUnsubscribe || PubSub.isStatusReply(reply as Array<Buffer>)) {
5969
const head = this.#waitingForReply.head!.value;
6070
if (
6171
(Number.isNaN(head.channelsCounter!) && reply[2] === 0) ||
@@ -82,8 +92,12 @@ export default class RedisCommandsQueue {
8292
}
8393
});
8494

85-
constructor(maxLength: number | null | undefined) {
95+
constructor(
96+
maxLength: number | null | undefined,
97+
onServerSUnsubscribe: OnServerSUnsubscribe
98+
) {
8699
this.#maxLength = maxLength;
100+
this.#onServerSUnsubscribe = onServerSUnsubscribe;
87101
}
88102

89103
addCommand<T = RedisCommandRawReply>(args: RedisCommandArguments, options?: QueueCommandOptions): Promise<T> {

packages/client/lib/client/index.spec.ts

Lines changed: 62 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ import { defineScript } from '../lua-script';
88
import { spy } from 'sinon';
99
import { once } from 'events';
1010
import { ClientKillFilters } from '../commands/CLIENT_KILL';
11+
import { ClusterSlotStates } from '../commands/CLUSTER_SETSLOT';
12+
13+
// We need to use 'require', because it's not possible with Typescript to import
14+
// function that are exported as 'module.exports = function`, without esModuleInterop
15+
// set to true.
16+
const calculateSlot = require('cluster-key-slot');
1117

1218
export const SQUARE_SCRIPT = defineScript({
1319
SCRIPT: 'return ARGV[1] * ARGV[1];',
@@ -848,32 +854,65 @@ describe('Client', () => {
848854
disableClientSetup: true
849855
});
850856

851-
testUtils.testWithClient('shareded PubSub', async publisher => {
852-
const subscriber = publisher.duplicate();
853-
854-
await subscriber.connect();
855-
856-
try {
857-
const listener = spy();
858-
await subscriber.sSubscribe('channel', listener);
859-
860-
await Promise.all([
861-
waitTillBeenCalled(listener),
862-
publisher.sPublish('channel', 'message')
863-
]);
857+
describe('shareded PubSub', () => {
858+
testUtils.isVersionGreaterThanHook([7]);
864859

865-
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
860+
testUtils.testWithClient('should be able to recive messages', async publisher => {
861+
const subscriber = publisher.duplicate();
862+
863+
await subscriber.connect();
864+
865+
try {
866+
const listener = spy();
867+
await subscriber.sSubscribe('channel', listener);
868+
869+
await Promise.all([
870+
waitTillBeenCalled(listener),
871+
publisher.sPublish('channel', 'message')
872+
]);
873+
874+
assert.ok(listener.calledOnceWithExactly('message', 'channel'));
875+
876+
await subscriber.sUnsubscribe();
877+
878+
// should be able to send commands
879+
await assert.doesNotReject(subscriber.ping());
880+
} finally {
881+
await subscriber.disconnect();
882+
}
883+
}, {
884+
...GLOBAL.SERVERS.OPEN
885+
});
866886

867-
await subscriber.sUnsubscribe();
887+
testUtils.testWithClient('should handle sunsubscribe from server', async publisher => {
888+
await publisher.clusterAddSlotsRange({ start: 0, end: 16383 });
868889

869-
// should be able to send commands
870-
await assert.doesNotReject(subscriber.ping());
871-
} finally {
872-
await subscriber.disconnect();
873-
}
874-
}, {
875-
...GLOBAL.SERVERS.OPEN,
876-
minimumDockerVersion: [7]
890+
const subscriber = publisher.duplicate();
891+
892+
await subscriber.connect();
893+
894+
try {
895+
await subscriber.sSubscribe('channel', () => {});
896+
897+
await Promise.all([
898+
publisher.clusterSetSlot(
899+
calculateSlot('channel'),
900+
ClusterSlotStates.NODE,
901+
await publisher.clusterMyId()
902+
),
903+
once(subscriber, 'server-sunsubscribe')
904+
]);
905+
906+
assert.equal(
907+
await subscriber.ping(),
908+
'PONG'
909+
);
910+
} finally {
911+
await subscriber.disconnect();
912+
}
913+
}, {
914+
serverArguments: ['--cluster-enabled', 'yes']
915+
});
877916
});
878917

879918
testUtils.testWithClient('should handle errors in SUBSCRIBE', async publisher => {

packages/client/lib/client/index.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,10 @@ export default class RedisClient<
216216
}
217217

218218
#initiateQueue(): RedisCommandsQueue {
219-
return new RedisCommandsQueue(this.#options?.commandsQueueMaxLength);
219+
return new RedisCommandsQueue(
220+
this.#options?.commandsQueueMaxLength,
221+
(channel, listeners) => this.emit('server-sunsubscribe', channel, listeners)
222+
);
220223
}
221224

222225
#initiateSocket(): RedisSocket {

packages/client/lib/client/pub-sub.ts

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,31 @@ export type PubSubListener<
2828
RETURN_BUFFERS extends boolean = false
2929
> = <T extends RETURN_BUFFERS extends true ? Buffer : string>(message: T, channel: T) => unknown;
3030

31-
interface ChannelListeners {
31+
export interface ChannelListeners {
3232
unsubscribing: boolean;
3333
buffers: Set<PubSubListener<true>>;
3434
strings: Set<PubSubListener<false>>;
3535
}
3636

3737
type Listeners = Record<PubSubType, Map<string, ChannelListeners>>;
3838

39+
export type PubSubCommand = ReturnType<typeof PubSub.prototype.subscribe | typeof PubSub.prototype.unsubscribe>;
40+
3941
export class PubSub {
42+
static isStatusReply(reply: Array<Buffer>): boolean {
43+
return (
44+
COMMANDS[PubSubType.CHANNELS].subscribe.equals(reply[0]) ||
45+
COMMANDS[PubSubType.CHANNELS].unsubscribe.equals(reply[0]) ||
46+
COMMANDS[PubSubType.PATTERNS].subscribe.equals(reply[0]) ||
47+
COMMANDS[PubSubType.PATTERNS].unsubscribe.equals(reply[0]) ||
48+
COMMANDS[PubSubType.SHARDED].subscribe.equals(reply[0])
49+
);
50+
}
51+
52+
static isShardedUnsubscribe(reply: Array<Buffer>): boolean {
53+
return COMMANDS[PubSubType.SHARDED].unsubscribe.equals(reply[0]);
54+
}
55+
4056
static #channelsArray(channels: string | Array<string>) {
4157
return (Array.isArray(channels) ? channels : [channels]);
4258
}
@@ -227,7 +243,7 @@ export class PubSub {
227243
this.#subscribing = 0;
228244
}
229245

230-
resubscribe() {
246+
resubscribe(): Array<PubSubCommand> {
231247
const commands = [];
232248
for (const [type, listeners] of Object.entries(this.#listeners)) {
233249
if (!listeners.size) continue;
@@ -276,6 +292,15 @@ export class PubSub {
276292

277293
return false;
278294
}
295+
296+
removeShardedListeners(channel: string): ChannelListeners | undefined {
297+
const listeners = this.#listeners[PubSubType.SHARDED].get(channel);
298+
if (listeners) {
299+
this.#listeners[PubSubType.SHARDED].delete(channel);
300+
this.#updateIsActive();
301+
return listeners;
302+
}
303+
}
279304

280305
#emitPubSubMessage(
281306
type: PubSubType,
@@ -304,15 +329,4 @@ export class PubSub {
304329
listener(messageString, channelString);
305330
}
306331
}
307-
308-
isStatusReply(reply: Array<Buffer>): boolean {
309-
return (
310-
COMMANDS[PubSubType.CHANNELS].subscribe.equals(reply[0]) ||
311-
COMMANDS[PubSubType.CHANNELS].unsubscribe.equals(reply[0]) ||
312-
COMMANDS[PubSubType.PATTERNS].subscribe.equals(reply[0]) ||
313-
COMMANDS[PubSubType.PATTERNS].unsubscribe.equals(reply[0]) ||
314-
COMMANDS[PubSubType.SHARDED].subscribe.equals(reply[0]) ||
315-
COMMANDS[PubSubType.SHARDED].unsubscribe.equals(reply[0])
316-
);
317-
}
318332
}

0 commit comments

Comments
 (0)