From 652f5d8519a596899a3bf2631f13e801904fd0ca Mon Sep 17 00:00:00 2001 From: Zhen Li Date: Wed, 18 Dec 2019 16:55:40 +0100 Subject: [PATCH 1/2] Refactoring the stream observer to use a state machine to describe the inner state changes. This stream observer is used as reactive run and pull handler, async run and pull handler, as well as other message handlers. For reactive run and pull handler, the start state is `READY` state where a run message is sent and we will auto write a pull message once the run message reply is received. For async run and pull handler, the start state is `READY_STREAMING` where a run and a pull message are sent and we will auto write a pull message once apull message reply is received with has_more flag. For other single message handlers, the start state is `STREAMING` where we will wait for a success reply and save all meta received. --- src/internal/stream-observers.js | 372 ++++++++++++++++++------------- 1 file changed, 215 insertions(+), 157 deletions(-) diff --git a/src/internal/stream-observers.js b/src/internal/stream-observers.js index a3eceb807..b1bf3e077 100644 --- a/src/internal/stream-observers.js +++ b/src/internal/stream-observers.js @@ -72,8 +72,6 @@ class ResultStreamObserver extends StreamObserver { super() this._connection = connection - this._reactive = reactive - this._streaming = false this._fieldKeys = null this._fieldLookup = null @@ -81,7 +79,6 @@ class ResultStreamObserver extends StreamObserver { this._queuedRecords = [] this._tail = null this._error = null - this._hasFailed = false this._observers = [] this._meta = {} @@ -97,7 +94,7 @@ class ResultStreamObserver extends StreamObserver { this._discardFunction = discardFunction this._discard = false this._fetchSize = fetchSize - this._finished = false + this._setState(reactive ? _states.READY : _states.READY_STREAMING) } /** @@ -120,6 +117,130 @@ class ResultStreamObserver extends StreamObserver { } onCompleted (meta) { + this._state.onSuccess(this, meta) + } + + /** + * Will be called on errors. + * If user-provided observer is present, pass the error + * to it's onError method, otherwise set instance variable _error. + * @param {Object} error - An error object + */ + onError (error) { + this._state.onError(this, error) + } + + /** + * Cancel pending record stream + */ + cancel () { + this._discard = true + } + + /** + * Stream observer defaults to handling responses for two messages: RUN + PULL_ALL or RUN + DISCARD_ALL. + * Response for RUN initializes query keys. Response for PULL_ALL / DISCARD_ALL exposes the result stream. + * + * However, some operations can be represented as a single message which receives full metadata in a single response. + * For example, operations to begin, commit and rollback an explicit transaction use two messages in Bolt V1 but a single message in Bolt V3. + * Messages are `RUN "BEGIN" {}` + `PULL_ALL` in Bolt V1 and `BEGIN` in Bolt V3. + * + * This function prepares the observer to only handle a single response message. + */ + prepareToHandleSingleResponse () { + this._head = [] + this._fieldKeys = [] + this._setState(_states.STREAMING) + } + + /** + * Mark this observer as if it has completed with no metadata. + */ + markCompleted () { + this._head = [] + this._fieldKeys = [] + this._tail = {} + this._setState(_states.SUCCEEDED) + } + + /** + * Subscribe to events with provided observer. + * @param {Object} observer - Observer object + * @param {function(keys: String[])} observer.onKeys - Handle stream header, field keys. + * @param {function(record: Object)} observer.onNext - Handle records, one by one. + * @param {function(metadata: Object)} observer.onCompleted - Handle stream tail, the metadata. + * @param {function(error: Object)} observer.onError - Handle errors, should always be provided. + */ + subscribe (observer) { + if (this._error) { + observer.onError(this._error) + return + } + if (this._head && observer.onKeys) { + observer.onKeys(this._head) + } + if (this._queuedRecords.length > 0 && observer.onNext) { + for (let i = 0; i < this._queuedRecords.length; i++) { + observer.onNext(this._queuedRecords[i]) + } + } + if (this._tail && observer.onCompleted) { + observer.onCompleted(this._tail) + } + this._observers.push(observer) + + if (this._state === _states.READY) { + this._handleStreaming() + } + } + + _handleHasMore (meta) { + // We've consumed current batch and server notified us that there're more + // records to stream. Let's invoke more or discard function based on whether + // the user wants to discard streaming or not + this._setState(_states.READY) // we've done streaming + this._handleStreaming() + delete meta.has_more + } + + _handlePullSuccess (meta) { + this._setState(_states.SUCCEEDED) + const completionMetadata = Object.assign( + this._connection ? { server: this._connection.server } : {}, + this._meta, + meta + ) + + let beforeHandlerResult = null + if (this._beforeComplete) { + beforeHandlerResult = this._beforeComplete(completionMetadata) + } + + const continuation = () => { + // End of stream + this._tail = completionMetadata + + if (this._observers.some(o => o.onCompleted)) { + this._observers.forEach(o => { + if (o.onCompleted) { + o.onCompleted(completionMetadata) + } + }) + } + + if (this._afterComplete) { + this._afterComplete(completionMetadata) + } + } + + if (beforeHandlerResult) { + Promise.resolve(beforeHandlerResult).then(() => continuation()) + } else { + continuation() + } + } + + _handleRunSuccess (meta, afterSuccess) { if (this._fieldKeys === null) { // Stream header, build a name->index field lookup table // to be used by records. This is an optimization to make it @@ -168,9 +289,7 @@ class ResultStreamObserver extends StreamObserver { this._afterKeys(this._fieldKeys) } - if (this._reactive) { - this._handleStreaming() - } + afterSuccess() } if (beforeHandlerResult) { @@ -178,132 +297,11 @@ class ResultStreamObserver extends StreamObserver { } else { continuation() } - } else { - this._streaming = false - - if (meta.has_more) { - // We've consumed current batch and server notified us that there're more - // records to stream. Let's invoke more or discard function based on whether - // the user wants to discard streaming or not - this._handleStreaming() - - delete meta.has_more - } else { - this._finished = true - const completionMetadata = Object.assign( - this._connection ? { server: this._connection.server } : {}, - this._meta, - meta - ) - - let beforeHandlerResult = null - if (this._beforeComplete) { - beforeHandlerResult = this._beforeComplete(completionMetadata) - } - - const continuation = () => { - // End of stream - this._tail = completionMetadata - - if (this._observers.some(o => o.onCompleted)) { - this._observers.forEach(o => { - if (o.onCompleted) { - o.onCompleted(completionMetadata) - } - }) - } - - if (this._afterComplete) { - this._afterComplete(completionMetadata) - } - } - - if (beforeHandlerResult) { - Promise.resolve(beforeHandlerResult).then(() => continuation()) - } else { - continuation() - } - } - } - } - - _handleStreaming () { - if ( - this._head && - this._observers.some(o => o.onNext || o.onCompleted) && - !this._streaming - ) { - this._streaming = true - - if (this._discard) { - this._discardFunction(this._connection, this._queryId, this) - } else { - this._moreFunction( - this._connection, - this._queryId, - this._fetchSize, - this - ) - } - } - } - - _storeMetadataForCompletion (meta) { - const keys = Object.keys(meta) - let index = keys.length - let key = '' - - while (index--) { - key = keys[index] - this._meta[key] = meta[key] } } - /** - * Stream observer defaults to handling responses for two messages: RUN + PULL_ALL or RUN + DISCARD_ALL. - * Response for RUN initializes query keys. Response for PULL_ALL / DISCARD_ALL exposes the result stream. - * - * However, some operations can be represented as a single message which receives full metadata in a single response. - * For example, operations to begin, commit and rollback an explicit transaction use two messages in Bolt V1 but a single message in Bolt V3. - * Messages are `RUN "BEGIN" {}` + `PULL_ALL` in Bolt V1 and `BEGIN` in Bolt V3. - * - * This function prepares the observer to only handle a single response message. - */ - prepareToHandleSingleResponse () { - this._head = [] - this._fieldKeys = [] - } - - /** - * Mark this observer as if it has completed with no metadata. - */ - markCompleted () { - this._head = [] - this._fieldKeys = [] - this._tail = {} - this._finished = true - } - - /** - * Cancel pending record stream - */ - cancel () { - this._discard = true - } - - /** - * Will be called on errors. - * If user-provided observer is present, pass the error - * to it's onError method, otherwise set instance variable _error. - * @param {Object} error - An error object - */ - onError (error) { - if (this._hasFailed) { - return - } - - this._finished = true - this._hasFailed = true + _handleError (error) { + this._setState(_states.FAILED) this._error = error let beforeHandlerResult = null @@ -332,39 +330,35 @@ class ResultStreamObserver extends StreamObserver { } } - /** - * Subscribe to events with provided observer. - * @param {Object} observer - Observer object - * @param {function(keys: String[])} observer.onKeys - Handle stream header, field keys. - * @param {function(record: Object)} observer.onNext - Handle records, one by one. - * @param {function(metadata: Object)} observer.onCompleted - Handle stream tail, the metadata. - * @param {function(error: Object)} observer.onError - Handle errors, should always be provided. - */ - subscribe (observer) { - if (this._error) { - observer.onError(this._error) - return - } - if (this._head && observer.onKeys) { - observer.onKeys(this._head) - } - if (this._queuedRecords.length > 0 && observer.onNext) { - for (let i = 0; i < this._queuedRecords.length; i++) { - observer.onNext(this._queuedRecords[i]) + _handleStreaming () { + if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) { + if (this._discard) { + this._discardFunction(this._connection, this._queryId, this) + } else { + this._moreFunction( + this._connection, + this._queryId, + this._fetchSize, + this + ) } + this._setState(_states.STREAMING) } - if (this._tail && observer.onCompleted) { - observer.onCompleted(this._tail) - } - this._observers.push(observer) + } - if (this._reactive && !this._finished) { - this._handleStreaming() + _storeMetadataForCompletion (meta) { + const keys = Object.keys(meta) + let index = keys.length + let key = '' + + while (index--) { + key = keys[index] + this._meta[key] = meta[key] } } - hasFailed () { - return this._hasFailed + _setState (state) { + this._state = state } } @@ -496,6 +490,70 @@ class CompletedObserver extends ResultStreamObserver { } } +const _states = { + READY_STREAMING: { + // async start state + onSuccess: (streamObserver, meta) => { + streamObserver._handleRunSuccess( + meta, + () => { + streamObserver._setState(_states.STREAMING) + } // after run succeeded, async directly move to streaming + // state + ) + }, + onError: (streamObserver, error) => { + streamObserver._handleError(error) + }, + name: () => { + return 'READY_STREAMING' + } + }, + READY: { + // reactive start state + onSuccess: (streamObserver, meta) => { + streamObserver._handleRunSuccess( + meta, + () => streamObserver._handleStreaming() // after run succeeded received, reactive shall start pulling + ) + }, + onError: (streamObserver, error) => { + streamObserver._handleError(error) + }, + name: () => { + return 'READY' + } + }, + STREAMING: { + onSuccess: (streamObserver, meta) => { + if (meta.has_more) { + streamObserver._handleHasMore(meta) + } else { + streamObserver._handlePullSuccess(meta) + } + }, + onError: (streamObserver, error) => { + streamObserver._handleError(error) + }, + name: () => { + return 'STREAMING' + } + }, + FAILED: { + onError: error => { + // more errors are ignored + }, + name: () => { + return 'FAILED' + } + }, + SUCCEEDED: { + name: () => { + return 'SUCCEEDED' + } + } +} + export { StreamObserver, ResultStreamObserver, From 3b02c7d0a2e2d48b243c025366935fb425798baa Mon Sep 17 00:00:00 2001 From: Zhen Li Date: Fri, 20 Dec 2019 16:40:41 +0100 Subject: [PATCH 2/2] Apply back pressure using local record buffer. --- src/internal/stream-observers.js | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/src/internal/stream-observers.js b/src/internal/stream-observers.js index b1bf3e077..3a65fcc0f 100644 --- a/src/internal/stream-observers.js +++ b/src/internal/stream-observers.js @@ -95,6 +95,7 @@ class ResultStreamObserver extends StreamObserver { this._discard = false this._fetchSize = fetchSize this._setState(reactive ? _states.READY : _states.READY_STREAMING) + this._setupAuoPull(fetchSize) } /** @@ -113,6 +114,9 @@ class ResultStreamObserver extends StreamObserver { }) } else { this._queuedRecords.push(record) + if (this._queuedRecords.length > this._highRecordWatermark) { + this._autoPull = false + } } } @@ -182,6 +186,12 @@ class ResultStreamObserver extends StreamObserver { if (this._queuedRecords.length > 0 && observer.onNext) { for (let i = 0; i < this._queuedRecords.length; i++) { observer.onNext(this._queuedRecords[i]) + if (this._queuedRecords.length - i - 1 <= this._lowRecordWatermark) { + this._autoPull = true + if (this._state === _states.READY) { + this._handleStreaming() + } + } } } if (this._tail && observer.onCompleted) { @@ -334,15 +344,16 @@ class ResultStreamObserver extends StreamObserver { if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) { if (this._discard) { this._discardFunction(this._connection, this._queryId, this) - } else { + this._setState(_states.STREAMING) + } else if (this._autoPull) { this._moreFunction( this._connection, this._queryId, this._fetchSize, this ) + this._setState(_states.STREAMING) } - this._setState(_states.STREAMING) } } @@ -360,6 +371,17 @@ class ResultStreamObserver extends StreamObserver { _setState (state) { this._state = state } + + _setupAuoPull (fetchSize) { + this._autoPull = true + if (fetchSize === ALL) { + this._lowRecordWatermark = Number.MAX_VALUE // we shall always lower than this number to enable auto pull + this._highRecordWatermark = Number.MAX_VALUE // we shall never reach this number to disable auto pull + } else { + this._lowRecordWatermark = 0.3 * fetchSize + this._highRecordWatermark = 0.7 * fetchSize + } + } } class LoginObserver extends StreamObserver {