Skip to content

Commit 3e3c315

Browse files
brendanburnsbrendandburns
authored andcommitted
Add support for the v5 streaming protocol
1 parent 3c81be4 commit 3e3c315

File tree

2 files changed

+163
-9
lines changed

2 files changed

+163
-9
lines changed

src/web-socket-handler.ts

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@ import stream from 'node:stream';
44
import { V1Status } from './api';
55
import { KubeConfig } from './config';
66

7-
const protocols = ['v4.channel.k8s.io', 'v3.channel.k8s.io', 'v2.channel.k8s.io', 'channel.k8s.io'];
7+
const protocols = [
8+
'v5.channel.k8s.io',
9+
'v4.channel.k8s.io',
10+
'v3.channel.k8s.io',
11+
'v2.channel.k8s.io',
12+
'channel.k8s.io',
13+
];
814

915
export interface WebSocketInterface {
1016
connect(
@@ -14,12 +20,37 @@ export interface WebSocketInterface {
1420
): Promise<WebSocket.WebSocket>;
1521
}
1622

23+
export interface StreamInterface {
24+
stdin: stream.Readable;
25+
stdout: stream.Writable;
26+
stderr: stream.Writable;
27+
}
28+
1729
export class WebSocketHandler implements WebSocketInterface {
1830
public static readonly StdinStream: number = 0;
1931
public static readonly StdoutStream: number = 1;
2032
public static readonly StderrStream: number = 2;
2133
public static readonly StatusStream: number = 3;
2234
public static readonly ResizeStream: number = 4;
35+
public static readonly CloseStream: number = 255;
36+
37+
public static supportsClose(protocol: string): boolean {
38+
return protocol === 'v5.channel.k8s.io';
39+
}
40+
41+
public static closeStream(streamNum: number, streams: StreamInterface): void {
42+
switch (streamNum) {
43+
case WebSocketHandler.StdinStream:
44+
streams.stdin.pause();
45+
break;
46+
case WebSocketHandler.StdoutStream:
47+
streams.stdout.end();
48+
break;
49+
case WebSocketHandler.StderrStream:
50+
streams.stderr.end();
51+
break;
52+
}
53+
}
2354

2455
public static handleStandardStreams(
2556
streamNum: number,
@@ -36,6 +67,7 @@ export class WebSocketHandler implements WebSocketInterface {
3667
stderr.write(buff);
3768
} else if (streamNum === WebSocketHandler.StatusStream) {
3869
// stream closing.
70+
// Hacky, change tests to use the stream interface
3971
if (stdout && stdout !== process.stdout) {
4072
stdout.end();
4173
}
@@ -59,6 +91,12 @@ export class WebSocketHandler implements WebSocketInterface {
5991
});
6092

6193
stdin.on('end', () => {
94+
if (WebSocketHandler.supportsClose(ws.protocol)) {
95+
const buff = Buffer.alloc(2);
96+
buff.writeUint8(this.CloseStream, 0);
97+
buff.writeUint8(this.StdinStream, 1);
98+
ws.send(buff);
99+
}
62100
ws.close();
63101
});
64102
// Keep the stream open
@@ -131,7 +169,16 @@ export class WebSocketHandler implements WebSocketInterface {
131169
// factory is really just for test injection
132170
public constructor(
133171
readonly config: KubeConfig,
134-
readonly socketFactory?: (uri: string, opts: WebSocket.ClientOptions) => WebSocket.WebSocket,
172+
readonly socketFactory?: (
173+
uri: string,
174+
protocols: string[],
175+
opts: WebSocket.ClientOptions,
176+
) => WebSocket.WebSocket,
177+
readonly streams: StreamInterface = {
178+
stdin: process.stdin,
179+
stdout: process.stdout,
180+
stderr: process.stderr,
181+
},
135182
) {}
136183

137184
/**
@@ -163,7 +210,7 @@ export class WebSocketHandler implements WebSocketInterface {
163210

164211
return await new Promise<WebSocket.WebSocket>((resolve, reject) => {
165212
const client = this.socketFactory
166-
? this.socketFactory(uri, opts)
213+
? this.socketFactory(uri, protocols, opts)
167214
: new WebSocket(uri, protocols, opts);
168215
let resolved = false;
169216

@@ -181,11 +228,17 @@ export class WebSocketHandler implements WebSocketInterface {
181228
client.onmessage = ({ data }: { data: WebSocket.Data }) => {
182229
// TODO: support ArrayBuffer and Buffer[] data types?
183230
if (typeof data === 'string') {
231+
if (data.charCodeAt(0) === WebSocketHandler.CloseStream) {
232+
WebSocketHandler.closeStream(data.charCodeAt(1), this.streams);
233+
}
184234
if (textHandler && !textHandler(data)) {
185235
client.close();
186236
}
187237
} else if (data instanceof Buffer) {
188-
const streamNum = data.readInt8(0);
238+
const streamNum = data.readUint8(0);
239+
if (streamNum === WebSocketHandler.CloseStream) {
240+
WebSocketHandler.closeStream(data.readInt8(1), this.streams);
241+
}
189242
if (binaryHandler && !binaryHandler(streamNum, data.slice(1))) {
190243
client.close();
191244
}

src/web-socket-handler_test.ts

Lines changed: 106 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import { Readable } from 'node:stream';
1+
import { Readable, Writable } from 'node:stream';
22
import { setImmediate as setImmediatePromise } from 'node:timers/promises';
33
import { expect } from 'chai';
44
import WebSocket from 'isomorphic-ws';
5-
import { WritableStreamBuffer } from 'stream-buffers';
5+
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';
66

77
import { V1Status } from './api';
88
import { KubeConfig } from './config';
@@ -117,7 +117,7 @@ describe('WebSocket', () => {
117117

118118
const handler = new WebSocketHandler(
119119
kc,
120-
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
120+
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
121121
uriOut = uri;
122122
return mockWs as WebSocket.WebSocket;
123123
},
@@ -163,7 +163,7 @@ describe('WebSocket', () => {
163163

164164
const handler = new WebSocketHandler(
165165
kc,
166-
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
166+
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
167167
uriOut = uri;
168168
return mockWs as WebSocket.WebSocket;
169169
},
@@ -233,7 +233,7 @@ describe('WebSocket', () => {
233233

234234
const handler = new WebSocketHandler(
235235
kc,
236-
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
236+
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
237237
uriOut = uri;
238238
return mockWs as WebSocket.WebSocket;
239239
},
@@ -314,6 +314,107 @@ describe('WebSocket', () => {
314314
});
315315
});
316316

317+
describe('V5 protocol support', () => {
318+
it('should handle close', async () => {
319+
const kc = new KubeConfig();
320+
const host = 'foo.company.com';
321+
const server = `https://${host}`;
322+
kc.clusters = [
323+
{
324+
name: 'cluster',
325+
server,
326+
} as Cluster,
327+
] as Cluster[];
328+
kc.contexts = [
329+
{
330+
cluster: 'cluster',
331+
user: 'user',
332+
} as Context,
333+
] as Context[];
334+
kc.users = [
335+
{
336+
name: 'user',
337+
} as User,
338+
];
339+
340+
const mockWs = {
341+
protocol: 'v5.channel.k8s.io',
342+
} as WebSocket.WebSocket;
343+
let uriOut = '';
344+
let endCalled = false;
345+
const handler = new WebSocketHandler(
346+
kc,
347+
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
348+
uriOut = uri;
349+
return mockWs as WebSocket.WebSocket;
350+
},
351+
{
352+
stdin: process.stdin,
353+
stderr: process.stderr,
354+
stdout: {
355+
end: () => {
356+
endCalled = true;
357+
},
358+
} as Writable,
359+
},
360+
);
361+
const path = '/some/path';
362+
363+
const promise = handler.connect(path, null, null);
364+
await setImmediatePromise();
365+
366+
expect(uriOut).to.equal(`wss://${host}${path}`);
367+
368+
const event = {
369+
target: mockWs,
370+
type: 'open',
371+
};
372+
mockWs.onopen!(event);
373+
const errEvt = {
374+
error: {},
375+
message: 'some message',
376+
type: 'some type',
377+
target: mockWs,
378+
};
379+
const closeBuff = Buffer.alloc(2);
380+
closeBuff.writeUint8(255, 0);
381+
closeBuff.writeUint8(WebSocketHandler.StdoutStream, 1);
382+
383+
mockWs.onmessage!({
384+
data: closeBuff,
385+
type: 'type',
386+
target: mockWs,
387+
});
388+
await promise;
389+
expect(endCalled).to.be.true;
390+
});
391+
it('should handle closing stdin < v4 protocol', () => {
392+
const ws = {
393+
// send is not defined, so this will throw if we try to send the close message.
394+
close: () => {},
395+
} as WebSocket;
396+
const stdinStream = new ReadableStreamBuffer();
397+
WebSocketHandler.handleStandardInput(ws, stdinStream);
398+
stdinStream.emit('end');
399+
});
400+
it('should handle closing stdin v5 protocol', () => {
401+
let sent: Buffer | null = null;
402+
const ws = {
403+
protocol: 'v5.channel.k8s.io',
404+
send: (data) => {
405+
sent = data;
406+
},
407+
close: () => {},
408+
} as WebSocket;
409+
const stdinStream = new ReadableStreamBuffer();
410+
WebSocketHandler.handleStandardInput(ws, stdinStream);
411+
stdinStream.emit('end');
412+
expect(sent).to.not.be.null;
413+
expect(sent!.readUint8(0)).to.equal(255); // CLOSE signal
414+
expect(sent!.readUInt8(1)).to.equal(0); // Stdin stream is #0
415+
});
416+
});
417+
317418
describe('Restartable Handle Standard Input', () => {
318419
it('should throw on negative retry', () => {
319420
const p = new Promise<WebSocket.WebSocket>(() => {});

0 commit comments

Comments
 (0)