From cf4a11d33079779c6607316d2857d51e3123f430 Mon Sep 17 00:00:00 2001 From: schrodit Date: Sat, 10 Aug 2024 14:23:37 +0200 Subject: [PATCH 1/3] add generic and typed watch to KubernetesObjectApi --- src/object.ts | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/src/object.ts b/src/object.ts index 2585113f501..cb5e018da91 100644 --- a/src/object.ts +++ b/src/object.ts @@ -4,6 +4,7 @@ import { ApisApi, HttpError, V1APIResource, V1APIResourceList, V1DeleteOptions, import { KubeConfig } from './config'; import ObjectSerializer from './serializer'; import { KubernetesListObject, KubernetesObject } from './types'; +import { RequestResult, Watch } from './watch'; /** Union type of body types returned by KubernetesObjectApi. */ type KubernetesObjectResponseBody = @@ -44,6 +45,60 @@ enum KubernetesPatchStrategies { StrategicMergePatch = 'application/strategic-merge-patch+json', } +/** + * Describes the type of an watch event. + * Object is: + * - If Type is Added or Modified: the new state of the object. + * - If Type is Deleted: the state of the object immediately before deletion. + * - If Type is Bookmark: the object (instance of a type being watched) where + * only ResourceVersion field is set. On successful restart of watch from a + * bookmark resourceVersion, client is guaranteed to not get repeat event + * nor miss any events. + * - If Type is Error: *api.Status is recommended; other types may make sense + * depending on context. + */ +export enum KubernetesEventType { + ADDED = 'ADDED', + MODIFIED = 'MODIFIED', + DELETED = 'DELETED', + BOOKMARK = 'BOOKMARK', + ERROR = 'ERROR', +} + +export type WatchObject = { + type: KubernetesEventType; + object: T; +}; + +export type WatchCallback = ( + phase: KubernetesEventType, + apiObj: T, + watchObj?: WatchObject, +) => void; + +export type WatchOptions = { + /** + * To mitigate the impact of short history window, + * the Kubernetes API provides a watch event named BOOKMARK. + * It is a special kind of event to mark that all changes + * up to a given resourceVersion the client is requesting + * have already been sent. + * + * See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks + */ + allowWatchBookmarks?: boolean; + /** + * Start watch at the given resource version. + * Starting at a specific resource version means that only events + * starting from that versions are included in the watch stream. + */ + resourceVersion?: string; +}; + +export type WatchResult = { + abort: () => void; +}; + /** * Dynamically construct Kubernetes API request URIs so client does not have to know what type of object it is acting * on. @@ -60,6 +115,7 @@ export class KubernetesObjectApi extends ApisApi { public static makeApiClient(kc: KubeConfig): KubernetesObjectApi { const client = kc.makeApiClient(KubernetesObjectApi); client.setDefaultNamespace(kc); + client.watcher = new Watch(kc); return client; } @@ -69,6 +125,8 @@ export class KubernetesObjectApi extends ApisApi { /** Cache resource API response. */ protected apiVersionResourceCache: Record = {}; + protected watcher?: Watch; + /** * Create any Kubernetes resource. * @param spec Kubernetes resource spec. @@ -474,6 +532,45 @@ export class KubernetesObjectApi extends ApisApi { return this.requestPromise(localVarRequestOptions); } + /** + * Watches the given resources and calls provided callback with the parsed json object + * upon event received over the watcher connection. + * + * @param resource defines the resources to watch. Namespace is optional. + * Undefined namespace means to watch all namespaces. + * @param options Optional options that are passed to the watch request. + * @param callback callback function that is called with the parsed json object upon event received. + * @param done callback is called either when connection is closed or when there + * is an error. In either case, watcher takes care of properly closing the + * underlaying connection so that it doesn't leak any resources. + * + * @returns WatchResult object that can be used to abort the watch. + */ + public async watch({ + resource, + options = {}, + callback, + done, + }: { + resource: { + apiVersion: string; + kind: string; + namespace?: string; + }; + options?: WatchOptions; + callback: WatchCallback; + done: (err: unknown) => void; + }): Promise { + if (!this.watcher) { + throw new Error('Watcher not initialized'); + } + const resourcePath = await this.specUriPath(resource, 'list'); + const res: RequestResult = await this.watcher.watch(resourcePath, options, callback, done); + return { + abort: () => res.abort(), + }; + } + /** Set default namespace from current context, if available. */ protected setDefaultNamespace(kc: KubeConfig): string { if (kc.currentContext) { From b71e20fbe60b32f1099b0321337294e0b0de75ed Mon Sep 17 00:00:00 2001 From: schrodit Date: Sat, 17 Aug 2024 15:26:21 +0200 Subject: [PATCH 2/3] add tests --- src/object.ts | 37 ++++- src/object_test.ts | 356 +++++++++++++++++++++++++++++++++++++-------- src/util.ts | 32 ++++ 3 files changed, 359 insertions(+), 66 deletions(-) diff --git a/src/object.ts b/src/object.ts index cb5e018da91..bd37428e6ca 100644 --- a/src/object.ts +++ b/src/object.ts @@ -546,7 +546,7 @@ export class KubernetesObjectApi extends ApisApi { * * @returns WatchResult object that can be used to abort the watch. */ - public async watch({ + public async watch({ resource, options = {}, callback, @@ -564,8 +564,39 @@ export class KubernetesObjectApi extends ApisApi { if (!this.watcher) { throw new Error('Watcher not initialized'); } - const resourcePath = await this.specUriPath(resource, 'list'); - const res: RequestResult = await this.watcher.watch(resourcePath, options, callback, done); + const resourcePath = new URL( + await this.specUriPath( + { + apiVersion: resource.apiVersion, + kind: resource.kind, + metadata: { + namespace: resource.namespace, + }, + }, + 'list', + ), + ).pathname; + const type = await this.getSerializationType(resource.apiVersion, resource.kind); + const cb: WatchCallback = (phase: KubernetesEventType, apiObj: T, watchObj?: WatchObject) => { + const obj = ObjectSerializer.deserialize(apiObj, type); + callback( + phase, + obj, + watchObj + ? { + ...watchObj, + object: obj, + } + : undefined, + ); + }; + const res: RequestResult = await this.watcher.watch( + resourcePath, + options, + // required to convert to less strict type. + cb as (phase: string, apiObj: any, watchObj?: any) => void, + done, + ); return { abort: () => res.abort(), }; diff --git a/src/object_test.ts b/src/object_test.ts index 0d812931962..8eb3fe43564 100644 --- a/src/object_test.ts +++ b/src/object_test.ts @@ -1,69 +1,17 @@ import { fail } from 'assert'; import { expect } from 'chai'; import nock = require('nock'); +import { PassThrough } from 'stream'; +import { IncomingMessage } from 'http'; import { V1APIResource, V1APIResourceList, V1Secret } from './api'; import { KubeConfig } from './config'; +import { Cluster, Context, User } from './config_types'; import { KubernetesObjectApi } from './object'; import { KubernetesObject } from './types'; +import { resolvablePromise } from './util'; -describe('KubernetesObject', () => { - const testConfigOptions = { - clusters: [{ name: 'dc', server: 'https://d.i.y' }], - users: [{ name: 'ian', password: 'mackaye' }], - contexts: [{ name: 'dischord', cluster: 'dc', user: 'ian' }], - currentContext: 'dischord', - }; - - describe('makeApiClient', () => { - it('should create the client', () => { - const kc = new KubeConfig(); - kc.loadFromOptions(testConfigOptions); - const c = KubernetesObjectApi.makeApiClient(kc); - expect(c).to.be.ok; - expect((c as any).defaultNamespace).to.equal('default'); - }); - - it('should set the default namespace from context', () => { - const kc = new KubeConfig(); - kc.loadFromOptions({ - clusters: [{ name: 'dc', server: 'https://d.i.y' }], - users: [{ name: 'ian', password: 'mackaye' }], - contexts: [{ name: 'dischord', cluster: 'dc', user: 'ian', namespace: 'straight-edge' }], - currentContext: 'dischord', - }); - const c = KubernetesObjectApi.makeApiClient(kc); - expect(c).to.be.ok; - expect((c as any).defaultNamespace).to.equal('straight-edge'); - }); - }); - - class KubernetesObjectApiTest extends KubernetesObjectApi { - public static makeApiClient(kc?: KubeConfig): KubernetesObjectApiTest { - if (!kc) { - kc = new KubeConfig(); - kc.loadFromOptions(testConfigOptions); - } - const client = kc.makeApiClient(KubernetesObjectApiTest); - client.setDefaultNamespace(kc); - return client; - } - public apiVersionResourceCache: Record = {}; - public async specUriPath(spec: KubernetesObject, method: any): Promise { - return super.specUriPath(spec, method); - } - public generateHeaders( - optionsHeaders: { [name: string]: string }, - action: string = 'GET', - ): { [name: string]: string } { - return super.generateHeaders(optionsHeaders, action); - } - public async resource(apiVersion: string, kind: string): Promise { - return super.resource(apiVersion, kind); - } - } - - const resourceBodies = { - core: `{ +const resourceBodies = { + core: `{ "groupVersion": "v1", "kind": "APIResourceList", "resources": [ @@ -254,7 +202,7 @@ describe('KubernetesObject', () => { ] }`, - apps: `{ + apps: `{ "apiVersion": "v1", "groupVersion": "apps/v1", "kind": "APIResourceList", @@ -327,7 +275,7 @@ describe('KubernetesObject', () => { } ] }`, - extensions: `{ + extensions: `{ "groupVersion": "extensions/v1beta1", "kind": "APIResourceList", "resources": [ @@ -412,7 +360,7 @@ describe('KubernetesObject', () => { } ] }`, - networking: `{ + networking: `{ "apiVersion": "v1", "groupVersion": "networking.k8s.io/v1", "kind": "APIResourceList", @@ -424,7 +372,7 @@ describe('KubernetesObject', () => { } ] }`, - rbac: `{ + rbac: `{ "apiVersion": "v1", "groupVersion": "rbac.authorization.k8s.io/v1", "kind": "APIResourceList", @@ -451,7 +399,7 @@ describe('KubernetesObject', () => { } ] }`, - storage: `{ + storage: `{ "apiVersion": "v1", "groupVersion": "storage.k8s.io/v1", "kind": "APIResourceList", @@ -473,8 +421,64 @@ describe('KubernetesObject', () => { } ] }`, +}; + +describe('KubernetesObject', () => { + const testConfigOptions = { + clusters: [{ name: 'dc', server: 'https://d.i.y' }], + users: [{ name: 'ian', password: 'mackaye' }], + contexts: [{ name: 'dischord', cluster: 'dc', user: 'ian' }], + currentContext: 'dischord', }; + describe('makeApiClient', () => { + it('should create the client', () => { + const kc = new KubeConfig(); + kc.loadFromOptions(testConfigOptions); + const c = KubernetesObjectApi.makeApiClient(kc); + expect(c).to.be.ok; + expect((c as any).defaultNamespace).to.equal('default'); + }); + + it('should set the default namespace from context', () => { + const kc = new KubeConfig(); + kc.loadFromOptions({ + clusters: [{ name: 'dc', server: 'https://d.i.y' }], + users: [{ name: 'ian', password: 'mackaye' }], + contexts: [{ name: 'dischord', cluster: 'dc', user: 'ian', namespace: 'straight-edge' }], + currentContext: 'dischord', + }); + const c = KubernetesObjectApi.makeApiClient(kc); + expect(c).to.be.ok; + expect((c as any).defaultNamespace).to.equal('straight-edge'); + }); + }); + + class KubernetesObjectApiTest extends KubernetesObjectApi { + public static makeApiClient(kc?: KubeConfig): KubernetesObjectApiTest { + if (!kc) { + kc = new KubeConfig(); + kc.loadFromOptions(testConfigOptions); + } + const client = kc.makeApiClient(KubernetesObjectApiTest); + client.setDefaultNamespace(kc); + return client; + } + public apiVersionResourceCache: Record = {}; + public async specUriPath(spec: KubernetesObject, method: any): Promise { + return super.specUriPath(spec, method); + } + public generateHeaders( + optionsHeaders: { [name: string]: string }, + action: string = 'GET', + ): { [name: string]: string } { + return super.generateHeaders(optionsHeaders, action); + } + public async resource(apiVersion: string, kind: string): Promise { + return super.resource(apiVersion, kind); + } + } + describe('specUriPath', () => { it('should return a namespaced path', async () => { const c = KubernetesObjectApiTest.makeApiClient(); @@ -2114,3 +2118,229 @@ describe('KubernetesObject', () => { }); }); }); + +describe('watch', () => { + const server = 'https://d.i.y'; + const fakeConfig: { + clusters: Cluster[]; + contexts: Context[]; + users: User[]; + } = { + clusters: [ + { + name: 'cluster', + server, + } as Cluster, + ], + contexts: [ + { + cluster: 'cluster', + user: 'user', + } as Context, + ], + users: [ + { + name: 'user', + } as User, + ], + }; + + const testObject = { + apiVersion: 'v1', + kind: 'Secret', + metadata: { + name: 'k8s-js-client-test', + namespace: 'default', + creationTimestamp: '2022-01-01T00:00:00.000Z', + uid: undefined, + annotations: undefined, + labels: undefined, + finalizers: undefined, + generateName: undefined, + selfLink: undefined, + resourceVersion: undefined, + generation: undefined, + ownerReferences: undefined, + deletionTimestamp: undefined, + deletionGracePeriodSeconds: undefined, + managedFields: undefined, + }, + data: { + key: 'value', + }, + type: undefined, + immutable: undefined, + stringData: undefined, + }; + + it('watch with namespace -> receive events', async () => { + const kc = new KubeConfig(); + Object.assign(kc, fakeConfig); + const client = KubernetesObjectApi.makeApiClient(kc); + + const stream = new PassThrough(); + let response: IncomingMessage | undefined; + + const s = nock('https://d.i.y') + .get('/api/v1') + .reply(200, resourceBodies.core) + .get('/api/v1/namespaces/default/secrets') + .query({ + watch: 'true', + }) + .reply(200, function (): PassThrough { + this.req.on('response', (r) => { + response = r; + }); + stream.push( + JSON.stringify({ + type: 'ADDED', + object: testObject, + }) + '\n', + ); + return stream; + }); + + const receivedObjects: V1Secret[] = []; + const done = resolvablePromise(); + const { abort } = await client.watch({ + resource: { + apiVersion: 'v1', + kind: 'Secret', + namespace: 'default', + }, + callback: (phase, obj) => { + receivedObjects.push(obj); + if (receivedObjects.length === 1) { + done.resolve(); + } + }, + done: () => {}, + }); + + await done; + abort(); + + expect(receivedObjects).to.have.length(1); + expect(receivedObjects[0]).to.be.instanceof(V1Secret); + + s.done(); + stream.destroy(); + }); + + it('watch all namespaces -> receive events', async () => { + const kc = new KubeConfig(); + Object.assign(kc, fakeConfig); + const client = KubernetesObjectApi.makeApiClient(kc); + + const stream = new PassThrough(); + let response: IncomingMessage | undefined; + + const s = nock('https://d.i.y') + .get('/api/v1') + .reply(200, resourceBodies.core) + .get('/api/v1/secrets') + .query({ + watch: 'true', + }) + .reply(200, function (): PassThrough { + this.req.on('response', (r) => { + response = r; + }); + stream.push( + JSON.stringify({ + type: 'ADDED', + object: testObject, + }) + '\n', + ); + return stream; + }); + + const receivedObjects: V1Secret[] = []; + const done = resolvablePromise(); + const { abort } = await client.watch({ + resource: { + apiVersion: 'v1', + kind: 'Secret', + }, + callback: (phase, obj) => { + receivedObjects.push(obj); + if (receivedObjects.length === 1) { + done.resolve(); + } + }, + done: () => {}, + }); + + await done; + abort(); + + expect(receivedObjects).to.have.length(1); + expect(receivedObjects[0]).to.be.instanceof(V1Secret); + + s.done(); + stream.destroy(); + }); + + it('receive multiple events', async () => { + const kc = new KubeConfig(); + Object.assign(kc, fakeConfig); + const client = KubernetesObjectApi.makeApiClient(kc); + + const stream = new PassThrough(); + let response: IncomingMessage | undefined; + + const s = nock('https://d.i.y') + .get('/api/v1') + .reply(200, resourceBodies.core) + .get('/api/v1/namespaces/default/secrets') + .query({ + watch: 'true', + }) + .reply(200, function (): PassThrough { + this.req.on('response', (r) => { + response = r; + }); + stream.push( + JSON.stringify({ + type: 'ADDED', + object: testObject, + }) + '\n', + ); + stream.push( + JSON.stringify({ + type: 'MODIFIED', + object: testObject, + }) + '\n', + ); + return stream; + }); + + const receivedObjects: V1Secret[] = []; + const receivedPhases: string[] = []; + const done = resolvablePromise(); + const { abort } = await client.watch({ + resource: { + apiVersion: 'v1', + kind: 'Secret', + namespace: 'default', + }, + callback: (phase, obj) => { + receivedPhases.push(phase); + receivedObjects.push(obj); + if (receivedObjects.length === 2) { + done.resolve(); + } + }, + done: () => {}, + }); + + await done; + abort(); + + expect(receivedPhases).to.deep.equal(['ADDED', 'MODIFIED']); + + s.done(); + stream.destroy(); + }); +}); diff --git a/src/util.ts b/src/util.ts index 4c1eeac98fb..7cae48d39e8 100644 --- a/src/util.ts +++ b/src/util.ts @@ -162,3 +162,35 @@ export async function generateTmpFileName(): Promise { throw new Error('Cannot generate tmp file name'); } + +/** + * If you don't want to or can't await a promise right await and need to store + * it in a variable to await it later, wrap it with this function. + * Otherwise, if the promise rejects before you await it, it will cause an + * unhandled promise rejection which can cause process termination (in node). + */ +export const awaitLater = >(p: P): P => { + p.catch(() => {}); + return p; +}; + +export interface ResolvablePromise extends Promise { + resolve: (value: T) => void; + reject: (error: Error) => void; +} + +export const resolvablePromise = (): ResolvablePromise => { + let resolve: (value: T) => void; + let reject: (error: Error) => void; + + const promise = awaitLater( + new Promise((res, rej) => { + resolve = res; + reject = rej; + }), + ) as ResolvablePromise; + + promise.resolve = resolve!; + promise.reject = reject!; + return promise; +}; From d5b673ca02bbaa81ae114bd23e6ead2d62b371db Mon Sep 17 00:00:00 2001 From: Brendan Burns <5751682+brendandburns@users.noreply.github.com> Date: Fri, 23 Aug 2024 13:25:58 -0700 Subject: [PATCH 3/3] Update src/util.ts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Max StrĂ¼bing --- src/util.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util.ts b/src/util.ts index 7cae48d39e8..024b4416f98 100644 --- a/src/util.ts +++ b/src/util.ts @@ -170,7 +170,7 @@ export async function generateTmpFileName(): Promise { * unhandled promise rejection which can cause process termination (in node). */ export const awaitLater = >(p: P): P => { - p.catch(() => {}); + p.catch(() => {}); // tslint:disable-line:no-empty return p; };