Skip to content

Commit 38556ae

Browse files
authored
Merge pull request #1995 from kubernetes-client/watch_performance
Improve watch performance.
2 parents 03f38db + 0567507 commit 38556ae

File tree

2 files changed

+211
-111
lines changed

2 files changed

+211
-111
lines changed

src/cache.ts

Lines changed: 98 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@ export interface ObjectCache<T> {
2020
list(namespace?: string): ReadonlyArray<T>;
2121
}
2222

23+
// exported for testing
24+
export type CacheMap<T extends KubernetesObject> = Map<string, Map<string, T>>;
25+
2326
export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, Informer<T> {
24-
private objects: T[] = [];
27+
private objects: CacheMap<T> = new Map();
2528
private resourceVersion: string;
2629
private readonly indexCache: { [key: string]: T[] } = {};
2730
private readonly callbackCache: { [key: string]: (ObjectCallback<T> | ErrorCallback)[] } = {};
@@ -93,16 +96,26 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
9396
}
9497

9598
public get(name: string, namespace?: string): T | undefined {
96-
return this.objects.find((obj: T): boolean => {
97-
return obj.metadata!.name === name && (!namespace || obj.metadata!.namespace === namespace);
98-
});
99+
const nsObjects = this.objects.get(namespace || '');
100+
if (nsObjects) {
101+
return nsObjects.get(name);
102+
}
103+
return undefined;
99104
}
100105

101106
public list(namespace?: string | undefined): ReadonlyArray<T> {
102107
if (!namespace) {
103-
return this.objects;
108+
const allObjects: T[] = [];
109+
for (const nsObjects of this.objects.values()) {
110+
allObjects.push(...nsObjects.values());
111+
}
112+
return allObjects;
113+
}
114+
const namespaceObjects = this.objects.get(namespace || '');
115+
if (!namespaceObjects) {
116+
return [];
104117
}
105-
return this.indexCache[namespace] as ReadonlyArray<T>;
118+
return Array.from(namespaceObjects.values());
106119
}
107120

108121
public latestResourceVersion(): string {
@@ -116,7 +129,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
116129
}
117130
}
118131

119-
private async doneHandler(err: any): Promise<any> {
132+
private async doneHandler(err: any): Promise<void> {
120133
this._stop();
121134
if (err && err.statusCode === 410) {
122135
this.resourceVersion = '';
@@ -133,16 +146,8 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
133146
const promise = this.listFn();
134147
const list = await promise;
135148
this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
136-
Object.keys(this.indexCache).forEach((key) => {
137-
const updateObjects = deleteItems(this.indexCache[key], list.items);
138-
if (updateObjects.length !== 0) {
139-
this.indexCache[key] = updateObjects;
140-
} else {
141-
delete this.indexCache[key];
142-
}
143-
});
144149
this.addOrUpdateItems(list.items);
145-
this.resourceVersion = list.metadata!.resourceVersion!;
150+
this.resourceVersion = list.metadata!.resourceVersion || '';
146151
}
147152
const queryParams = {
148153
resourceVersion: this.resourceVersion,
@@ -169,21 +174,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
169174
this.callbackCache[ADD].slice(),
170175
this.callbackCache[UPDATE].slice(),
171176
);
172-
if (obj.metadata!.namespace) {
173-
this.indexObj(obj);
174-
}
175177
});
176178
}
177179

178-
private indexObj(obj: T): void {
179-
let namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
180-
if (!namespaceList) {
181-
namespaceList = [];
182-
this.indexCache[obj.metadata!.namespace!] = namespaceList;
183-
}
184-
addOrUpdateObject(namespaceList, obj);
185-
}
186-
187180
private watchHandler(phase: string, obj: T, watchObj?: any): void {
188181
switch (phase) {
189182
case 'ERROR':
@@ -199,18 +192,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
199192
this.callbackCache[ADD].slice(),
200193
this.callbackCache[UPDATE].slice(),
201194
);
202-
if (obj.metadata!.namespace) {
203-
this.indexObj(obj);
204-
}
205195
break;
206196
case 'DELETED':
207197
deleteObject(this.objects, obj, this.callbackCache[DELETE].slice());
208-
if (obj.metadata!.namespace) {
209-
const namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
210-
if (namespaceList) {
211-
deleteObject(namespaceList, obj);
212-
}
213-
}
214198
break;
215199
case 'BOOKMARK':
216200
// nothing to do, here for documentation, mostly.
@@ -222,75 +206,113 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
222206
}
223207
}
224208

