Skip to content

Commit e3117a4

Browse files
authored
Merge pull request #1831 from schrodit/generic-watch
add generic and typed watch to KubernetesObjectApi
2 parents 9f646f2 + d5b673c commit e3117a4

File tree

3 files changed

+453
-63
lines changed

3 files changed

+453
-63
lines changed

src/object.ts

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { ApisApi, HttpError, V1APIResource, V1APIResourceList, V1DeleteOptions,
44
import { KubeConfig } from './config';
55
import ObjectSerializer from './serializer';
66
import { KubernetesListObject, KubernetesObject } from './types';
7+
import { RequestResult, Watch } from './watch';
78

89
/** Union type of body types returned by KubernetesObjectApi. */
910
type KubernetesObjectResponseBody =
@@ -44,6 +45,60 @@ enum KubernetesPatchStrategies {
4445
StrategicMergePatch = 'application/strategic-merge-patch+json',
4546
}
4647

48+
/**
49+
* Describes the type of an watch event.
50+
* Object is:
51+
* - If Type is Added or Modified: the new state of the object.
52+
* - If Type is Deleted: the state of the object immediately before deletion.
53+
* - If Type is Bookmark: the object (instance of a type being watched) where
54+
* only ResourceVersion field is set. On successful restart of watch from a
55+
* bookmark resourceVersion, client is guaranteed to not get repeat event
56+
* nor miss any events.
57+
* - If Type is Error: *api.Status is recommended; other types may make sense
58+
* depending on context.
59+
*/
60+
export enum KubernetesEventType {
61+
ADDED = 'ADDED',
62+
MODIFIED = 'MODIFIED',
63+
DELETED = 'DELETED',
64+
BOOKMARK = 'BOOKMARK',
65+
ERROR = 'ERROR',
66+
}
67+
68+
export type WatchObject<T extends KubernetesObject | KubernetesObject> = {
69+
type: KubernetesEventType;
70+
object: T;
71+
};
72+
73+
export type WatchCallback<T extends KubernetesObject | KubernetesObject> = (
74+
phase: KubernetesEventType,
75+
apiObj: T,
76+
watchObj?: WatchObject<T>,
77+
) => void;
78+
79+
export type WatchOptions = {
80+
/**
81+
* To mitigate the impact of short history window,
82+
* the Kubernetes API provides a watch event named BOOKMARK.
83+
* It is a special kind of event to mark that all changes
84+
* up to a given resourceVersion the client is requesting
85+
* have already been sent.
86+
*
87+
* See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
88+
*/
89+
allowWatchBookmarks?: boolean;
90+
/**
91+
* Start watch at the given resource version.
92+
* Starting at a specific resource version means that only events
93+
* starting from that versions are included in the watch stream.
94+
*/
95+
resourceVersion?: string;
96+
};
97+
98+
export type WatchResult = {
99+
abort: () => void;
100+
};
101+
47102
/**
48103
* Dynamically construct Kubernetes API request URIs so client does not have to know what type of object it is acting
49104
* on.
@@ -60,6 +115,7 @@ export class KubernetesObjectApi extends ApisApi {
60115
public static makeApiClient(kc: KubeConfig): KubernetesObjectApi {
61116
const client = kc.makeApiClient(KubernetesObjectApi);
62117
client.setDefaultNamespace(kc);
118+
client.watcher = new Watch(kc);
63119
return client;
64120
}
65121

@@ -69,6 +125,8 @@ export class KubernetesObjectApi extends ApisApi {
69125
/** Cache resource API response. */
70126
protected apiVersionResourceCache: Record<string, V1APIResourceList> = {};
71127

128+
protected watcher?: Watch;
129+
72130
/**
73131
* Create any Kubernetes resource.
74132
* @param spec Kubernetes resource spec.
@@ -474,6 +532,76 @@ export class KubernetesObjectApi extends ApisApi {
474532
return this.requestPromise(localVarRequestOptions);
475533
}
476534

535+
/**
536+
* Watches the given resources and calls provided callback with the parsed json object
537+
* upon event received over the watcher connection.
538+
*
539+
* @param resource defines the resources to watch. Namespace is optional.
540+
* Undefined namespace means to watch all namespaces.
541+
* @param options Optional options that are passed to the watch request.
542+
* @param callback callback function that is called with the parsed json object upon event received.
543+
* @param done callback is called either when connection is closed or when there
544+
* is an error. In either case, watcher takes care of properly closing the
545+
* underlaying connection so that it doesn't leak any resources.
546+
*
547+
* @returns WatchResult object that can be used to abort the watch.
548+
*/
549+
public async watch<T extends KubernetesObject | KubernetesObject>({
550+
resource,
551+
options = {},
552+
callback,
553+
done,
554+
}: {
555+
resource: {
556+
apiVersion: string;
557+
kind: string;
558+
namespace?: string;
559+
};
560+
options?: WatchOptions;
561+
callback: WatchCallback<T>;
562+
done: (err: unknown) => void;
563+
}): Promise<WatchResult> {
564+
if (!this.watcher) {
565+
throw new Error('Watcher not initialized');
566+
}
567+
const resourcePath = new URL(
568+
await this.specUriPath(
569+
{
570+
apiVersion: resource.apiVersion,
571+
kind: resource.kind,
572+
metadata: {
573+
namespace: resource.namespace,
574+
},
575+
},
576+
'list',
577+
),
578+
).pathname;
579+
const type = await this.getSerializationType(resource.apiVersion, resource.kind);
580+
const cb: WatchCallback<T> = (phase: KubernetesEventType, apiObj: T, watchObj?: WatchObject<T>) => {
581+
const obj = ObjectSerializer.deserialize(apiObj, type);
582+
callback(
583+
phase,
584+
obj,
585+
watchObj
586+
? {
587+
...watchObj,
588+
object: obj,
589+
}
590+
: undefined,
591+
);
592+
};
593+
const res: RequestResult = await this.watcher.watch(
594+
resourcePath,
595+
options,
596+
// required to convert to less strict type.
597+
cb as (phase: string, apiObj: any, watchObj?: any) => void,
598+
done,
599+
);
600+
return {
601+
abort: () => res.abort(),
602+
};
603+
}
604+
477605
/** Set default namespace from current context, if available. */
478606
protected setDefaultNamespace(kc: KubeConfig): string {
479607
if (kc.currentContext) {

0 commit comments

Comments
 (0)