Skip to content

Commit a5264eb

Browse files
authored
Merge pull request #780 from bbatha/fix-cache-performance
fix(cache): update cache with O(1) data structures
2 parents ea5041d + a589f36 commit a5264eb

File tree

2 files changed

+252
-112
lines changed

2 files changed

+252
-112
lines changed

src/cache.ts

Lines changed: 100 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@ export interface ObjectCache<T> {
2020
list(namespace?: string): ReadonlyArray<T>;
2121
}
2222

23+
// exported for testing
24+
export type CacheMap<T extends KubernetesObject> = Map<string, Map<string, T>>;
25+
2326
export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, Informer<T> {
24-
private objects: T[] = [];
27+
private objects: CacheMap<T> = new Map();
2528
private resourceVersion: string;
26-
private readonly indexCache: { [key: string]: T[] } = {};
27-
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T> | ErrorCallback> } = {};
29+
private readonly callbackCache: {
30+
[key: string]: Array<ObjectCallback<T> | ErrorCallback>;
31+
} = {};
2832
private request: RequestResult | undefined;
2933
private stopped: boolean = false;
3034

@@ -93,18 +97,26 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
9397
}
9498

9599
public get(name: string, namespace?: string): T | undefined {
96-
return this.objects.find(
97-
(obj: T): boolean => {
98-
return obj.metadata!.name === name && (!namespace || obj.metadata!.namespace === namespace);
99-
},
100-
);
100+
const nsObjects = this.objects.get(namespace || '');
101+
if (nsObjects) {
102+
return nsObjects.get(name);
103+
}
104+
return undefined;
101105
}
102106

103107
public list(namespace?: string | undefined): ReadonlyArray<T> {
104108
if (!namespace) {
105-
return this.objects;
109+
const allObjects: T[] = [];
110+
for (const nsObjects of this.objects.values()) {
111+
allObjects.push(...nsObjects.values());
112+
}
113+
return allObjects;
114+
}
115+
const namespaceObjects = this.objects.get(namespace || '');
116+
if (!namespaceObjects) {
117+
return [];
106118
}
107-
return this.indexCache[namespace] as ReadonlyArray<T>;
119+
return Array.from(namespaceObjects.values());
108120
}
109121

110122
public latestResourceVersion(): string {
@@ -118,7 +130,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
118130
}
119131
}
120132

121-
private async doneHandler(err: any): Promise<any> {
133+
private async doneHandler(err: unknown): Promise<void> {
122134
this._stop();
123135
if (
124136
err &&
@@ -139,16 +151,8 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
139151
const result = await promise;
140152
const list = result.body;
141153
this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
142-
Object.keys(this.indexCache).forEach((key) => {
143-
const updateObjects = deleteItems(this.indexCache[key], list.items);
144-
if (updateObjects.length !== 0) {
145-
this.indexCache[key] = updateObjects;
146-
} else {
147-
delete this.indexCache[key];
148-
}
149-
});
150154
this.addOrUpdateItems(list.items);
151-
this.resourceVersion = list.metadata!.resourceVersion!;
155+
this.resourceVersion = list.metadata!.resourceVersion || '';
152156
}
153157
const queryParams = {
154158
resourceVersion: this.resourceVersion,
@@ -175,21 +179,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
175179
this.callbackCache[ADD].slice(),
176180
this.callbackCache[UPDATE].slice(),
177181
);
178-
if (obj.metadata!.namespace) {
179-
this.indexObj(obj);
180-
}
181182
});
182183
}
183184

184-
private indexObj(obj: T): void {
185-
let namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
186-
if (!namespaceList) {
187-
namespaceList = [];
188-
this.indexCache[obj.metadata!.namespace!] = namespaceList;
189-
}
190-
addOrUpdateObject(namespaceList, obj);
191-
}
192-
193185
private async watchHandler(
194186
phase: string,
195187
obj: T,
@@ -204,18 +196,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
204196
this.callbackCache[ADD].slice(),
205197
this.callbackCache[UPDATE].slice(),
206198
);
207-
if (obj.metadata!.namespace) {
208-
this.indexObj(obj);
209-
}
210199
break;
211200
case 'DELETED':
212201
deleteObject(this.objects, obj, this.callbackCache[DELETE].slice());
213-
if (obj.metadata!.namespace) {
214-
const namespaceList = this.indexCache[obj.metadata!.namespace!] as T[];
215-
if (namespaceList) {
216-
deleteObject(namespaceList, obj);
217-
}
218-
}
219202
break;
220203
case 'BOOKMARK':
221204
// nothing to do, here for documentation, mostly.
@@ -228,50 +211,85 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
228211
}
229212
}
230213

