Skip to content

Commit 1468cbd

Browse files
committed
add generic and typed watch to KubernetesObjectApi
1 parent aed53fe commit 1468cbd

File tree

1 file changed

+116
-0
lines changed

1 file changed

+116
-0
lines changed

src/object.ts

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { ApisApi, HttpError, V1APIResource, V1APIResourceList, V1DeleteOptions,
44
import { KubeConfig } from './config';
55
import ObjectSerializer from './serializer';
66
import { KubernetesListObject, KubernetesObject } from './types';
7+
import { RequestResult, Watch } from './watch';
78

89
/** Union type of body types returned by KubernetesObjectApi. */
910
type KubernetesObjectResponseBody =
@@ -44,6 +45,60 @@ enum KubernetesPatchStrategies {
4445
StrategicMergePatch = 'application/strategic-merge-patch+json',
4546
}
4647

48+
/**
49+
* Describes the type of an watch event.
50+
* Object is:
51+
* - If Type is Added or Modified: the new state of the object.
52+
* - If Type is Deleted: the state of the object immediately before deletion.
53+
* - If Type is Bookmark: the object (instance of a type being watched) where
54+
* only ResourceVersion field is set. On successful restart of watch from a
55+
* bookmark resourceVersion, client is guaranteed to not get repeat event
56+
* nor miss any events.
57+
* - If Type is Error: *api.Status is recommended; other types may make sense
58+
* depending on context.
59+
*/
60+
export enum KubernetesEventType {
61+
ADDED = 'ADDED',
62+
MODIFIED = 'MODIFIED',
63+
DELETED = 'DELETED',
64+
BOOKMARK = 'BOOKMARK',
65+
ERROR = 'ERROR',
66+
}
67+
68+
export type WatchObject<T extends KubernetesObject | KubernetesObject> = {
69+
type: KubernetesEventType;
70+
object: T;
71+
};
72+
73+
export type WatchCallback<T extends KubernetesObject | KubernetesObject> = (
74+
phase: KubernetesEventType,
75+
apiObj: T,
76+
watchObj?: WatchObject<T>,
77+
) => void;
78+
79+
export type WatchOptions = {
80+
/**
81+
* To mitigate the impact of short history window,
82+
* the Kubernetes API provides a watch event named BOOKMARK.
83+
* It is a special kind of event to mark that all changes
84+
* up to a given resourceVersion the client is requesting
85+
* have already been sent.
86+
*
87+
* See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
88+
*/
89+
allowWatchBookmarks?: boolean;
90+
/**
91+
* Start watch at the given resource version.
92+
* Starting at a specific resource version means that only events
93+
* starting from that versions are included in the watch stream.
94+
*/
95+
resourceVersion?: string;
96+
};
97+
98+
export type WatchResult = {
99+
abort: () => void;
100+
};
101+
47102
/**
48103
* Dynamically construct Kubernetes API request URIs so client does not have to know what type of object it is acting
49104
* on.
@@ -60,6 +115,7 @@ export class KubernetesObjectApi extends ApisApi {
60115
public static makeApiClient(kc: KubeConfig): KubernetesObjectApi {
61116
const client = kc.makeApiClient(KubernetesObjectApi);
62117
client.setDefaultNamespace(kc);
118+
client.watcher = new Watch(kc);
63119
return client;
64120
}
65121

@@ -69,6 +125,8 @@ export class KubernetesObjectApi extends ApisApi {
69125
/** Cache resource API response. */
70126
protected apiVersionResourceCache: Record<string, V1APIResourceList> = {};
71127

128+
protected watcher?: Watch;
129+
72130
/**
73131
* Create any Kubernetes resource.
74132
* @param spec Kubernetes resource spec.
@@ -474,6 +532,64 @@ export class KubernetesObjectApi extends ApisApi {
474532
return this.requestPromise(localVarRequestOptions);
475533
}
476534

535+
/**
536+
* Watches the given resources and calls provided callback with the parsed json object
537+
* upon event received over the watcher connection.
538+
*
539+
* @param resource defines the resources to watch. Namespace is optional.
540+
* Undefined namespace means to watch all namespaces.
541+
* @param options Optional options that are passed to the watch request.
542+
* @param callback callback function that is called with the parsed json object upon event received.
543+
* @param done callback is called either when connection is closed or when there
544+
* is an error. In either case, watcher takes care of properly closing the
545+
* underlaying connection so that it doesn't leak any resources.
546+
*
547+
* @returns WatchResult object that can be used to abort the watch.
548+
*/
549+
public async watch<T>({
550+
resource,
551+
options = {},
552+
callback,
553+
done,
554+
}: {
555+
resource: {
556+
apiVersion: string;
557+
kind: string;
558+
namespace?: string;
559+
};
560+
options?: WatchOptions;
561+
callback: WatchCallback<T>;
562+
done: (err: unknown) => void;
563+
}): Promise<WatchResult> {
564+
if (!this.watcher) {
565+
throw new Error('Watcher not initialized');
566+
}
567+
const resourcePath = await this.specUriPath(resource, 'list');
568+
const res: RequestResult = await this.watcher.watch(resourcePath, options, callback, done);
569+
return {
570+
abort: () => res.abort(),
571+
};
572+
}
573+
574+
/**
575+
* Returns the api path for a kubernetes resource as it is needed by the watch method.
576+
*/
577+
public async getApiResourcePath(apiVersion: string, kind: string, namespace?: string): Promise<string> {
578+
const url = new URL(
579+
await this.client.specUriPath(
580+
{
581+
apiVersion,
582+
kind,
583+
metadata: {
584+
namespace,
585+
},
586+
},
587+
'list',
588+
),
589+
);
590+
return url.pathname;
591+
}
592+
477593
/** Set default namespace from current context, if available. */
478594
protected setDefaultNamespace(kc: KubeConfig): string {
479595
if (kc.currentContext) {

0 commit comments

Comments
 (0)