Skip to content

Commit 3b02c7d

Browse files
author
Zhen Li
committed
Apply back pressure using local record buffer.
1 parent 652f5d8 commit 3b02c7d

File tree

1 file changed

+24
-2
lines changed

1 file changed

+24
-2
lines changed

src/internal/stream-observers.js

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class ResultStreamObserver extends StreamObserver {
9595
this._discard = false
9696
this._fetchSize = fetchSize
9797
this._setState(reactive ? _states.READY : _states.READY_STREAMING)
98+
this._setupAuoPull(fetchSize)
9899
}
99100

100101
/**
@@ -113,6 +114,9 @@ class ResultStreamObserver extends StreamObserver {
113114
})
114115
} else {
115116
this._queuedRecords.push(record)
117+
if (this._queuedRecords.length > this._highRecordWatermark) {
118+
this._autoPull = false
119+
}
116120
}
117121
}
118122

@@ -182,6 +186,12 @@ class ResultStreamObserver extends StreamObserver {
182186
if (this._queuedRecords.length > 0 && observer.onNext) {
183187
for (let i = 0; i < this._queuedRecords.length; i++) {
184188
observer.onNext(this._queuedRecords[i])
189+
if (this._queuedRecords.length - i - 1 <= this._lowRecordWatermark) {
190+
this._autoPull = true
191+
if (this._state === _states.READY) {
192+
this._handleStreaming()
193+
}
194+
}
185195
}
186196
}
187197
if (this._tail && observer.onCompleted) {
@@ -334,15 +344,16 @@ class ResultStreamObserver extends StreamObserver {
334344
if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) {
335345
if (this._discard) {
336346
this._discardFunction(this._connection, this._queryId, this)
337-
} else {
347+
this._setState(_states.STREAMING)
348+
} else if (this._autoPull) {
338349
this._moreFunction(
339350
this._connection,
340351
this._queryId,
341352
this._fetchSize,
342353
this
343354
)
355+
this._setState(_states.STREAMING)
344356
}
345-
this._setState(_states.STREAMING)
346357
}
347358
}
348359

@@ -360,6 +371,17 @@ class ResultStreamObserver extends StreamObserver {
360371
_setState (state) {
361372
this._state = state
362373
}
374+
375+
_setupAuoPull (fetchSize) {
376+
this._autoPull = true
377+
if (fetchSize === ALL) {
378+
this._lowRecordWatermark = Number.MAX_VALUE // we shall always lower than this number to enable auto pull
379+
this._highRecordWatermark = Number.MAX_VALUE // we shall never reach this number to disable auto pull
380+
} else {
381+
this._lowRecordWatermark = 0.3 * fetchSize
382+
this._highRecordWatermark = 0.7 * fetchSize
383+
}
384+
}
363385
}
364386

365387
class LoginObserver extends StreamObserver {

0 commit comments

Comments
 (0)