214+
// exported for testing
215+
export function cacheMapFromList<T extends KubernetesObject>(newObjects: T[]): CacheMap<T> {
216+
const objects: CacheMap<T> = new Map();
217+
// build up the new list
218+
for (const obj of newObjects) {
219+
let namespaceObjects = objects.get(obj.metadata!.namespace || '');
220+
if (!namespaceObjects) {
221+
namespaceObjects = new Map();
222+
objects.set(obj.metadata!.namespace || '', namespaceObjects);
223+
}
224+
225+
const name = obj.metadata!.name || '';
226+
namespaceObjects.set(name, obj);
227+
}
228+
return objects;
229+
}
230+
231231
// external for testing
232232
export function deleteItems<T extends KubernetesObject>(
233-
oldObjects: T[],
233+
oldObjects: CacheMap<T>,
234234
newObjects: T[],
235235
deleteCallback?: Array<ObjectCallback<T>>,
236-
): T[] {
237-
return oldObjects.filter((obj: T) => {
238-
if (findKubernetesObject(newObjects, obj) === -1) {
239-
if (deleteCallback) {
240-
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(obj));
236+
): CacheMap<T> {
237+
const newObjectsMap = cacheMapFromList(newObjects);
238+
239+
for (const [namespace, oldNamespaceObjects] of oldObjects.entries()) {
240+
const newNamespaceObjects = newObjectsMap.get(namespace);
241+
if (newNamespaceObjects) {
242+
for (const [name, oldObj] of oldNamespaceObjects.entries()) {
243+
if (!newNamespaceObjects.has(name)) {
244+
oldNamespaceObjects.delete(name);
245+
if (deleteCallback) {
246+
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(oldObj));
247+
}
248+
}
241249
}
242-
return false;
250+
} else {
251+
oldObjects.delete(namespace);
252+
oldNamespaceObjects.forEach((obj: T) => {
253+
if (deleteCallback) {
254+
deleteCallback.forEach((fn: ObjectCallback<T>) => fn(obj));
255+
}
256+
});
243257
}
244-
return true;
245-
});
258+
}
259+
260+
return oldObjects;
246261
}
247262

248263
// Only public for testing.
249264
export function addOrUpdateObject<T extends KubernetesObject>(
250-
objects: T[],
265+
objects: CacheMap<T>,
251266
obj: T,
252-
addCallback?: Array<ObjectCallback<T>>,
253-
updateCallback?: Array<ObjectCallback<T>>,
267+
addCallbacks?: Array<ObjectCallback<T>>,
268+
updateCallbacks?: Array<ObjectCallback<T>>,
254269
): void {
255-
const ix = findKubernetesObject(objects, obj);
256-
if (ix === -1) {
257-
objects.push(obj);
258-
if (addCallback) {
259-
addCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
270+
let namespaceObjects = objects.get(obj.metadata!.namespace || '');
271+
if (!namespaceObjects) {
272+
namespaceObjects = new Map();
273+
objects.set(obj.metadata!.namespace || '', namespaceObjects);
274+
}
275+
276+
const name = obj.metadata!.name || '';
277+
const found = namespaceObjects.get(name);
278+
if (!found) {
279+
namespaceObjects.set(name, obj);
280+
if (addCallbacks) {
281+
addCallbacks.forEach((elt: ObjectCallback<T>) => elt(obj));
260282
}
261283
} else {
262-
if (!isSameVersion(objects[ix], obj)) {
263-
objects[ix] = obj;
264-
if (updateCallback) {
265-
updateCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
284+
if (!isSameVersion(found, obj)) {
285+
namespaceObjects.set(name, obj);
286+
if (updateCallbacks) {
287+
updateCallbacks.forEach((elt: ObjectCallback<T>) => elt(obj));
266288
}
267289
}
268290
}
269291
}
270292

271-
function isSameObject<T extends KubernetesObject>(o1: T, o2: T): boolean {
272-
return o1.metadata!.name === o2.metadata!.name && o1.metadata!.namespace === o2.metadata!.namespace;
273-
}
274-
275293
function isSameVersion<T extends KubernetesObject>(o1: T, o2: T): boolean {
276294
return (
277295
o1.metadata!.resourceVersion !== undefined &&
@@ -280,23 +298,26 @@ function isSameVersion<T extends KubernetesObject>(o1: T, o2: T): boolean {
280298
);
281299
}
282300

283-
function findKubernetesObject<T extends KubernetesObject>(objects: T[], obj: T): number {
284-
return objects.findIndex((elt: T) => {
285-
return isSameObject(elt, obj);
286-
});
287-
}
288-
289301
// Public for testing.
290302
export function deleteObject<T extends KubernetesObject>(
291-
objects: T[],
303+
objects: CacheMap<T>,
292304
obj: T,
293-
deleteCallback?: Array<ObjectCallback<T>>,
305+
deleteCallbacks?: Array<ObjectCallback<T>>,
294306
): void {
295-
const ix = findKubernetesObject(objects, obj);
296-
if (ix !== -1) {
297-
objects.splice(ix, 1);
298-
if (deleteCallback) {
299-
deleteCallback.forEach((elt: ObjectCallback<T>) => elt(obj));
307+
const namespace = obj.metadata!.namespace || '';
308+
const name = obj.metadata!.name || '';
309+
310+
const namespaceObjects = objects.get(namespace);
311+
if (!namespaceObjects) {
312+
return;
313+
}
314+
const deleted = namespaceObjects.delete(name);
315+
if (deleted) {
316+
if (deleteCallbacks) {
317+
deleteCallbacks.forEach((elt: ObjectCallback<T>) => elt(obj));
318+
}
319+
if (namespaceObjects.size === 0) {
320+
objects.delete(namespace);
300321
}
301322
}
302323
}

0 commit comments

Comments
 (0)