diff --git a/src/cache.ts b/src/cache.ts index ade3d4611a6..4fc8c3279b0 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -20,8 +20,11 @@ export interface ObjectCache { list(namespace?: string): ReadonlyArray; } +// exported for testing +export type CacheMap = Map>; + export class ListWatch implements ObjectCache, Informer { - private objects: T[] = []; + private objects: CacheMap = new Map(); private resourceVersion: string; private readonly indexCache: { [key: string]: T[] } = {}; private readonly callbackCache: { [key: string]: (ObjectCallback | ErrorCallback)[] } = {}; @@ -93,16 +96,26 @@ export class ListWatch implements ObjectCache, In } public get(name: string, namespace?: string): T | undefined { - return this.objects.find((obj: T): boolean => { - return obj.metadata!.name === name && (!namespace || obj.metadata!.namespace === namespace); - }); + const nsObjects = this.objects.get(namespace || ''); + if (nsObjects) { + return nsObjects.get(name); + } + return undefined; } public list(namespace?: string | undefined): ReadonlyArray { if (!namespace) { - return this.objects; + const allObjects: T[] = []; + for (const nsObjects of this.objects.values()) { + allObjects.push(...nsObjects.values()); + } + return allObjects; + } + const namespaceObjects = this.objects.get(namespace || ''); + if (!namespaceObjects) { + return []; } - return this.indexCache[namespace] as ReadonlyArray; + return Array.from(namespaceObjects.values()); } public latestResourceVersion(): string { @@ -116,7 +129,7 @@ export class ListWatch implements ObjectCache, In } } - private async doneHandler(err: any): Promise { + private async doneHandler(err: any): Promise { this._stop(); if (err && err.statusCode === 410) { this.resourceVersion = ''; @@ -133,16 +146,8 @@ export class ListWatch implements ObjectCache, In const promise = this.listFn(); const list = await promise; this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice()); - Object.keys(this.indexCache).forEach((key) => { - const updateObjects = deleteItems(this.indexCache[key], list.items); - if (updateObjects.length !== 0) { - this.indexCache[key] = updateObjects; - } else { - delete this.indexCache[key]; - } - }); this.addOrUpdateItems(list.items); - this.resourceVersion = list.metadata!.resourceVersion!; + this.resourceVersion = list.metadata!.resourceVersion || ''; } const queryParams = { resourceVersion: this.resourceVersion, @@ -169,21 +174,9 @@ export class ListWatch implements ObjectCache, In this.callbackCache[ADD].slice(), this.callbackCache[UPDATE].slice(), ); - if (obj.metadata!.namespace) { - this.indexObj(obj); - } }); } - private indexObj(obj: T): void { - let namespaceList = this.indexCache[obj.metadata!.namespace!] as T[]; - if (!namespaceList) { - namespaceList = []; - this.indexCache[obj.metadata!.namespace!] = namespaceList; - } - addOrUpdateObject(namespaceList, obj); - } - private watchHandler(phase: string, obj: T, watchObj?: any): void { switch (phase) { case 'ERROR': @@ -199,18 +192,9 @@ export class ListWatch implements ObjectCache, In this.callbackCache[ADD].slice(), this.callbackCache[UPDATE].slice(), ); - if (obj.metadata!.namespace) { - this.indexObj(obj); - } break; case 'DELETED': deleteObject(this.objects, obj, this.callbackCache[DELETE].slice()); - if (obj.metadata!.namespace) { - const namespaceList = this.indexCache[obj.metadata!.namespace!] as T[]; - if (namespaceList) { - deleteObject(namespaceList, obj); - } - } break; case 'BOOKMARK': // nothing to do, here for documentation, mostly. @@ -222,51 +206,86 @@ export class ListWatch implements ObjectCache, In } } +// exported for testing +export function cacheMapFromList(newObjects: T[]): CacheMap { + const objects: CacheMap = new Map(); + // build up the new list + for (const obj of newObjects) { + let namespaceObjects = objects.get(obj.metadata!.namespace || ''); + if (!namespaceObjects) { + namespaceObjects = new Map(); + objects.set(obj.metadata!.namespace || '', namespaceObjects); + } + + const name = obj.metadata!.name || ''; + namespaceObjects.set(name, obj); + } + return objects; +} + // external for testing export function deleteItems( - oldObjects: T[], + oldObjects: CacheMap, newObjects: T[], deleteCallback?: ObjectCallback[], -): T[] { - return oldObjects.filter((obj: T) => { - if (findKubernetesObject(newObjects, obj) === -1) { - if (deleteCallback) { - deleteCallback.forEach((fn: ObjectCallback) => fn(obj)); +): CacheMap { + const newObjectsMap = cacheMapFromList(newObjects); + + for (const [namespace, oldNamespaceObjects] of oldObjects.entries()) { + const newNamespaceObjects = newObjectsMap.get(namespace); + if (newNamespaceObjects) { + for (const [name, oldObj] of oldNamespaceObjects.entries()) { + if (!newNamespaceObjects.has(name)) { + oldNamespaceObjects.delete(name); + if (deleteCallback) { + deleteCallback.forEach((fn: ObjectCallback) => fn(oldObj)); + } + } } - return false; + } else { + oldObjects.delete(namespace); + oldNamespaceObjects.forEach((obj: T) => { + if (deleteCallback) { + deleteCallback.forEach((fn: ObjectCallback) => fn(obj)); + } + }); } - return true; - }); + } + + return oldObjects; } // Only public for testing. export function addOrUpdateObject( - objects: T[], + objects: CacheMap, obj: T, - addCallback?: ObjectCallback[], - updateCallback?: ObjectCallback[], + addCallbacks?: ObjectCallback[], + updateCallbacks?: ObjectCallback[], ): void { - const ix = findKubernetesObject(objects, obj); - if (ix === -1) { - objects.push(obj); - if (addCallback) { - addCallback.forEach((elt: ObjectCallback) => elt(obj)); + let namespaceObjects = objects.get(obj.metadata!.namespace || ''); + if (!namespaceObjects) { + namespaceObjects = new Map(); + objects.set(obj.metadata!.namespace || '', namespaceObjects); + } + + const name = obj.metadata!.name || ''; + const found = namespaceObjects.get(name); + if (!found) { + namespaceObjects.set(name, obj); + if (addCallbacks) { + addCallbacks.forEach((elt: ObjectCallback) => elt(obj)); } } else { - if (!isSameVersion(objects[ix], obj)) { - objects[ix] = obj; - if (updateCallback) { - updateCallback.forEach((elt: ObjectCallback) => elt(obj)); + if (!isSameVersion(found, obj)) { + namespaceObjects.set(name, obj); + if (updateCallbacks) { + updateCallbacks.forEach((elt: ObjectCallback) => elt(obj)); } } } } -function isSameObject(o1: T, o2: T): boolean { - return o1.metadata!.name === o2.metadata!.name && o1.metadata!.namespace === o2.metadata!.namespace; -} - -function isSameVersion(o1: T, o2: T): boolean { +function isSameVersion(o1: any, o2: T): boolean { return ( o1.metadata!.resourceVersion !== undefined && o1.metadata!.resourceVersion !== null && @@ -274,23 +293,26 @@ function isSameVersion(o1: T, o2: T): boolean { ); } -function findKubernetesObject(objects: T[], obj: T): number { - return objects.findIndex((elt: T) => { - return isSameObject(elt, obj); - }); -} - // Public for testing. export function deleteObject( - objects: T[], + objects: CacheMap, obj: T, - deleteCallback?: ObjectCallback[], + deleteCallbacks?: ObjectCallback[], ): void { - const ix = findKubernetesObject(objects, obj); - if (ix !== -1) { - objects.splice(ix, 1); - if (deleteCallback) { - deleteCallback.forEach((elt: ObjectCallback) => elt(obj)); + const namespace = obj.metadata!.namespace || ''; + const name = obj.metadata!.name || ''; + + const namespaceObjects = objects.get(namespace); + if (!namespaceObjects) { + return; + } + const deleted = namespaceObjects.delete(name); + if (deleted) { + if (deleteCallbacks) { + deleteCallbacks.forEach((elt: ObjectCallback) => elt(obj)); + } + if (namespaceObjects.size === 0) { + objects.delete(namespace); } } } diff --git a/src/cache_test.ts b/src/cache_test.ts index e5bf2de5aab..dd037117f5a 100644 --- a/src/cache_test.ts +++ b/src/cache_test.ts @@ -3,8 +3,8 @@ import chaiAsPromised from 'chai-as-promised'; import mock from 'ts-mockito'; -import { V1ListMeta, V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod } from './api'; -import { deleteItems, deleteObject, ListWatch } from './cache'; +import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1PodList, V1ListMeta } from './api'; +import { deleteObject, ListWatch, deleteItems, CacheMap, cacheMapFromList, addOrUpdateObject } from './cache'; import { KubeConfig } from './config'; import { Cluster, Context, User } from './config_types'; import { ListPromise } from './informer'; @@ -62,26 +62,26 @@ describe('ListWatchCache', () => { it('should perform basic caching', async () => { const fakeWatch = mock.mock(Watch); - const list: V1Namespace[] = [ + const list: V1Pod[] = [ { metadata: { name: 'name1', namespace: 'default', } as V1ObjectMeta, - } as V1Namespace, + } as V1Pod, { metadata: { name: 'name2', namespace: 'default', } as V1ObjectMeta, - } as V1Namespace, + } as V1Pod, ]; const listObj = { metadata: { resourceVersion: '12345', } as V1ListMeta, items: list, - } as V1NamespaceList; + } as V1PodList; const emptyObj = { metadata: { @@ -93,13 +93,13 @@ describe('ListWatchCache', () => { name: 'name3', namespace: 'default', } as V1ObjectMeta, - } as V1Namespace, + } as V1Pod, ], - } as V1NamespaceList; + } as V1PodList; let calls = 0; - const listFn: ListPromise = function (): Promise { - return new Promise((resolve, reject) => { + const listFn: ListPromise = function (): Promise { + return new Promise((resolve, reject) => { if (calls++ === 0) { resolve(listObj); } else { @@ -120,25 +120,22 @@ describe('ListWatchCache', () => { expect(pathOut).to.equal('/some/path'); expect(cache.list()).to.deep.equal(list); - expect(cache.get('name1')).to.equal(list[0]); - expect(cache.get('name2')).to.equal(list[1]); - expect(cache.list('default')).to.deep.equal(list); - expect(cache.list('non-existent')).to.be.undefined; + expect(cache.list('non-existent')).to.deep.equal([]); watchHandler('ADDED', { metadata: { name: 'name3', namespace: 'other', } as V1ObjectMeta, - } as V1Namespace); + } as V1Pod); expect(cache.list().length).to.equal(3); expect(cache.get('name3')).to.not.equal(null); expect(cache.list('default').length).to.equal(2); expect(cache.list('other').length).to.equal(1); - expect(cache.list('non-existent')).to.be.undefined; + expect(cache.list('non-existent')).to.deep.equal([]); watchHandler('MODIFIED', { metadata: { @@ -146,7 +143,7 @@ describe('ListWatchCache', () => { namespace: 'other', resourceVersion: 'baz', } as V1ObjectMeta, - } as V1Namespace); + } as V1Pod); expect(cache.list().length).to.equal(3); const obj3 = cache.get('name3'); expect(obj3).to.not.equal(null); @@ -160,7 +157,7 @@ describe('ListWatchCache', () => { name: 'name2', namespace: 'default', } as V1ObjectMeta, - } as V1Namespace); + } as V1Pod); expect(cache.list().length).to.equal(2); expect(cache.get('name2')).to.equal(undefined); @@ -172,14 +169,14 @@ describe('ListWatchCache', () => { name: 'name2', namespace: 'default', } as V1ObjectMeta, - } as V1Namespace); + } as V1Pod); const error = new Error('Gone') as Error & { statusCode: number | undefined }; error.statusCode = 410; await doneHandler(error); - expect(cache.list().length, 'all namespace list').to.equal(1); - expect(cache.list('default').length, 'default namespace list').to.equal(1); - expect(cache.list('other'), 'other namespace list').to.be.undefined; + expect(cache.list().length, 'all pod list').to.equal(1); + expect(cache.list('default').length, 'default pod list').to.equal(1); + expect(cache.list('other'), 'other pod list').to.deep.equal([]); }); it('should perform work as an informer', async () => { @@ -553,10 +550,10 @@ describe('ListWatchCache', () => { resourceVersion: '12345', } as V1ListMeta, items: list, - } as V1NamespaceList; + } as V1PodList; - const listFn: ListPromise = function (): Promise { - return new Promise((resolve, reject) => { + const listFn: ListPromise = function (): Promise { + return new Promise((resolve, reject) => { resolve(listObj); }); }; @@ -573,9 +570,6 @@ describe('ListWatchCache', () => { expect(pathOut).to.equal('/some/path'); expect(cache.list()).to.deep.equal(list); - expect(cache.get('name1')).to.equal(list[0]); - expect(cache.get('name2')).to.equal(list[1]); - expect(cache.list('ns1').length).to.equal(1); expect(cache.list('ns1')[0].metadata!.name).to.equal('name1'); @@ -627,6 +621,81 @@ describe('ListWatchCache', () => { expect(cache.get('name2', 'ns2')).to.equal(undefined); }); + it('should perform non-namespace caching', async () => { + const fakeWatch = mock.mock(Watch); + const list: V1Namespace[] = [ + { + metadata: { + name: 'name1', + } as V1ObjectMeta, + } as V1Namespace, + { + metadata: { + name: 'name2', + } as V1ObjectMeta, + } as V1Namespace, + ]; + const listObj = { + metadata: { + resourceVersion: '12345', + } as V1ListMeta, + items: list, + } as V1NamespaceList; + + const listFn: ListPromise = function (): Promise { + return new Promise((resolve, reject) => { + resolve(listObj); + }); + }; + const promise = new Promise((resolve) => { + mock.when( + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + ).thenCall(() => { + resolve(new AbortController()); + }); + }); + const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); + await promise; + const [pathOut, , watchHandler] = mock.capture(fakeWatch.watch).last(); + expect(pathOut).to.equal('/some/path'); + expect(cache.list()).to.deep.equal(list); + expect(cache.get('name1')).to.not.equal(null); + expect(cache.get('name2')).to.not.equal(null); + + expect(cache.list('ns1')).to.deep.equal([]); + + watchHandler('ADDED', { + metadata: { + name: 'name3', + } as V1ObjectMeta, + } as V1Namespace); + + expect(cache.list().length).to.equal(3); + expect(cache.get('name3')).to.not.equal(null); + + watchHandler('MODIFIED', { + metadata: { + name: 'name3', + resourceVersion: 'baz', + } as V1ObjectMeta, + } as V1Namespace); + expect(cache.list().length).to.equal(3); + const obj3 = cache.get('name3'); + expect(obj3).to.not.equal(null); + if (obj3) { + expect(obj3.metadata!.name).to.equal('name3'); + expect(obj3.metadata!.resourceVersion).to.equal('baz'); + } + + watchHandler('DELETED', { + metadata: { + name: 'name2', + } as V1ObjectMeta, + } as V1Namespace); + expect(cache.list().length).to.equal(2); + expect(cache.get('name2', 'ns2')).to.equal(undefined); + }); + it('should delete an object correctly', () => { const list: V1Pod[] = [ { @@ -642,27 +711,31 @@ describe('ListWatchCache', () => { } as V1ObjectMeta, } as V1Pod, ]; - deleteObject(list, { + const objs: CacheMap = new Map(); + list.forEach((elt: V1Pod) => { + addOrUpdateObject(objs, elt); + }); + deleteObject(objs, { metadata: { name: 'other', namespace: 'ns1', }, } as V1Pod); - expect(list.length).to.equal(2); - deleteObject(list, { + expect(objs.size).to.equal(2); + deleteObject(objs, { metadata: { name: 'name1', namespace: 'ns2', }, } as V1Pod); - expect(list.length).to.equal(2); - deleteObject(list, { + expect(objs.size).to.equal(2); + deleteObject(objs, { metadata: { name: 'name1', namespace: 'ns1', }, } as V1Pod); - expect(list.length).to.equal(1); + expect(objs.size).to.equal(1); }); it('should not call handlers which have been unregistered', async () => { @@ -1297,7 +1370,12 @@ describe('delete items', () => { ]; const pods: V1Pod[] = []; - deleteItems(listA, listB, [(obj?: V1Pod) => pods.push(obj!)]); + const objs: CacheMap = new Map(); + listA.forEach((elt) => { + addOrUpdateObject(objs, elt); + }); + + deleteItems(objs, listB, [(obj?: V1Pod) => pods.push(obj!)]); expect(pods).to.deep.equal(expected); });