diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index bc011e701..302f7d56c 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -241,19 +241,28 @@ class Result implements Promise { summary?: ResultSummary, } = { paused: true, firstRun: true, finished: false } - - const controlFlow = async () => { - if (state.queuedObserver === undefined) { - state.queuedObserver = this._createQueuedResultObserver() - state.streaming = await this._subscribe(state.queuedObserver, true).catch(() => undefined) + const controlFlow = () => { + if (!state.streaming) { + return; } - if (state.queuedObserver.size >= this._watermarks.high && !state.paused) { + const queueSizeIsOverHighOrEqualWatermark = state.queuedObserver!.size >= this._watermarks.high + const queueSizeIsBellowOrEqualLowWatermark = state.queuedObserver!.size <= this._watermarks.low + + if (queueSizeIsOverHighOrEqualWatermark && !state.paused) { state.paused = true - state.streaming?.pause() - } else if (state.queuedObserver.size <= this._watermarks.low && state.paused || state.firstRun) { + state.streaming.pause() + } else if (queueSizeIsBellowOrEqualLowWatermark && state.paused || state.firstRun && !queueSizeIsOverHighOrEqualWatermark ) { state.firstRun = false state.paused = false - state.streaming?.resume() + state.streaming.resume() + } + } + + const initializeObserver = async () => { + if (state.queuedObserver === undefined) { + state.queuedObserver = this._createQueuedResultObserver(controlFlow) + state.streaming = await this._subscribe(state.queuedObserver, true).catch(() => undefined) + controlFlow() } } @@ -262,7 +271,7 @@ class Result implements Promise { if (state.finished) { return { done: true, value: state.summary! } } - await controlFlow() + await initializeObserver() const next = await state.queuedObserver!.dequeue() if (next.done) { state.finished = next.done @@ -280,7 +289,7 @@ class Result implements Promise { if (state.finished) { return { done: true, value: state.summary! } } - await controlFlow() + await initializeObserver() return await state.queuedObserver!.head() } } @@ -461,7 +470,7 @@ class Result implements Promise { /** * @access private */ - private _createQueuedResultObserver (): QueuedResultObserver { + private _createQueuedResultObserver (onQueueSizeChanged: () => void): QueuedResultObserver { interface ResolvablePromise { promise: Promise resolve: (arg: T) => any | undefined @@ -509,11 +518,13 @@ class Result implements Promise { } } else { buffer.push(element) + onQueueSizeChanged() } }, dequeue: async () => { if (buffer.length > 0) { const element = buffer.shift()! + onQueueSizeChanged() if (isError(element)) { throw element } @@ -538,6 +549,8 @@ class Result implements Promise { } catch (error) { buffer.unshift(error) throw error + } finally { + onQueueSizeChanged() } }, get size (): number { diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index db60b50d0..674734394 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -598,9 +598,10 @@ describe('Result', () => { it('should pause the stream if queue is bigger than high watermark', async () => { const pause = jest.spyOn(streamObserverMock, 'pause') + const resume = jest.spyOn(streamObserverMock, 'resume') streamObserverMock.onKeys(['a']) - for (let i = 0; i <= watermarks.high; i++) { + for (let i = 0; i < watermarks.high + 3; i++) { streamObserverMock.onNext([i]) } @@ -608,6 +609,28 @@ describe('Result', () => { await it.next() expect(pause).toBeCalledTimes(1) + expect(resume).toBeCalledTimes(0) + }) + + it('should pause the stream if queue is bigger than high watermark and not iteraction with the stream', async () => { + const pause = jest.spyOn(streamObserverMock, 'pause') + const resume = jest.spyOn(streamObserverMock, 'resume') + streamObserverMock.onKeys(['a']) + + streamObserverMock.onNext([-1]) + + const it = result[Symbol.asyncIterator]() + await it.next() + + expect(pause).toBeCalledTimes(1) + expect(resume).toBeCalledTimes(1) + + for (let i = 0; i <= watermarks.high + 1; i++) { + streamObserverMock.onNext([i]) + } + + expect(pause).toBeCalledTimes(2) + expect(resume).toBeCalledTimes(1) }) it('should call resume if queue is smaller than low watermark', async () => {