diff --git a/src/internal/stream-observers.js b/src/internal/stream-observers.js
index a3eceb807..3a65fcc0f 100644
--- a/src/internal/stream-observers.js
+++ b/src/internal/stream-observers.js
@@ -72,8 +72,6 @@ class ResultStreamObserver extends StreamObserver {
     super()
 
     this._connection = connection
-    this._reactive = reactive
-    this._streaming = false
     this._fieldKeys = null
     this._fieldLookup = null
 
@@ -81,7 +79,6 @@ class ResultStreamObserver extends StreamObserver {
     this._queuedRecords = []
     this._tail = null
     this._error = null
-    this._hasFailed = false
     this._observers = []
     this._meta = {}
 
@@ -97,7 +94,8 @@ class ResultStreamObserver extends StreamObserver {
     this._discardFunction = discardFunction
     this._discard = false
     this._fetchSize = fetchSize
-    this._finished = false
+    this._setState(reactive ? _states.READY : _states.READY_STREAMING)
+    this._setupAutoPull(fetchSize)
   }
 
   /**
@@ -116,10 +114,143 @@ class ResultStreamObserver extends StreamObserver {
       })
     } else {
       this._queuedRecords.push(record)
+      if (this._queuedRecords.length > this._highRecordWatermark) {
+        this._autoPull = false
+      }
     }
   }
 
   onCompleted (meta) {
+    this._state.onSuccess(this, meta)
+  }
+
+  /**
+   * Will be called on errors.
+   * If a user-provided observer is present, pass the error
+   * to its onError method, otherwise set the instance variable _error.
+   * @param {Object} error - An error object
+   */
+  onError (error) {
+    this._state.onError(this, error)
+  }
+
+  /**
+   * Cancel pending record stream
+   */
+  cancel () {
+    this._discard = true
+  }
+
+  /**
+   * Stream observer defaults to handling responses for two messages: RUN + PULL_ALL or RUN + DISCARD_ALL.
+   * Response for RUN initializes query keys. Response for PULL_ALL / DISCARD_ALL exposes the result stream.
+   *
+   * However, some operations can be represented as a single message which receives full metadata in a single response.
+   * For example, operations to begin, commit and rollback an explicit transaction use two messages in Bolt V1 but a single message in Bolt V3.
+   * Messages are `RUN "BEGIN" {}` + `PULL_ALL` in Bolt V1 and `BEGIN` in Bolt V3.
+   *
+   * This function prepares the observer to only handle a single response message.
+   */
+  prepareToHandleSingleResponse () {
+    this._head = []
+    this._fieldKeys = []
+    this._setState(_states.STREAMING)
+  }
+
+  /**
+   * Mark this observer as if it has completed with no metadata.
+   */
+  markCompleted () {
+    this._head = []
+    this._fieldKeys = []
+    this._tail = {}
+    this._setState(_states.SUCCEEDED)
+  }
+
+  /**
+   * Subscribe to events with provided observer.
+   * @param {Object} observer - Observer object
+   * @param {function(keys: String[])} observer.onKeys - Handle stream header, field keys.
+   * @param {function(record: Object)} observer.onNext - Handle records, one by one.
+   * @param {function(metadata: Object)} observer.onCompleted - Handle stream tail, the metadata.
+   * @param {function(error: Object)} observer.onError - Handle errors, should always be provided.
+   */
+  subscribe (observer) {
+    if (this._error) {
+      observer.onError(this._error)
+      return
+    }
+    if (this._head && observer.onKeys) {
+      observer.onKeys(this._head)
+    }
+    if (this._queuedRecords.length > 0 && observer.onNext) {
+      for (let i = 0; i < this._queuedRecords.length; i++) {
+        observer.onNext(this._queuedRecords[i])
+        if (this._queuedRecords.length - i - 1 <= this._lowRecordWatermark) {
+          this._autoPull = true
+          if (this._state === _states.READY) {
+            this._handleStreaming()
+          }
+        }
+      }
+    }
+    if (this._tail && observer.onCompleted) {
+      observer.onCompleted(this._tail)
+    }
+    this._observers.push(observer)
+
+    if (this._state === _states.READY) {
+      this._handleStreaming()
+    }
+  }
+
+  _handleHasMore (meta) {
+    // We've consumed the current batch and the server notified us that there are more
+    // records to stream. Invoke the more or discard function depending on whether
+    // the user wants to discard the rest of the stream or not
+    this._setState(_states.READY) // we're done streaming the current batch
+    this._handleStreaming()
+    delete meta.has_more
+  }
+
+  _handlePullSuccess (meta) {
+    this._setState(_states.SUCCEEDED)
+    const completionMetadata = Object.assign(
+      this._connection ? { server: this._connection.server } : {},
+      this._meta,
+      meta
+    )
+
+    let beforeHandlerResult = null
+    if (this._beforeComplete) {
+      beforeHandlerResult = this._beforeComplete(completionMetadata)
+    }
+
+    const continuation = () => {
+      // End of stream
+      this._tail = completionMetadata
+
+      if (this._observers.some(o => o.onCompleted)) {
+        this._observers.forEach(o => {
+          if (o.onCompleted) {
+            o.onCompleted(completionMetadata)
+          }
+        })
+      }
+
+      if (this._afterComplete) {
+        this._afterComplete(completionMetadata)
+      }
+    }
+
+    if (beforeHandlerResult) {
+      Promise.resolve(beforeHandlerResult).then(() => continuation())
+    } else {
+      continuation()
+    }
+  }
+
+  _handleRunSuccess (meta, afterSuccess) {
     if (this._fieldKeys === null) {
       // Stream header, build a name->index field lookup table
       // to be used by records. This is an optimization to make it
@@ -168,9 +299,7 @@ class ResultStreamObserver extends StreamObserver {
           this._afterKeys(this._fieldKeys)
         }
 
-        if (this._reactive) {
-          this._handleStreaming()
-        }
+        afterSuccess()
       }
 
       if (beforeHandlerResult) {
@@ -178,132 +307,11 @@ class ResultStreamObserver extends StreamObserver {
       } else {
        continuation()
      }
-    } else {
-      this._streaming = false
-
-      if (meta.has_more) {
-        // We've consumed current batch and server notified us that there're more
-        // records to stream. Let's invoke more or discard function based on whether
-        // the user wants to discard streaming or not
-        this._handleStreaming()
-
-        delete meta.has_more
-      } else {
-        this._finished = true
-        const completionMetadata = Object.assign(
-          this._connection ? { server: this._connection.server } : {},
-          this._meta,
-          meta
-        )
-
-        let beforeHandlerResult = null
-        if (this._beforeComplete) {
-          beforeHandlerResult = this._beforeComplete(completionMetadata)
-        }
-
-        const continuation = () => {
-          // End of stream
-          this._tail = completionMetadata
-
-          if (this._observers.some(o => o.onCompleted)) {
-            this._observers.forEach(o => {
-              if (o.onCompleted) {
-                o.onCompleted(completionMetadata)
-              }
-            })
-          }
-
-          if (this._afterComplete) {
-            this._afterComplete(completionMetadata)
-          }
-        }
-
-        if (beforeHandlerResult) {
-          Promise.resolve(beforeHandlerResult).then(() => continuation())
-        } else {
-          continuation()
-        }
-      }
-    }
-  }
-
-  _handleStreaming () {
-    if (
-      this._head &&
-      this._observers.some(o => o.onNext || o.onCompleted) &&
-      !this._streaming
-    ) {
-      this._streaming = true
-
-      if (this._discard) {
-        this._discardFunction(this._connection, this._queryId, this)
-      } else {
-        this._moreFunction(
-          this._connection,
-          this._queryId,
-          this._fetchSize,
-          this
-        )
-      }
     }
   }
 
-  _storeMetadataForCompletion (meta) {
-    const keys = Object.keys(meta)
-    let index = keys.length
-    let key = ''
-
-    while (index--) {
-      key = keys[index]
-      this._meta[key] = meta[key]
-    }
-  }
-
-  /**
-   * Stream observer defaults to handling responses for two messages: RUN + PULL_ALL or RUN + DISCARD_ALL.
-   * Response for RUN initializes query keys. Response for PULL_ALL / DISCARD_ALL exposes the result stream.
-   *
-   * However, some operations can be represented as a single message which receives full metadata in a single response.
-   * For example, operations to begin, commit and rollback an explicit transaction use two messages in Bolt V1 but a single message in Bolt V3.
-   * Messages are `RUN "BEGIN" {}` + `PULL_ALL` in Bolt V1 and `BEGIN` in Bolt V3.
-   *
-   * This function prepares the observer to only handle a single response message.
-   */
-  prepareToHandleSingleResponse () {
-    this._head = []
-    this._fieldKeys = []
-  }
-
-  /**
-   * Mark this observer as if it has completed with no metadata.
-   */
-  markCompleted () {
-    this._head = []
-    this._fieldKeys = []
-    this._tail = {}
-    this._finished = true
-  }
-
-  /**
-   * Cancel pending record stream
-   */
-  cancel () {
-    this._discard = true
-  }
-
-  /**
-   * Will be called on errors.
-   * If user-provided observer is present, pass the error
-   * to it's onError method, otherwise set instance variable _error.
-   * @param {Object} error - An error object
-   */
-  onError (error) {
-    if (this._hasFailed) {
-      return
-    }
-
-    this._finished = true
-    this._hasFailed = true
+  _handleError (error) {
+    this._setState(_states.FAILED)
     this._error = error
 
     let beforeHandlerResult = null
@@ -332,39 +340,47 @@ class ResultStreamObserver extends StreamObserver {
     }
   }
 
-  /**
-   * Subscribe to events with provided observer.
-   * @param {Object} observer - Observer object
-   * @param {function(keys: String[])} observer.onKeys - Handle stream header, field keys.
-   * @param {function(record: Object)} observer.onNext - Handle records, one by one.
-   * @param {function(metadata: Object)} observer.onCompleted - Handle stream tail, the metadata.
-   * @param {function(error: Object)} observer.onError - Handle errors, should always be provided.
-   */
-  subscribe (observer) {
-    if (this._error) {
-      observer.onError(this._error)
-      return
-    }
-    if (this._head && observer.onKeys) {
-      observer.onKeys(this._head)
-    }
-    if (this._queuedRecords.length > 0 && observer.onNext) {
-      for (let i = 0; i < this._queuedRecords.length; i++) {
-        observer.onNext(this._queuedRecords[i])
+  _handleStreaming () {
+    if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) {
+      if (this._discard) {
+        this._discardFunction(this._connection, this._queryId, this)
+        this._setState(_states.STREAMING)
+      } else if (this._autoPull) {
+        this._moreFunction(
+          this._connection,
+          this._queryId,
+          this._fetchSize,
+          this
+        )
+        this._setState(_states.STREAMING)
       }
     }
-    if (this._tail && observer.onCompleted) {
-      observer.onCompleted(this._tail)
-    }
-    this._observers.push(observer)
+  }
 
-    if (this._reactive && !this._finished) {
-      this._handleStreaming()
+  _storeMetadataForCompletion (meta) {
+    const keys = Object.keys(meta)
+    let index = keys.length
+    let key = ''
+
+    while (index--) {
+      key = keys[index]
+      this._meta[key] = meta[key]
     }
   }
 
-  hasFailed () {
-    return this._hasFailed
+  _setState (state) {
+    this._state = state
+  }
+
+  _setupAutoPull (fetchSize) {
+    this._autoPull = true
+    if (fetchSize === ALL) {
+      this._lowRecordWatermark = Number.MAX_VALUE // we will always be below this number, so auto pull stays enabled
+      this._highRecordWatermark = Number.MAX_VALUE // we will never reach this number, so auto pull is never disabled
+    } else {
+      this._lowRecordWatermark = 0.3 * fetchSize
+      this._highRecordWatermark = 0.7 * fetchSize
+    }
   }
 }
 
@@ -496,6 +512,70 @@ class CompletedObserver extends ResultStreamObserver {
   }
 }
 
+const _states = {
+  READY_STREAMING: {
+    // async start state
+    onSuccess: (streamObserver, meta) => {
+      streamObserver._handleRunSuccess(
+        meta,
+        () => {
+          streamObserver._setState(_states.STREAMING)
+        } // after run succeeded, the async mode moves straight to the
+        // streaming state
+      )
+    },
+    onError: (streamObserver, error) => {
+      streamObserver._handleError(error)
+    },
+    name: () => {
+      return 'READY_STREAMING'
+    }
+  },
+  READY: {
+    // reactive start state
+    onSuccess: (streamObserver, meta) => {
+      streamObserver._handleRunSuccess(
+        meta,
+        () => streamObserver._handleStreaming() // after run succeeded, the reactive mode starts pulling on demand
+      )
+    },
+    onError: (streamObserver, error) => {
+      streamObserver._handleError(error)
+    },
+    name: () => {
+      return 'READY'
+    }
+  },
+  STREAMING: {
+    onSuccess: (streamObserver, meta) => {
+      if (meta.has_more) {
+        streamObserver._handleHasMore(meta)
+      } else {
+        streamObserver._handlePullSuccess(meta)
+      }
+    },
+    onError: (streamObserver, error) => {
+      streamObserver._handleError(error)
+    },
+    name: () => {
+      return 'STREAMING'
+    }
+  },
+  FAILED: {
+    onError: error => {
+      // more errors are ignored
+    },
+    name: () => {
+      return 'FAILED'
+    }
+  },
+  SUCCEEDED: {
+    name: () => {
+      return 'SUCCEEDED'
+    }
+  }
+}
+
 export {
   StreamObserver,
   ResultStreamObserver,
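
For illustration, a minimal standalone sketch of the low/high watermark auto-pull behaviour this patch introduces. WatermarkBuffer, push, shift and shouldPullMore are hypothetical names used only for this sketch, not driver API; only the 0.3/0.7 ratios come from _setupAutoPull above.

// Illustrative sketch only: mirrors the watermark hysteresis applied around
// _lowRecordWatermark / _highRecordWatermark in the patch. Not driver API.
class WatermarkBuffer {
  constructor (fetchSize) {
    this._queue = []
    this._autoPull = true
    this._lowWatermark = 0.3 * fetchSize // resume pulling at or below this size
    this._highWatermark = 0.7 * fetchSize // stop pulling above this size
  }

  push (record) {
    this._queue.push(record)
    if (this._queue.length > this._highWatermark) {
      this._autoPull = false // producer is ahead of the consumer, pause batch requests
    }
  }

  shift () {
    const record = this._queue.shift()
    if (this._queue.length <= this._lowWatermark) {
      this._autoPull = true // consumer caught up, resume batch requests
    }
    return record
  }

  shouldPullMore () {
    return this._autoPull
  }
}

// With fetchSize = 10: buffering 8 records crosses the high watermark (7),
// and draining down to 3 records crosses the low watermark again.
const buffer = new WatermarkBuffer(10)
for (let i = 0; i < 8; i++) buffer.push({ n: i })
console.log(buffer.shouldPullMore()) // false
while (!buffer.shouldPullMore()) buffer.shift()
console.log(buffer.shouldPullMore(), buffer._queue.length) // true 3

The gap between the two watermarks keeps the observer from toggling requests on and off for every single record: pulling pauses only once the buffered queue is clearly ahead of the subscriber and resumes only once the subscriber has drained it back down.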