Skip to content

Commit 811b554

Browse files
committed
fix #2345 - allow PING in PubSub mode (remove client side validation)
1 parent de12812 commit 811b554

File tree

5 files changed

+88
-24
lines changed

5 files changed

+88
-24
lines changed

packages/client/lib/client/commands-queue.ts

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ export interface QueueCommandOptions {
1010
chainId?: symbol;
1111
signal?: AbortSignal;
1212
returnBuffers?: boolean;
13-
ignorePubSubMode?: boolean;
1413
}
1514

1615
export interface CommandWaitingToBeSent extends CommandWaitingForReply {
@@ -31,6 +30,8 @@ interface CommandWaitingForReply {
3130

3231
type PubSubCommand = ReturnType<typeof PubSub.prototype.subscribe | typeof PubSub.prototype.unsubscribe>;
3332

33+
const PONG = Buffer.from('pong');
34+
3435
export default class RedisCommandsQueue {
3536
static #flushQueue<T extends CommandWaitingForReply>(queue: LinkedList<T>, err: Error): void {
3637
while (queue.length) {
@@ -53,24 +54,25 @@ export default class RedisCommandsQueue {
5354
},
5455
onReply: reply => {
5556
if (this.#pubSub.isActive && Array.isArray(reply)) {
56-
if (
57-
!this.#pubSub.handleMessageReply(reply as Array<Buffer>) &&
58-
this.#pubSub.isStatusReply(reply as Array<Buffer>)
59-
) {
57+
if (this.#pubSub.handleMessageReply(reply as Array<Buffer>)) return;
58+
if (this.#pubSub.isStatusReply(reply as Array<Buffer>)) {
6059
const head = this.#waitingForReply.head!.value;
6160
if (
6261
(Number.isNaN(head.channelsCounter!) && reply[2] === 0) ||
6362
--head.channelsCounter! === 0
6463
) {
6564
this.#waitingForReply.shift()!.resolve();
6665
}
66+
return;
67+
}
68+
if (PONG.equals(reply[0] as Buffer)) {
69+
const { resolve, returnBuffers } = this.#waitingForReply.shift()!,
70+
buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
71+
resolve(returnBuffers ? buffer : buffer.toString());
72+
return;
6773
}
68-
69-
return;
70-
} else if (!this.#waitingForReply.length) {
71-
throw new Error('Got an unexpected reply from Redis');
7274
}
73-
75+
7476
const { resolve, reject } = this.#waitingForReply.shift()!;
7577
if (reply instanceof ErrorReply) {
7678
reject(reply);
@@ -85,9 +87,7 @@ export default class RedisCommandsQueue {
8587
}
8688

8789
addCommand<T = RedisCommandRawReply>(args: RedisCommandArguments, options?: QueueCommandOptions): Promise<T> {
88-
if (this.#pubSub.isActive && !options?.ignorePubSubMode) {
89-
return Promise.reject(new Error('Cannot send commands in PubSub mode'));
90-
} else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) {
90+
if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) {
9191
return Promise.reject(new Error('The queue is full'));
9292
} else if (options?.signal?.aborted) {
9393
return Promise.reject(new AbortError());

packages/client/lib/client/index.spec.ts

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,7 @@ describe('Client', () => {
675675
);
676676
}, GLOBAL.SERVERS.OPEN);
677677

678-
describe('PubSub', () => {
678+
describe.only('PubSub', () => {
679679
testUtils.testWithClient('should be able to publish and subscribe to messages', async publisher => {
680680
function assertStringListener(message: string, channel: string) {
681681
assert.equal(typeof message, 'string');
@@ -793,7 +793,34 @@ describe('Client', () => {
793793
}
794794
}, GLOBAL.SERVERS.OPEN);
795795

796-
testUtils.testWithClient('should be able to quit in PubSub mode', async client => {
796+
testUtils.testWithClient('should be able to PING in PubSub mode', async client => {
797+
await client.connect();
798+
799+
try {
800+
await client.subscribe('channel', () => {
801+
// noop
802+
});
803+
804+
const [string, buffer, customString, customBuffer] = await Promise.all([
805+
client.ping(),
806+
client.ping(client.commandOptions({ returnBuffers: true })),
807+
client.ping('custom'),
808+
client.ping(client.commandOptions({ returnBuffers: true }), 'custom')
809+
]);
810+
811+
assert.equal(string, 'pong');
812+
assert.deepEqual(buffer, Buffer.from('pong'));
813+
assert.equal(customString, 'custom');
814+
assert.deepEqual(customBuffer, Buffer.from('custom'));
815+
} finally {
816+
await client.disconnect();
817+
}
818+
}, {
819+
...GLOBAL.SERVERS.OPEN,
820+
disableClientSetup: true
821+
});
822+
823+
testUtils.testWithClient('should be able to QUIT in PubSub mode', async client => {
797824
await client.subscribe('channel', () => {
798825
// noop
799826
});
@@ -803,6 +830,24 @@ describe('Client', () => {
803830
assert.equal(client.isOpen, false);
804831
}, GLOBAL.SERVERS.OPEN);
805832

833+
834+
testUtils.testWithClient('should reject GET in PubSub mode', async client => {
835+
await client.connect();
836+
837+
try {
838+
await client.subscribe('channel', () => {
839+
// noop
840+
});
841+
842+
await assert.rejects(client.get('key'));
843+
} finally {
844+
await client.disconnect();
845+
}
846+
}, {
847+
...GLOBAL.SERVERS.OPEN,
848+
disableClientSetup: true
849+
});
850+
806851
testUtils.testWithClient('shareded PubSub', async publisher => {
807852
const subscriber = publisher.duplicate();
808853

@@ -862,7 +907,7 @@ describe('Client', () => {
862907
}
863908
}, {
864909
// this test change ACL rules, running in isolated server
865-
...GLOBAL.SERVERS.OPEN
910+
serverArguments: []
866911
});
867912
});
868913

packages/client/lib/client/index.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -603,9 +603,7 @@ export default class RedisClient<
603603

604604
QUIT(): Promise<void> {
605605
return this.#socket.quit(() => {
606-
const quitPromise = this.#queue.addCommand(['QUIT'], {
607-
ignorePubSubMode: true
608-
});
606+
const quitPromise = this.#queue.addCommand(['QUIT']);
609607
this.#tick();
610608
return Promise.all([
611609
quitPromise,

packages/client/lib/commands/PING.spec.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,24 @@
11
import { strict as assert } from 'assert';
22
import testUtils, { GLOBAL } from '../test-utils';
3-
import RedisClient from '../client';
3+
import { transformArguments } from './PING';
44

55
describe('PING', () => {
6+
describe('transformArguments', () => {
7+
it('default', () => {
8+
assert.deepEqual(
9+
transformArguments(),
10+
['PING']
11+
);
12+
});
13+
14+
it('with message', () => {
15+
assert.deepEqual(
16+
transformArguments('message'),
17+
['PING', 'message']
18+
);
19+
});
20+
});
21+
622
describe('client.ping', () => {
723
testUtils.testWithClient('string', async client => {
824
assert.equal(
@@ -13,7 +29,7 @@ describe('PING', () => {
1329

1430
testUtils.testWithClient('buffer', async client => {
1531
assert.deepEqual(
16-
await client.ping(RedisClient.commandOptions({ returnBuffers: true })),
32+
await client.ping(client.commandOptions({ returnBuffers: true })),
1733
Buffer.from('PONG')
1834
);
1935
}, GLOBAL.SERVERS.OPEN);

packages/client/lib/commands/PING.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1-
import { RedisCommandArgument } from '.';
1+
import { RedisCommandArgument, RedisCommandArguments } from '.';
22

3-
export function transformArguments(): Array<string> {
4-
return ['PING'];
3+
export function transformArguments(message?: RedisCommandArgument): RedisCommandArguments {
4+
const args: RedisCommandArguments = ['PING'];
5+
if (message) {
6+
args.push(message);
7+
}
8+
9+
return args;
510
}
611

712
export declare function transformReply(): RedisCommandArgument;

0 commit comments

Comments
 (0)