@@ -5,7 +5,7 @@ import { PassThrough } from 'node:stream';
5
5
import { KubeConfig } from './config.js' ;
6
6
import { Cluster , Context , User } from './config_types.js' ;
7
7
import { Watch } from './watch.js' ;
8
- import { IncomingMessage , createServer } from 'node:http' ;
8
+ import { ServerResponse , createServer } from 'node:http' ;
9
9
import { AddressInfo } from 'node:net' ;
10
10
11
11
const server = 'https://foo.company.com' ;
@@ -72,11 +72,7 @@ describe('Watch', () => {
72
72
s . done ( ) ;
73
73
} ) ;
74
74
75
- it ( 'should not call watch done callback more than once' , async ( ) => {
76
- const kc = new KubeConfig ( ) ;
77
- Object . assign ( kc , fakeConfig ) ;
78
- const watch = new Watch ( kc ) ;
79
-
75
+ it ( 'should not call watch done callback more than once' , async ( t ) => {
80
76
const obj1 = {
81
77
type : 'ADDED' ,
82
78
object : {
@@ -93,26 +89,13 @@ describe('Watch', () => {
93
89
94
90
const path = '/some/path/to/object' ;
95
91
96
- const stream = new PassThrough ( ) ;
97
-
98
- const [ scope ] = systemUnderTest ( ) ;
99
-
100
- let response : IncomingMessage | undefined ;
101
-
102
- const s = scope
103
- . get ( path )
104
- . query ( {
105
- watch : 'true' ,
106
- a : 'b' ,
107
- } )
108
- . reply ( 200 , function ( ) : PassThrough {
109
- this . req . on ( 'response' , ( r ) => {
110
- response = r ;
111
- } ) ;
112
- stream . push ( JSON . stringify ( obj1 ) + '\n' ) ;
113
- stream . push ( JSON . stringify ( obj2 ) + '\n' ) ;
114
- return stream ;
115
- } ) ;
92
+ let response : ServerResponse | undefined ;
93
+ const kc = await setupMockSystem ( t , ( req , res ) => {
94
+ response = res ;
95
+ res . write ( JSON . stringify ( obj1 ) + '\n' ) ;
96
+ res . write ( JSON . stringify ( obj2 ) + '\n' ) ;
97
+ } ) ;
98
+ const watch = new Watch ( kc ) ;
116
99
117
100
const receivedTypes : string [ ] = [ ] ;
118
101
const receivedObjects : string [ ] = [ ] ;
@@ -154,57 +137,27 @@ describe('Watch', () => {
154
137
deepStrictEqual ( receivedObjects , [ obj1 . object , obj2 . object ] ) ;
155
138
156
139
strictEqual ( doneCalled , 0 ) ;
157
-
158
- const errIn = new Error ( 'err' ) ;
159
- ( response as IncomingMessage ) . destroy ( errIn ) ;
160
-
140
+ response ! . destroy ( ) ;
161
141
await donePromise ;
162
-
163
142
strictEqual ( doneCalled , 1 ) ;
164
- deepStrictEqual ( doneErr , errIn ) ;
165
-
166
- s . done ( ) ;
167
-
168
- stream . destroy ( ) ;
143
+ strictEqual ( doneErr . code , 'ERR_STREAM_PREMATURE_CLOSE' ) ;
169
144
} ) ;
170
145
171
- it ( 'should not call the done callback more than once on unexpected connection loss' , async ( ) => {
146
+ it ( 'should not call the done callback more than once on unexpected connection loss' , async ( t ) => {
172
147
// Create a server that accepts the connection and flushes headers, then
173
148
// immediately destroys the connection (causing a "Premature close"
174
149
// error).
175
150
//
176
151
// This reproduces a bug where AbortController.abort() inside
177
152
// doneCallOnce could cause done() to be invoked twice.
178
-
179
- const mockServer = createServer ( ( req , res ) => {
153
+ const kc = await setupMockSystem ( t , ( req , res ) => {
180
154
res . writeHead ( 200 , {
181
155
'Content-Type' : 'application/json' ,
182
156
'Transfer-Encoding' : 'chunked' ,
183
157
} ) ;
184
-
185
158
res . flushHeaders ( ) ;
186
159
res . destroy ( ) ; // Prematurely close the connection
187
160
} ) ;
188
-
189
- const mockServerPort = await new Promise < number > ( ( resolve ) => {
190
- mockServer . listen ( 0 , ( ) => {
191
- resolve ( ( mockServer . address ( ) as AddressInfo ) . port ) ;
192
- } ) ;
193
- } ) ;
194
-
195
- const kc = new KubeConfig ( ) ;
196
-
197
- kc . loadFromClusterAndUser (
198
- {
199
- name : 'cluster' ,
200
- server : `http://localhost:${ mockServerPort } ` ,
201
- skipTLSVerify : true ,
202
- } ,
203
- {
204
- name : 'user' ,
205
- } ,
206
- ) ;
207
-
208
161
const watch = new Watch ( kc ) ;
209
162
210
163
let doneCalled = 0 ;
@@ -225,15 +178,15 @@ describe('Watch', () => {
225
178
) ;
226
179
227
180
await donePromise ;
228
-
229
- mockServer . close ( ) ;
230
-
231
181
strictEqual ( doneCalled , 1 ) ;
232
182
} ) ;
233
183
234
- it ( 'should call setKeepAlive on the socket to extend the default of 5 mins' , async ( ) => {
235
- const kc = new KubeConfig ( ) ;
236
-
184
+ it ( 'should call setKeepAlive on the socket to extend the default of 5 mins' , async ( t ) => {
185
+ let response : ServerResponse | undefined ;
186
+ const kc = await setupMockSystem ( t , ( req , res ) => {
187
+ response = res ;
188
+ res . write ( JSON . stringify ( obj1 ) + '\n' ) ;
189
+ } ) ;
237
190
const mockSocket = {
238
191
setKeepAlive : function ( enable : boolean , timeout : number ) {
239
192
this . keepAliveEnabled = enable ;
@@ -242,16 +195,16 @@ describe('Watch', () => {
242
195
keepAliveEnabled : false ,
243
196
keepAliveTimeout : 0 ,
244
197
} ;
245
- Object . assign ( kc , {
246
- ... fakeConfig ,
247
- applyToFetchOptions : async ( ) => ( {
198
+
199
+ ( kc as any ) . applyToFetchOptions = async ( ) => {
200
+ return {
248
201
agent : {
249
202
sockets : {
250
203
'mock-url' : [ mockSocket ] ,
251
204
} ,
252
205
} ,
253
- } ) ,
254
- } ) ;
206
+ } ;
207
+ } ;
255
208
const watch = new Watch ( kc ) ;
256
209
257
210
const obj1 = {
@@ -262,27 +215,6 @@ describe('Watch', () => {
262
215
} ;
263
216
264
217
const path = '/some/path/to/object' ;
265
-
266
- const stream = new PassThrough ( ) ;
267
-
268
- const [ scope ] = systemUnderTest ( ) ;
269
-
270
- let response : IncomingMessage | undefined ;
271
-
272
- const s = scope
273
- . get ( path )
274
- . query ( {
275
- watch : 'true' ,
276
- a : 'b' ,
277
- } )
278
- . reply ( 200 , function ( ) : PassThrough {
279
- this . req . on ( 'response' , ( r ) => {
280
- response = r ;
281
- } ) ;
282
- stream . push ( JSON . stringify ( obj1 ) + '\n' ) ;
283
- return stream ;
284
- } ) ;
285
-
286
218
const receivedTypes : string [ ] = [ ] ;
287
219
const receivedObjects : string [ ] = [ ] ;
288
220
let doneCalled = 0 ;
@@ -326,46 +258,28 @@ describe('Watch', () => {
326
258
strictEqual ( mockSocket . keepAliveEnabled , true ) ;
327
259
strictEqual ( mockSocket . keepAliveTimeout , 30000 ) ;
328
260
329
- const errIn = new Error ( 'err' ) ;
330
- ( response as IncomingMessage ) . destroy ( errIn ) ;
261
+ response ! . destroy ( ) ;
331
262
332
263
await donePromise ;
333
264
334
265
strictEqual ( doneCalled , 1 ) ;
335
- deepStrictEqual ( doneErr , errIn ) ;
336
-
337
- s . done ( ) ;
338
-
339
- stream . destroy ( ) ;
266
+ strictEqual ( doneErr . code , 'ERR_STREAM_PREMATURE_CLOSE' ) ;
340
267
} ) ;
341
268
342
- it ( 'should handle server errors correctly' , async ( ) => {
343
- const kc = new KubeConfig ( ) ;
344
- Object . assign ( kc , fakeConfig ) ;
345
- const watch = new Watch ( kc ) ;
346
-
269
+ it ( 'should handle server errors correctly' , async ( t ) => {
347
270
const obj1 = {
348
271
type : 'ADDED' ,
349
272
object : {
350
273
foo : 'bar' ,
351
274
} ,
352
275
} ;
353
-
354
- const stream = new PassThrough ( ) ;
355
-
356
- const [ scope ] = systemUnderTest ( ) ;
357
-
358
276
const path = '/some/path/to/object?watch=true' ;
359
-
360
- let response : IncomingMessage | undefined ;
361
-
362
- const s = scope . get ( path ) . reply ( 200 , function ( ) : PassThrough {
363
- this . req . on ( 'response' , ( r ) => {
364
- response = r ;
365
- } ) ;
366
- stream . push ( JSON . stringify ( obj1 ) + '\n' ) ;
367
- return stream ;
277
+ let response : ServerResponse | undefined ;
278
+ const kc = await setupMockSystem ( t , ( req , res ) => {
279
+ response = res ;
280
+ res . write ( JSON . stringify ( obj1 ) + '\n' ) ;
368
281
} ) ;
282
+ const watch = new Watch ( kc ) ;
369
283
370
284
const receivedTypes : string [ ] = [ ] ;
371
285
const receivedObjects : string [ ] = [ ] ;
@@ -405,16 +319,12 @@ describe('Watch', () => {
405
319
strictEqual ( doneErr . length , 0 ) ;
406
320
407
321
const errIn = new Error ( 'err' ) ;
408
- ( response as IncomingMessage ) . destroy ( errIn ) ;
322
+ response ! . destroy ( errIn ) ;
409
323
410
324
await donePromise ;
411
325
412
326
strictEqual ( doneErr . length , 1 ) ;
413
- deepStrictEqual ( doneErr [ 0 ] , errIn ) ;
414
-
415
- s . done ( ) ;
416
-
417
- stream . destroy ( ) ;
327
+ strictEqual ( doneErr [ 0 ] . code , 'ERR_STREAM_PREMATURE_CLOSE' ) ;
418
328
} ) ;
419
329
420
330
it ( 'should handle server side close correctly' , async ( ) => {
@@ -555,3 +465,31 @@ describe('Watch', () => {
555
465
await rejects ( promise ) ;
556
466
} ) ;
557
467
} ) ;
468
+
469
+ async function setupMockSystem ( ctx , handler ) {
470
+ const server = createServer ( handler ) ;
471
+ ctx . after ( ( ) => {
472
+ try {
473
+ server . close ( ) ;
474
+ } catch {
475
+ // Ignore errors during server close.
476
+ }
477
+ } ) ;
478
+ const port = await new Promise < number > ( ( resolve ) => {
479
+ server . listen ( 0 , ( ) => {
480
+ resolve ( ( server . address ( ) as AddressInfo ) . port ) ;
481
+ } ) ;
482
+ } ) ;
483
+ const kc = new KubeConfig ( ) ;
484
+ kc . loadFromClusterAndUser (
485
+ {
486
+ name : 'cluster' ,
487
+ server : `http://localhost:${ port } ` ,
488
+ skipTLSVerify : true ,
489
+ } ,
490
+ {
491
+ name : 'user' ,
492
+ } ,
493
+ ) ;
494
+ return kc ;
495
+ }
0 commit comments