Skip to content

Commit 79e3895

Browse files
authored
Improve back-pressure control in Result[Symbol.asyncIterator] (#864)
The previous implemenation only controls the flow while is iterating over the records, but if the iteration is done in a really low pace, the control flow method won't be called in time for pausing the flow. Changing the flow control for observing the queue size instead of check on each time it iterates solve this issue.
1 parent d34d4f1 commit 79e3895

File tree

2 files changed

+49
-13
lines changed

2 files changed

+49
-13
lines changed

packages/core/src/result.ts

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -241,19 +241,28 @@ class Result implements Promise<QueryResult> {
241241
summary?: ResultSummary,
242242
} = { paused: true, firstRun: true, finished: false }
243243

244-
245-
const controlFlow = async () => {
246-
if (state.queuedObserver === undefined) {
247-
state.queuedObserver = this._createQueuedResultObserver()
248-
state.streaming = await this._subscribe(state.queuedObserver, true).catch(() => undefined)
244+
const controlFlow = () => {
245+
if (!state.streaming) {
246+
return;
249247
}
250-
if (state.queuedObserver.size >= this._watermarks.high && !state.paused) {
248+
const queueSizeIsOverHighOrEqualWatermark = state.queuedObserver!.size >= this._watermarks.high
249+
const queueSizeIsBellowOrEqualLowWatermark = state.queuedObserver!.size <= this._watermarks.low
250+
251+
if (queueSizeIsOverHighOrEqualWatermark && !state.paused) {
251252
state.paused = true
252-
state.streaming?.pause()
253-
} else if (state.queuedObserver.size <= this._watermarks.low && state.paused || state.firstRun) {
253+
state.streaming.pause()
254+
} else if (queueSizeIsBellowOrEqualLowWatermark && state.paused || state.firstRun && !queueSizeIsOverHighOrEqualWatermark ) {
254255
state.firstRun = false
255256
state.paused = false
256-
state.streaming?.resume()
257+
state.streaming.resume()
258+
}
259+
}
260+
261+
const initializeObserver = async () => {
262+
if (state.queuedObserver === undefined) {
263+
state.queuedObserver = this._createQueuedResultObserver(controlFlow)
264+
state.streaming = await this._subscribe(state.queuedObserver, true).catch(() => undefined)
265+
controlFlow()
257266
}
258267
}
259268

@@ -262,7 +271,7 @@ class Result implements Promise<QueryResult> {
262271
if (state.finished) {
263272
return { done: true, value: state.summary! }
264273
}
265-
await controlFlow()
274+
await initializeObserver()
266275
const next = await state.queuedObserver!.dequeue()
267276
if (next.done) {
268277
state.finished = next.done
@@ -280,7 +289,7 @@ class Result implements Promise<QueryResult> {
280289
if (state.finished) {
281290
return { done: true, value: state.summary! }
282291
}
283-
await controlFlow()
292+
await initializeObserver()
284293
return await state.queuedObserver!.head()
285294
}
286295
}
@@ -461,7 +470,7 @@ class Result implements Promise<QueryResult> {
461470
/**
462471
* @access private
463472
*/
464-
private _createQueuedResultObserver (): QueuedResultObserver {
473+
private _createQueuedResultObserver (onQueueSizeChanged: () => void): QueuedResultObserver {
465474
interface ResolvablePromise<T> {
466475
promise: Promise<T>
467476
resolve: (arg: T) => any | undefined
@@ -509,11 +518,13 @@ class Result implements Promise<QueryResult> {
509518
}
510519
} else {
511520
buffer.push(element)
521+
onQueueSizeChanged()
512522
}
513523
},
514524
dequeue: async () => {
515525
if (buffer.length > 0) {
516526
const element = buffer.shift()!
527+
onQueueSizeChanged()
517528
if (isError(element)) {
518529
throw element
519530
}
@@ -538,6 +549,8 @@ class Result implements Promise<QueryResult> {
538549
} catch (error) {
539550
buffer.unshift(error)
540551
throw error
552+
} finally {
553+
onQueueSizeChanged()
541554
}
542555
},
543556
get size (): number {

packages/core/test/result.test.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,16 +598,39 @@ describe('Result', () => {
598598

599599
it('should pause the stream if queue is bigger than high watermark', async () => {
600600
const pause = jest.spyOn(streamObserverMock, 'pause')
601+
const resume = jest.spyOn(streamObserverMock, 'resume')
601602
streamObserverMock.onKeys(['a'])
602603

603-
for (let i = 0; i <= watermarks.high; i++) {
604+
for (let i = 0; i < watermarks.high + 3; i++) {
604605
streamObserverMock.onNext([i])
605606
}
606607

607608
const it = result[Symbol.asyncIterator]()
608609
await it.next()
609610

610611
expect(pause).toBeCalledTimes(1)
612+
expect(resume).toBeCalledTimes(0)
613+
})
614+
615+
it('should pause the stream if queue is bigger than high watermark and not iteraction with the stream', async () => {
616+
const pause = jest.spyOn(streamObserverMock, 'pause')
617+
const resume = jest.spyOn(streamObserverMock, 'resume')
618+
streamObserverMock.onKeys(['a'])
619+
620+
streamObserverMock.onNext([-1])
621+
622+
const it = result[Symbol.asyncIterator]()
623+
await it.next()
624+
625+
expect(pause).toBeCalledTimes(1)
626+
expect(resume).toBeCalledTimes(1)
627+
628+
for (let i = 0; i <= watermarks.high + 1; i++) {
629+
streamObserverMock.onNext([i])
630+
}
631+
632+
expect(pause).toBeCalledTimes(2)
633+
expect(resume).toBeCalledTimes(1)
611634
})
612635

613636
it('should call resume if queue is smaller than low watermark', async () => {

0 commit comments

Comments
 (0)