diff --git a/src/cache.ts b/src/cache.ts index e2019212e5e..0213ef532e6 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -20,11 +20,15 @@ export interface ObjectCache { list(namespace?: string): ReadonlyArray; } +// exported for testing +export type CacheMap = Map>; + export class ListWatch implements ObjectCache, Informer { - private objects: T[] = []; + private objects: CacheMap = new Map(); private resourceVersion: string; - private readonly indexCache: { [key: string]: T[] } = {}; - private readonly callbackCache: { [key: string]: Array | ErrorCallback> } = {}; + private readonly callbackCache: { + [key: string]: Array | ErrorCallback>; + } = {}; private request: RequestResult | undefined; private stopped: boolean = false; @@ -93,18 +97,26 @@ export class ListWatch implements ObjectCache, In } public get(name: string, namespace?: string): T | undefined { - return this.objects.find( - (obj: T): boolean => { - return obj.metadata!.name === name && (!namespace || obj.metadata!.namespace === namespace); - }, - ); + const nsObjects = this.objects.get(namespace || ''); + if (nsObjects) { + return nsObjects.get(name); + } + return undefined; } public list(namespace?: string | undefined): ReadonlyArray { if (!namespace) { - return this.objects; + const allObjects: T[] = []; + for (const nsObjects of this.objects.values()) { + allObjects.push(...nsObjects.values()); + } + return allObjects; + } + const namespaceObjects = this.objects.get(namespace || ''); + if (!namespaceObjects) { + return []; } - return this.indexCache[namespace] as ReadonlyArray; + return Array.from(namespaceObjects.values()); } public latestResourceVersion(): string { @@ -118,7 +130,7 @@ export class ListWatch implements ObjectCache, In } } - private async doneHandler(err: any): Promise { + private async doneHandler(err: unknown): Promise { this._stop(); if ( err && @@ -139,16 +151,8 @@ export class ListWatch implements ObjectCache, In const result = await promise; const list = result.body; this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice()); - Object.keys(this.indexCache).forEach((key) => { - const updateObjects = deleteItems(this.indexCache[key], list.items); - if (updateObjects.length !== 0) { - this.indexCache[key] = updateObjects; - } else { - delete this.indexCache[key]; - } - }); this.addOrUpdateItems(list.items); - this.resourceVersion = list.metadata!.resourceVersion!; + this.resourceVersion = list.metadata!.resourceVersion || ''; } const queryParams = { resourceVersion: this.resourceVersion, @@ -175,21 +179,9 @@ export class ListWatch implements ObjectCache, In this.callbackCache[ADD].slice(), this.callbackCache[UPDATE].slice(), ); - if (obj.metadata!.namespace) { - this.indexObj(obj); - } }); } - private indexObj(obj: T): void { - let namespaceList = this.indexCache[obj.metadata!.namespace!] as T[]; - if (!namespaceList) { - namespaceList = []; - this.indexCache[obj.metadata!.namespace!] = namespaceList; - } - addOrUpdateObject(namespaceList, obj); - } - private async watchHandler( phase: string, obj: T, @@ -204,18 +196,9 @@ export class ListWatch implements ObjectCache, In this.callbackCache[ADD].slice(), this.callbackCache[UPDATE].slice(), ); - if (obj.metadata!.namespace) { - this.indexObj(obj); - } break; case 'DELETED': deleteObject(this.objects, obj, this.callbackCache[DELETE].slice()); - if (obj.metadata!.namespace) { - const namespaceList = this.indexCache[obj.metadata!.namespace!] as T[]; - if (namespaceList) { - deleteObject(namespaceList, obj); - } - } break; case 'BOOKMARK': // nothing to do, here for documentation, mostly. @@ -228,50 +211,85 @@ export class ListWatch implements ObjectCache, In } } +// exported for testing +export function cacheMapFromList(newObjects: T[]): CacheMap { + const objects: CacheMap = new Map(); + // build up the new list + for (const obj of newObjects) { + let namespaceObjects = objects.get(obj.metadata!.namespace || ''); + if (!namespaceObjects) { + namespaceObjects = new Map(); + objects.set(obj.metadata!.namespace || '', namespaceObjects); + } + + const name = obj.metadata!.name || ''; + namespaceObjects.set(name, obj); + } + return objects; +} + // external for testing export function deleteItems( - oldObjects: T[], + oldObjects: CacheMap, newObjects: T[], deleteCallback?: Array>, -): T[] { - return oldObjects.filter((obj: T) => { - if (findKubernetesObject(newObjects, obj) === -1) { - if (deleteCallback) { - deleteCallback.forEach((fn: ObjectCallback) => fn(obj)); +): CacheMap { + const newObjectsMap = cacheMapFromList(newObjects); + + for (const [namespace, oldNamespaceObjects] of oldObjects.entries()) { + const newNamespaceObjects = newObjectsMap.get(namespace); + if (newNamespaceObjects) { + for (const [name, oldObj] of oldNamespaceObjects.entries()) { + if (!newNamespaceObjects.has(name)) { + oldNamespaceObjects.delete(name); + if (deleteCallback) { + deleteCallback.forEach((fn: ObjectCallback) => fn(oldObj)); + } + } } - return false; + } else { + oldObjects.delete(namespace); + oldNamespaceObjects.forEach((obj: T) => { + if (deleteCallback) { + deleteCallback.forEach((fn: ObjectCallback) => fn(obj)); + } + }); } - return true; - }); + } + + return oldObjects; } // Only public for testing. export function addOrUpdateObject( - objects: T[], + objects: CacheMap, obj: T, - addCallback?: Array>, - updateCallback?: Array>, + addCallbacks?: Array>, + updateCallbacks?: Array>, ): void { - const ix = findKubernetesObject(objects, obj); - if (ix === -1) { - objects.push(obj); - if (addCallback) { - addCallback.forEach((elt: ObjectCallback) => elt(obj)); + let namespaceObjects = objects.get(obj.metadata!.namespace || ''); + if (!namespaceObjects) { + namespaceObjects = new Map(); + objects.set(obj.metadata!.namespace || '', namespaceObjects); + } + + const name = obj.metadata!.name || ''; + const found = namespaceObjects.get(name); + if (!found) { + namespaceObjects.set(name, obj); + if (addCallbacks) { + addCallbacks.forEach((elt: ObjectCallback) => elt(obj)); } } else { - if (!isSameVersion(objects[ix], obj)) { - objects[ix] = obj; - if (updateCallback) { - updateCallback.forEach((elt: ObjectCallback) => elt(obj)); + if (!isSameVersion(found, obj)) { + namespaceObjects.set(name, obj); + if (updateCallbacks) { + updateCallbacks.forEach((elt: ObjectCallback) => elt(obj)); } } } } -function isSameObject(o1: T, o2: T): boolean { - return o1.metadata!.name === o2.metadata!.name && o1.metadata!.namespace === o2.metadata!.namespace; -} - function isSameVersion(o1: T, o2: T): boolean { return ( o1.metadata!.resourceVersion !== undefined && @@ -280,23 +298,26 @@ function isSameVersion(o1: T, o2: T): boolean { ); } -function findKubernetesObject(objects: T[], obj: T): number { - return objects.findIndex((elt: T) => { - return isSameObject(elt, obj); - }); -} - // Public for testing. export function deleteObject( - objects: T[], + objects: CacheMap, obj: T, - deleteCallback?: Array>, + deleteCallbacks?: Array>, ): void { - const ix = findKubernetesObject(objects, obj); - if (ix !== -1) { - objects.splice(ix, 1); - if (deleteCallback) { - deleteCallback.forEach((elt: ObjectCallback) => elt(obj)); + const namespace = obj.metadata!.namespace || ''; + const name = obj.metadata!.name || ''; + + const namespaceObjects = objects.get(namespace); + if (!namespaceObjects) { + return; + } + const deleted = namespaceObjects.delete(name); + if (deleted) { + if (deleteCallbacks) { + deleteCallbacks.forEach((elt: ObjectCallback) => elt(obj)); + } + if (namespaceObjects.size === 0) { + objects.delete(namespace); } } } diff --git a/src/cache_test.ts b/src/cache_test.ts index 4733d300d4b..ed5416c88ab 100644 --- a/src/cache_test.ts +++ b/src/cache_test.ts @@ -8,8 +8,8 @@ import http = require('http'); import { Duplex } from 'stream'; import { EventEmitter } from 'ws'; -import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1ListMeta } from './api'; -import { deleteObject, ListWatch, deleteItems } from './cache'; +import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1PodList, V1ListMeta } from './api'; +import { deleteObject, ListWatch, deleteItems, CacheMap, cacheMapFromList } from './cache'; import { KubeConfig } from './config'; import { Cluster, Context, User } from './config_types'; import { ADD, UPDATE, DELETE, ERROR, ListPromise, CHANGE } from './informer'; @@ -80,18 +80,141 @@ describe('ListWatchCache', () => { it('should perform basic caching', async () => { const fakeWatch = mock.mock(Watch); - const list: V1Namespace[] = [ + const list: V1Pod[] = [ { metadata: { name: 'name1', namespace: 'default', } as V1ObjectMeta, - } as V1Namespace, + } as V1Pod, { metadata: { name: 'name2', namespace: 'default', } as V1ObjectMeta, + } as V1Pod, + ]; + const listObj = { + metadata: { + resourceVersion: '12345', + } as V1ListMeta, + items: list, + } as V1PodList; + + const emptyObj = { + metadata: { + resourceVersion: '123456', + } as V1ListMeta, + items: [ + { + metadata: { + name: 'name3', + namespace: 'default', + } as V1ObjectMeta, + } as V1Pod, + ], + } as V1PodList; + + let calls = 0; + const listFn: ListPromise = function(): Promise<{ + response: http.IncomingMessage; + body: V1PodList; + }> { + return new Promise<{ response: http.IncomingMessage; body: V1PodList }>((resolve, reject) => { + if (calls++ === 0) { + resolve({ response: {} as http.IncomingMessage, body: listObj }); + } else { + resolve({ response: {} as http.IncomingMessage, body: emptyObj }); + } + }); + }; + const promise = new Promise((resolve) => { + mock.when( + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + ).thenCall(() => { + resolve(new FakeRequest()); + }); + }); + const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn); + await promise; + const [pathOut, , watchHandler, doneHandler] = mock.capture(fakeWatch.watch).last(); + expect(pathOut).to.equal('/some/path'); + expect(cache.list()).to.deep.equal(list); + + expect(cache.get('name1', 'default')).to.equal(list[0]); + expect(cache.get('name2', 'default')).to.equal(list[1]); + + expect(cache.list('default')).to.deep.equal(list); + expect(cache.list('non-existent')).to.deep.equal([]); + + watchHandler('ADDED', { + metadata: { + name: 'name3', + namespace: 'other', + } as V1ObjectMeta, + } as V1Pod); + + expect(cache.list().length).to.equal(3); + expect(cache.get('name3', 'other')).to.not.equal(null); + + expect(cache.list('default').length).to.equal(2); + expect(cache.list('other').length).to.equal(1); + expect(cache.list('non-existent')).to.deep.equal([]); + + watchHandler('MODIFIED', { + metadata: { + name: 'name3', + namespace: 'other', + resourceVersion: 'baz', + } as V1ObjectMeta, + } as V1Pod); + expect(cache.list().length).to.equal(3); + const obj3 = cache.get('name3', 'other'); + expect(obj3).to.not.equal(null); + if (obj3) { + expect(obj3.metadata!.name).to.equal('name3'); + expect(obj3.metadata!.resourceVersion).to.equal('baz'); + } + + watchHandler('DELETED', { + metadata: { + name: 'name2', + namespace: 'default', + } as V1ObjectMeta, + } as V1Pod); + expect(cache.list().length).to.equal(2); + expect(cache.get('name2', 'default')).to.equal(undefined); + + expect(cache.list('default').length).to.equal(1); + expect(cache.list('other').length).to.equal(1); + + watchHandler('ADDED', { + metadata: { + name: 'name2', + namespace: 'default', + } as V1ObjectMeta, + } as V1Pod); + + const error = new Error('Gone') as Error & { statusCode: number | undefined }; + error.statusCode = 410; + await doneHandler(error); + expect(cache.list().length, 'all namespace list').to.equal(1); + expect(cache.list('default').length, 'default namespace list').to.equal(1); + expect(cache.list('other'), 'other namespace list').to.deep.equal([]); + }); + + it('should perform basic caching of non-namespaced objects', async () => { + const fakeWatch = mock.mock(Watch); + const list: V1Namespace[] = [ + { + metadata: { + name: 'name1', + } as V1ObjectMeta, + } as V1Namespace, + { + metadata: { + name: 'name2', + } as V1ObjectMeta, } as V1Namespace, ]; const listObj = { @@ -109,13 +232,12 @@ describe('ListWatchCache', () => { { metadata: { name: 'name3', - namespace: 'default', } as V1ObjectMeta, } as V1Namespace, ], } as V1NamespaceList; - var calls = 0; + let calls = 0; const listFn: ListPromise = function(): Promise<{ response: http.IncomingMessage; body: V1NamespaceList; @@ -146,27 +268,25 @@ describe('ListWatchCache', () => { expect(cache.get('name1')).to.equal(list[0]); expect(cache.get('name2')).to.equal(list[1]); - expect(cache.list('default')).to.deep.equal(list); - expect(cache.list('non-existent')).to.be.undefined; + expect(cache.list('default')).to.deep.equal([]); + expect(cache.list('non-existent')).to.deep.equal([]); watchHandler('ADDED', { metadata: { name: 'name3', - namespace: 'other', } as V1ObjectMeta, } as V1Namespace); expect(cache.list().length).to.equal(3); expect(cache.get('name3')).to.not.equal(null); - expect(cache.list('default').length).to.equal(2); - expect(cache.list('other').length).to.equal(1); - expect(cache.list('non-existent')).to.be.undefined; + expect(cache.list('default').length).to.equal(0); + expect(cache.list('other').length).to.equal(0); + expect(cache.list('non-existent')).to.deep.equal([]); watchHandler('MODIFIED', { metadata: { name: 'name3', - namespace: 'other', resourceVersion: 'baz', } as V1ObjectMeta, } as V1Namespace); @@ -181,19 +301,14 @@ describe('ListWatchCache', () => { watchHandler('DELETED', { metadata: { name: 'name2', - namespace: 'default', } as V1ObjectMeta, } as V1Namespace); expect(cache.list().length).to.equal(2); expect(cache.get('name2')).to.equal(undefined); - expect(cache.list('default').length).to.equal(1); - expect(cache.list('other').length).to.equal(1); - watchHandler('ADDED', { metadata: { name: 'name2', - namespace: 'default', } as V1ObjectMeta, } as V1Namespace); @@ -201,8 +316,6 @@ describe('ListWatchCache', () => { error.statusCode = 410; await doneHandler(error); expect(cache.list().length, 'all namespace list').to.equal(1); - expect(cache.list('default').length, 'default namespace list').to.equal(1); - expect(cache.list('other'), 'other namespace list').to.be.undefined; }); it('should perform work as an informer', async () => { @@ -634,8 +747,8 @@ describe('ListWatchCache', () => { expect(pathOut).to.equal('/some/path'); expect(cache.list()).to.deep.equal(list); - expect(cache.get('name1')).to.equal(list[0]); - expect(cache.get('name2')).to.equal(list[1]); + expect(cache.get('name1', 'ns1')).to.equal(list[0]); + expect(cache.get('name2', 'ns2')).to.equal(list[1]); expect(cache.list('ns1').length).to.equal(1); expect(cache.list('ns1')[0].metadata!.name).to.equal('name1'); @@ -689,7 +802,7 @@ describe('ListWatchCache', () => { }); it('should delete an object correctly', () => { - const list: V1Pod[] = [ + const cache: CacheMap = cacheMapFromList([ { metadata: { name: 'name1', @@ -702,28 +815,34 @@ describe('ListWatchCache', () => { namespace: 'ns2', } as V1ObjectMeta, } as V1Pod, - ]; - deleteObject(list, { + ]); + deleteObject(cache, { metadata: { name: 'other', namespace: 'ns1', }, } as V1Pod); - expect(list.length).to.equal(2); - deleteObject(list, { + expect(cache.size).to.equal(2); + expect((cache.get('ns1') || new Map()).size).to.equal(1); + expect((cache.get('ns2') || new Map()).size).to.equal(1); + deleteObject(cache, { metadata: { name: 'name1', namespace: 'ns2', }, } as V1Pod); - expect(list.length).to.equal(2); - deleteObject(list, { + expect(cache.size).to.equal(2); + expect((cache.get('ns1') || new Map()).size).to.equal(1); + expect((cache.get('ns2') || new Map()).size).to.equal(1); + deleteObject(cache, { metadata: { name: 'name1', namespace: 'ns1', }, } as V1Pod); - expect(list.length).to.equal(1); + expect(cache.size).to.equal(1); + expect(cache.has('ns1')).to.equal(false); + expect((cache.get('ns2') || new Map()).size).to.equal(1); }); it('should not call handlers which have been unregistered', async () => { @@ -1330,7 +1449,7 @@ describe('ListWatchCache', () => { describe('delete items', () => { it('should remove correctly', () => { - const listA: V1Pod[] = [ + const cacheA: CacheMap = cacheMapFromList([ { metadata: { name: 'name1', @@ -1343,7 +1462,7 @@ describe('delete items', () => { namespace: 'ns2', } as V1ObjectMeta, } as V1Pod, - ]; + ]); const listB: V1Pod[] = [ { metadata: { @@ -1368,7 +1487,7 @@ describe('delete items', () => { ]; const pods: V1Pod[] = []; - deleteItems(listA, listB, [(obj?: V1Pod) => pods.push(obj!)]); + deleteItems(cacheA, listB, [(obj?: V1Pod) => pods.push(obj!)]); expect(pods).to.deep.equal(expected); });