diff --git a/src/cache.ts b/src/cache.ts index af5e5bc852..be9bf6688f 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -34,6 +34,7 @@ export class ListWatch implements ObjectCache, In private readonly watch: Watch; private readonly listFn: ListPromise; private readonly labelSelector?: string; + private readonly fieldSelector?: string; public constructor( path: string, @@ -41,11 +42,13 @@ export class ListWatch implements ObjectCache, In listFn: ListPromise, autoStart: boolean = true, labelSelector?: string, + fieldSelector?: string, ) { this.path = path; this.watch = watch; this.listFn = listFn; this.labelSelector = labelSelector; + this.fieldSelector = fieldSelector; this.callbackCache[ADD] = []; this.callbackCache[UPDATE] = []; @@ -140,7 +143,10 @@ export class ListWatch implements ObjectCache, In private async doneHandler(err: any): Promise { this._stop(); - if (err && err.statusCode === 410) { + if ( + err && + ((err as { statusCode?: number }).statusCode === 410 || (err as { code?: number }).code === 410) + ) { this.resourceVersion = ''; } else if (err) { this.callbackCache[ERROR].forEach((elt: ErrorCallback) => elt(err)); @@ -162,17 +168,21 @@ export class ListWatch implements ObjectCache, In } this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice()); this.addOrUpdateItems(list.items); - this.resourceVersion = list.metadata!.resourceVersion || ''; + this.resourceVersion = list.metadata ? list.metadata!.resourceVersion || '' : ''; } const queryParams = { resourceVersion: this.resourceVersion, } as { resourceVersion: string | undefined; labelSelector: string | undefined; + fieldSelector: string | undefined; }; if (this.labelSelector !== undefined) { queryParams.labelSelector = ObjectSerializer.serialize(this.labelSelector, 'string'); } + if (this.fieldSelector !== undefined) { + queryParams.fieldSelector = ObjectSerializer.serialize(this.fieldSelector, 'string'); + } this.request = await this.watch.watch( this.path, queryParams, @@ -182,6 +192,9 @@ export class ListWatch implements ObjectCache, In } private addOrUpdateItems(items: T[]): void { + if (items === undefined || items === null) { + return; + } items.forEach((obj: T) => { addOrUpdateObject( this.objects, @@ -192,13 +205,18 @@ export class ListWatch implements ObjectCache, In }); } - private watchHandler(phase: string, obj: T, watchObj?: any): void { + private async watchHandler( + phase: string, + obj: T, + watchObj?: { type: string; object: KubernetesObject }, + ): Promise { switch (phase) { case 'ERROR': if ((obj as { code?: number }).code === 410) { this.resourceVersion = ''; } - break; + // We don't restart here, because it should be handled by the watch exiting if necessary + return; case 'ADDED': case 'MODIFIED': addOrUpdateObject( @@ -215,15 +233,16 @@ export class ListWatch implements ObjectCache, In // nothing to do, here for documentation, mostly. break; } - if (watchObj && watchObj.metadata) { - this.resourceVersion = watchObj.metadata.resourceVersion; - } + this.resourceVersion = obj.metadata ? obj.metadata!.resourceVersion || '' : ''; } } // exported for testing export function cacheMapFromList(newObjects: T[]): CacheMap { const objects: CacheMap = new Map(); + if (newObjects === undefined || newObjects === null) { + return objects; + } // build up the new list for (const obj of newObjects) { let namespaceObjects = objects.get(obj.metadata!.namespace || ''); diff --git a/src/cache_test.ts b/src/cache_test.ts index 315a4f55c5..9746425149 100644 --- a/src/cache_test.ts +++ b/src/cache_test.ts @@ -235,6 +235,7 @@ describe('ListWatchCache', () => { watchHandler('ADDED', { metadata: { name: 'name3', + resourceVersion: 'blah', } as V1ObjectMeta, } as V1Namespace); @@ -245,40 +246,28 @@ describe('ListWatchCache', () => { } as V1ObjectMeta, } as V1Namespace); - watchHandler( - 'DELETED', - { - metadata: { - name: 'name2', - resourceVersion: 'blah', - } as V1ObjectMeta, - } as V1Namespace, - { - metadata: { - resourceVersion: '54321', - }, - }, - ); + watchHandler('DELETED', { + metadata: { + name: 'name2', + resourceVersion: '54321', + } as V1ObjectMeta, + } as V1Namespace); const [addResult, updateResult, deleteResult] = await Promise.all([ addPromise, updatePromise, deletePromise, ]); - deepStrictEqual(addResult.metadata, { name: 'name3' }); + deepStrictEqual(addResult.metadata, { name: 'name3', resourceVersion: 'blah' }); deepStrictEqual(updateResult.metadata, { name: 'name3', resourceVersion: 'baz' }); - deepStrictEqual(deleteResult.metadata, { name: 'name2', resourceVersion: 'blah' }); + deepStrictEqual(deleteResult.metadata, { name: 'name2', resourceVersion: '54321' }); strictEqual(informer.latestResourceVersion(), '54321'); - watchHandler( - 'BOOKMARK', - {}, - { - metadata: { - resourceVersion: '5454', - }, + watchHandler('BOOKMARK', { + metadata: { + resourceVersion: '5454', }, - ); + }); strictEqual(informer.latestResourceVersion(), '5454'); }); @@ -1205,9 +1194,10 @@ describe('ListWatchCache', () => { { metadata: { name: 'name3', + resourceVersion: '23456', } as V1ObjectMeta, } as V1Namespace, - { metadata: { resourceVersion: '23456' } }, + { type: 'ADDED', metadata: { resourceVersion: '23456' } }, ); await informer.stop(); @@ -1259,9 +1249,87 @@ describe('ListWatchCache', () => { { metadata: { name: 'name3', + resourceVersion: '23456', } as V1ObjectMeta, } as V1Namespace, - { metadata: { resourceVersion: '23456' } }, + { type: 'ADDED', metadata: { resourceVersion: '23456' } }, + ); + + await informer.stop(); + + let errorEmitted = false; + informer.on('error', () => (errorEmitted = true)); + + promise = new Promise((resolve) => { + mock.when( + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + ).thenCall(() => { + resolve({}); + }); + }); + + informer.start(); + await promise; + + const [, , , doneHandler] = mock.capture(fakeWatch.watch).last(); + + const object = { + kind: 'Status', + apiVersion: 'v1', + metadata: {}, + status: 'Failure', + message: 'too old resource version: 12345 (1234)', + reason: 'Expired', + code: 410, + }; + await watchHandler('ERROR', object, { type: 'ERROR', object }); + await doneHandler(null); + + mock.verify( + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + ).thrice(); + strictEqual(errorEmitted, false); + strictEqual(listCalls, 2); + }); + + it('should list if the watch errors from the last version', async () => { + const fakeWatch = mock.mock(Watch); + + let listCalls = 0; + const listFn: ListPromise = function (): Promise { + return new Promise((resolve, reject) => { + listCalls++; + resolve({ + metadata: { + resourceVersion: '12345', + } as V1ListMeta, + items: [], + } as V1NamespaceList); + }); + }; + let promise = new Promise((resolve) => { + mock.when( + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + ).thenCall(() => { + resolve({}); + }); + }); + + const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false); + + informer.start(); + await promise; + + const [, , watchHandler] = mock.capture(fakeWatch.watch).last(); + watchHandler( + 'ADDED', + { + metadata: { + name: 'name3', + resourceVersion: '23456', + } as V1ObjectMeta, + } as V1Namespace, + { type: 'ADDED', metadata: { resourceVersion: '23456' } }, ); await informer.stop(); @@ -1335,6 +1403,8 @@ describe('ListWatchCache', () => { ); await informer.stop(); + strictEqual(listCalls, 1); + listCalls = 0; let errorEmitted = false; informer.on('error', () => (errorEmitted = true));