209+
// exported for testing
210+
export function cacheMapFromList<T extends KubernetesObject>(newObjects: T[]): CacheMap<T> {
211+
const objects: CacheMap<T> = new Map();
212+
// build up the new list
213+
for (const obj of newObjects) {
214+
let namespaceObjects = objects.get(obj.metadata!.namespace || '');
215+
if (!namespaceObjects) {
216+
namespaceObjects = new Map();
217+
objects.set(obj.metadata!.namespace || '', namespaceObjects);
218+
}
219+
220+
const name = obj.metadata!.name || '';
221+
namespaceObjects.set(name, obj);
222+
}
223+
return objects;
224+
}
225+
225226
// external for testing
226227
export function deleteItems<T extends KubernetesObject>(
227-
oldObjects: T[],
228+
oldObjects: CacheMap<T>,
228229
newObjects: T[],
229230
deleteCallback?: ObjectCallback<T>[],
230-
): T[] {
231-
return oldObjects.filter((obj: T) => {
232-
if (findKubernetesObject(newObjects, obj) === -1) {
233-
if (deleteCallback) {
234-
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(obj));
231+
): CacheMap<T> {
232+
const newObjectsMap = cacheMapFromList(newObjects);
233+
234+
for (const [namespace, oldNamespaceObjects] of oldObjects.entries()) {
235+
const newNamespaceObjects = newObjectsMap.get(namespace);
236+
if (newNamespaceObjects) {
237+
for (const [name, oldObj] of oldNamespaceObjects.entries()) {
238+
if (!newNamespaceObjects.has(name)) {
239+
oldNamespaceObjects.delete(name);
240+
if (deleteCallback) {
241+
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(oldObj));
242+
}
243+
}
235244
}
236-
return false;
245+
} else {
246+
oldObjects.delete(namespace);
247+
oldNamespaceObjects.forEach((obj: T) => {
248+
if (deleteCallback) {
249+
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(obj));
250+
}
251+
});
237252
}
238-
return true;
239-
});
253+
}
254+
255+
return oldObjects;
240256
}
241257

242258
// Only public for testing.
243259
export function addOrUpdateObject<T extends KubernetesObject>(
244-
objects: T[],
260+
objects: CacheMap<T>,
245261
obj: T,
246-
addCallback?: ObjectCallback<T>[],
247-
updateCallback?: ObjectCallback<T>[],
262+
addCallbacks?: ObjectCallback<T>[],
263+
updateCallbacks?: ObjectCallback<T>[],
248264
): void {
249-
const ix = findKubernetesObject(objects, obj);
250-
if (ix === -1) {
251-
objects.push(obj);
252-
if (addCallback) {
253-
addCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
265+
let namespaceObjects = objects.get(obj.metadata!.namespace || '');
266+
if (!namespaceObjects) {
267+
namespaceObjects = new Map();
268+
objects.set(obj.metadata!.namespace || '', namespaceObjects);
269+
}
270+
271+
const name = obj.metadata!.name || '';
272+
const found = namespaceObjects.get(name);
273+
if (!found) {
274+
namespaceObjects.set(name, obj);
275+
if (addCallbacks) {
276+
addCallbacks.forEach((elt: ObjectCallback<T>) => elt(obj));
254277
}
255278
} else {
256-
if (!isSameVersion(objects[ix], obj)) {
257-
objects[ix] = obj;
258-
if (updateCallback) {
259-
updateCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
279+
if (!isSameVersion(found, obj)) {
280+
namespaceObjects.set(name, obj);
281+
if (updateCallbacks) {
282+
updateCallbacks.forEach((elt: ObjectCallback<T>) => elt(obj));
260283
}
261284
}
262285
}
263286
}
264287

265-
function isSameObject<T extends KubernetesObject>(o1: T, o2: T): boolean {
266-
return o1.metadata!.name === o2.metadata!.name && o1.metadata!.namespace === o2.metadata!.namespace;
267-
}
268-
269-
function isSameVersion<T extends KubernetesObject>(o1: T, o2: T): boolean {
288+
function isSameVersion<T extends KubernetesObject>(o1: any, o2: T): boolean {
270289
return (
271290
o1.metadata!.resourceVersion !== undefined &&
272291
o1.metadata!.resourceVersion !== null &&
273292
o1.metadata!.resourceVersion === o2.metadata!.resourceVersion
274293
);
275294
}
276295

277-
function findKubernetesObject<T extends KubernetesObject>(objects: T[], obj: T): number {
278-
return objects.findIndex((elt: T) => {
279-
return isSameObject(elt, obj);
280-
});
281-
}
282-
283296
// Public for testing.
284297
export function deleteObject<T extends KubernetesObject>(
285-
objects: T[],
298+
objects: CacheMap<T>,
286299
obj: T,
287-
deleteCallback?: ObjectCallback<T>[],
300+
deleteCallbacks?: ObjectCallback<T>[],
288301
): void {
289-
const ix = findKubernetesObject(objects, obj);
290-
if (ix !== -1) {
291-
objects.splice(ix, 1);
292-
if (deleteCallback) {
293-
deleteCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
302+
const namespace = obj.metadata!.namespace || '';
303+
const name = obj.metadata!.name || '';
304+
305+
const namespaceObjects = objects.get(namespace);
306+
if (!namespaceObjects) {
307+
return;
308+
}
309+
const deleted = namespaceObjects.delete(name);
310+
if (deleted) {
311+
if (deleteCallbacks) {
312+
deleteCallbacks.forEach((elt: ObjectCallback<T>) => elt(obj));
313+
}
314+
if (namespaceObjects.size === 0) {
315+
objects.delete(namespace);
294316
}
295317
}
296318
}

0 commit comments

Comments
 (0)