diff --git a/packages/neo4j-driver/src/driver.js b/packages/neo4j-driver/src/driver.js
index f1c8d6889..e72365ad6 100644
--- a/packages/neo4j-driver/src/driver.js
+++ b/packages/neo4j-driver/src/driver.js
@@ -70,7 +70,7 @@ class Driver extends CoreDriver {
         bookmarkOrBookmarks: bookmarks,
         database,
         impersonatedUser,
-        reactive: true,
+        reactive: false,
         fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize)
       }),
       config: this._config
diff --git a/packages/neo4j-driver/src/result-rx.js b/packages/neo4j-driver/src/result-rx.js
index dfb5cc01b..9f28982c3 100644
--- a/packages/neo4j-driver/src/result-rx.js
+++ b/packages/neo4j-driver/src/result-rx.js
@@ -18,7 +18,7 @@
  */
 import { newError, Record, ResultSummary } from 'neo4j-driver-core'
 import { Observable, Subject, ReplaySubject, from } from 'rxjs'
-import { flatMap, publishReplay, refCount, shareReplay } from 'rxjs/operators'
+import { flatMap, publishReplay, refCount } from 'rxjs/operators'
 
 const States = {
   READY: 0,
@@ -44,7 +44,8 @@ export default class RxResult {
       publishReplay(1),
       refCount()
     )
-    this._records = new Subject()
+    this._records = undefined
+    this._controls = new StreamControl()
     this._summary = new ReplaySubject()
     this._state = States.READY
   }
@@ -72,7 +73,7 @@ export default class RxResult {
    * @returns {Observable<Record>} - An observable stream of records.
    */
   records () {
-    return this._result.pipe(
+    const result = this._result.pipe(
       flatMap(
         result =>
           new Observable(recordsObserver =>
@@ -80,6 +81,8 @@
           )
       )
     )
+    result.push = () => this._push()
+    return result
   }
 
   /**
@@ -102,6 +105,44 @@
     )
   }
 
+  /**
+   * Pauses the automatic streaming of records.
+   *
+   * This method provides a way to control the flow of records.
+   *
+   * @experimental
+   */
+  pause () {
+    this._controls.pause()
+  }
+
+  /**
+   * Resumes the automatic streaming of records.
+   *
+   * This method doesn't need to be called in normal stream operation. It only applies when the stream is paused.
+   *
+   * This method won't start consuming records if the {@link records} stream hasn't been subscribed to.
+   * @experimental
+   * @returns {Promise} - A promise that resolves when the stream is resumed.
+   */
+  resume () {
+    return this._controls.resume()
+  }
+
+  /**
+   * Pushes the next record to the stream.
+   *
+   * This method automatically pauses the auto-streaming of records and then pushes the next record to the stream.
+   *
+   * To go back to automatic streaming of records, use the {@link resume} method.
+   *
+   * @experimental
+   * @returns {Promise} - A promise that resolves when the push is completed.
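+   *
+   * @example
+   * // Illustrative sketch only (assumes `rxResult` is the RxResult returned by rxSession.run());
+   * // each call to push() pauses automatic streaming and requests exactly one more record.
+   * rxResult.records().subscribe({
+   *   next: async record => {
+   *     console.log(record.toObject())
+   *     await rxResult.push()
+   *   },
+   *   complete: () => console.log('stream finished')
+   * })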
+   */
+  push () {
+    return this._controls.push()
+  }
+
   _startStreaming ({
     result,
     recordsObserver = null,
@@ -115,9 +156,11 @@
 
     if (this._state < States.STREAMING) {
       this._state = States.STREAMING
-
+      this._setupRecordsStream(result)
       if (recordsObserver) {
         subscriptions.push(this._records.subscribe(recordsObserver))
+      } else {
+        result._cancel()
       }
 
       subscriptions.push({
@@ -127,30 +170,6 @@
           }
         }
       })
-
-      if (this._records.observers.length === 0) {
-        result._cancel()
-      }
-
-      result.subscribe({
-        onNext: record => {
-          this._records.next(record)
-        },
-        onCompleted: summary => {
-          this._records.complete()
-
-          this._summary.next(summary)
-          this._summary.complete()
-
-          this._state = States.COMPLETED
-        },
-        onError: err => {
-          this._records.error(err)
-          this._summary.error(err)
-
-          this._state = States.COMPLETED
-        }
-      })
     } else if (recordsObserver) {
       recordsObserver.error(
         newError(
@@ -163,4 +182,102 @@
       subscriptions.forEach(s => s.unsubscribe())
     }
   }
+
+  _setupRecordsStream (result) {
+    if (this._records) {
+      return this._records
+    }
+
+    this._records = createFullyControlledSubject(
+      result[Symbol.asyncIterator](),
+      {
+        complete: async () => {
+          this._state = States.COMPLETED
+          this._summary.next(await result.summary())
+          this._summary.complete()
+        },
+        error: error => {
+          this._state = States.COMPLETED
+          this._summary.error(error)
+        }
+      },
+      this._controls
+    )
+    return this._records
+  }
+}
+
+function createFullyControlledSubject (
+  iterator,
+  completeObserver,
+  streamControl = new StreamControl()
+) {
+  const subject = new Subject()
+
+  const pushNextValue = async result => {
+    try {
+      streamControl.pushing = true
+      const { done, value } = await result
+      if (done) {
+        subject.complete()
+        completeObserver.complete()
+      } else {
+        subject.next(value)
+        if (!streamControl.paused) {
+          setImmediate(async () => await pushNextValue(iterator.next()))
+        }
+      }
+    } catch (error) {
+      subject.error(error)
+      completeObserver.error(error)
+    } finally {
+      streamControl.pushing = false
+    }
+  }
+
+  async function push (value) {
+    await pushNextValue(iterator.next(value))
+  }
+
+  streamControl.pusher = push
+  push()
+
+  return subject
+}
+
+class StreamControl {
+  constructor (push = async () => {}) {
+    this._paused = false
+    this._pushing = false
+    this._push = push
+  }
+
+  pause () {
+    this._paused = true
+  }
+
+  get paused () {
+    return this._paused
+  }
+
+  set pushing (pushing) {
+    this._pushing = pushing
+  }
+
+  async resume () {
+    const wasPaused = this._paused
+    this._paused = false
+    if (wasPaused && !this._pushing) {
+      await this._push()
+    }
+  }
+
+  async push () {
+    this.pause()
+    return await this._push()
+  }
+
+  set pusher (push) {
+    this._push = push
+  }
 }
diff --git a/packages/neo4j-driver/test/examples.test.js b/packages/neo4j-driver/test/examples.test.js
index b7ac87fed..a2e1e39c0 100644
--- a/packages/neo4j-driver/test/examples.test.js
+++ b/packages/neo4j-driver/test/examples.test.js
@@ -17,10 +17,16 @@
  * limitations under the License.
 */
 
-import neo4j from '../src'
+import neo4j, { session } from '../src'
 import sharedNeo4j from './internal/shared-neo4j'
 import { ServerVersion, VERSION_4_0_0 } from '../src/internal/server-version'
-import { map, materialize, toArray } from 'rxjs/operators'
+import {
+  bufferCount,
+  map,
+  materialize,
+  mergeMap,
+  toArray
+} from 'rxjs/operators'
 import { Notification } from 'rxjs'
 
 /**
@@ -1529,6 +1535,66 @@ describe('#integration examples', () => {
       }
     })
   })
+
+  describe('back-pressure', () => {
+    it('should control flow by manually pushing records to the stream', done => {
+      const driver = driverGlobal
+
+      const session = driver.rxSession()
+      const xs = []
+
+      const result = session.run('UNWIND RANGE(0, 10) AS x RETURN x')
+      result
+        .records()
+        .pipe(map(record => record.get('x').toInt()))
+        .subscribe({
+          next: async x => {
+            xs.push(x)
+            // manually pushing records to the stream
+            // pauses the automatic pushing
+            await result.push()
+          },
+          complete: async () => {
+            expect(xs).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
+            await session.close().toPromise()
+            done()
+          },
+          error: done.fail.bind(done)
+        })
+    })
+  })
+
+  it('should control flow by pausing and resuming the stream', async () => {
+    const driver = driverGlobal
+    const callCostlyApi = async () => {}
+
+    const session = driver.rxSession()
+
+    try {
+      const result = session.run('UNWIND RANGE(0, 10) AS x RETURN x')
+      const xs = await result
+        .records()
+        .pipe(
+          map(record => record.get('x').toInt()),
+          bufferCount(5), // buffer 5 records
+          mergeMap(async theXs => {
+            // pausing the records coming from the stream
+            result.pause()
+            // some costly operation
+            await callCostlyApi(theXs)
+            // resume the stream
+            await result.resume()
+            return theXs
+          }),
+          toArray()
+        )
+        .toPromise()
+
+      expect(xs).toEqual([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10]])
+    } finally {
+      await session.close().toPromise()
+    }
+  })
 })
 
 function removeLineBreaks (string) {
diff --git a/packages/neo4j-driver/test/rx/result.test.js b/packages/neo4j-driver/test/rx/result.test.js
new file mode 100644
index 000000000..0f2a65002
--- /dev/null
+++ b/packages/neo4j-driver/test/rx/result.test.js
@@ -0,0 +1,857 @@
+/**
+ * Copyright (c) "Neo4j"
+ * Neo4j Sweden AB [http://neo4j.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +import RxResult from '../../src/result-rx' +import { newError, Record, Result, ResultSummary } from 'neo4j-driver-core' +import { Observable } from 'rxjs' +import { toArray, take, tap } from 'rxjs/operators' + +describe('#unit RxResult', () => { + describe('.records()', () => { + it('should be able the consume the full stream', async () => { + const fetchSize = 3 + const stream = new ResultStreamObserverMock() + const result = new Result( + Promise.resolve(stream), + 'query', + undefined, + undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const rawRecord3 = [5, 6] + const rawRecord4 = [7, 8] + const rawRecord5 = [9, 10] + const rawRecord6 = [11, 12] + const queue = [rawRecord3, rawRecord4, rawRecord5, rawRecord6] + + const simuatedStream = simulateStream(queue, stream, fetchSize, 2) + + spyOn(stream, 'resume').and.callFake( + simuatedStream.resume.bind(simuatedStream) + ) + + spyOn(stream, 'pause').and.callFake( + simuatedStream.pause.bind(simuatedStream) + ) + + stream.onKeys(keys) + stream.onNext(rawRecord1) + stream.onNext(rawRecord2) + + const rxResult = new RxResult( + new Observable(observer => { + observer.next(result) + observer.complete({}) + }) + ) + + const records = await rxResult + .records() + .pipe(toArray()) + .toPromise() + + expect(records).toEqual([ + new Record(keys, rawRecord1), + new Record(keys, rawRecord2), + new Record(keys, rawRecord3), + new Record(keys, rawRecord4), + new Record(keys, rawRecord5), + new Record(keys, rawRecord6) + ]) + }) + + it('should be able to take one', async () => { + const fetchSize = 3 + const stream = new ResultStreamObserverMock() + const result = new Result( + Promise.resolve(stream), + 'query', + undefined, + undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const rawRecord3 = [5, 6] + const rawRecord4 = [7, 8] + const rawRecord5 = [9, 10] + const rawRecord6 = [11, 12] + const queue = [rawRecord3, rawRecord4, rawRecord5, rawRecord6] + + const simuatedStream = simulateStream(queue, stream, fetchSize, 2) + + spyOn(stream, 'resume').and.callFake( + simuatedStream.resume.bind(simuatedStream) + ) + + spyOn(stream, 'pause').and.callFake( + simuatedStream.pause.bind(simuatedStream) + ) + + stream.onKeys(keys) + stream.onNext(rawRecord1) + stream.onNext(rawRecord2) + + const rxResult = new RxResult( + new Observable(observer => { + observer.next(result) + observer.complete({}) + }) + ) + + const record = await rxResult + .records() + .pipe(take(1)) + .toPromise() + + expect(record).toEqual(new Record(keys, rawRecord1)) + }) + + it('should be able to pause the stream', async () => { + const fetchSize = 3 + const stream = new ResultStreamObserverMock() + const result = new Result( + Promise.resolve(stream), + 'query', + undefined, + undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const rawRecord3 = [5, 6] + const rawRecord4 = [7, 8] + const rawRecord5 = [9, 10] + const rawRecord6 = [11, 12] + const queue = [rawRecord3, rawRecord4, rawRecord5, rawRecord6] + + const simuatedStream = simulateStream(queue, stream, fetchSize, 1) + + const resume = spyOn(stream, 'resume').and.callFake( + 
simuatedStream.resume.bind(simuatedStream) + ) + + const pause = spyOn(stream, 'pause').and.callFake( + simuatedStream.pause.bind(simuatedStream) + ) + + stream.onKeys(keys) + stream.onNext(rawRecord1) + stream.onNext(rawRecord2) + + const rxResult = new RxResult( + new Observable(observer => { + observer.next(result) + observer.complete({}) + }) + ) + + const record = await rxResult + .records() + .pipe( + tap(() => rxResult.pause()), + take(1) + ) + .toPromise() + + expect(record).toEqual(new Record(keys, rawRecord1)) + + await new Promise(resolve => setTimeout(resolve, 1000)) + + expect(resume.calls.mostRecent().invocationOrder).toBeLessThan( + pause.calls.mostRecent().invocationOrder + ) + }) + + it('should be able to resume the stream', async () => { + const fetchSize = 3 + const stream = new ResultStreamObserverMock() + const result = new Result( + Promise.resolve(stream), + 'query', + undefined, + undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const rawRecord3 = [5, 6] + const rawRecord4 = [7, 8] + const rawRecord5 = [9, 10] + const rawRecord6 = [11, 12] + const queue = [rawRecord3, rawRecord4, rawRecord5, rawRecord6] + + const simuatedStream = simulateStream(queue, stream, fetchSize, 1) + + const resume = spyOn(stream, 'resume').and.callFake( + simuatedStream.resume.bind(simuatedStream) + ) + + const pause = spyOn(stream, 'pause').and.callFake( + simuatedStream.pause.bind(simuatedStream) + ) + + stream.onKeys(keys) + stream.onNext(rawRecord1) + stream.onNext(rawRecord2) + + const rxResult = new RxResult( + new Observable(observer => { + observer.next(result) + observer.complete({}) + }) + ) + + const record = await rxResult + .records() + .pipe( + tap(() => rxResult.pause()), + take(1) + ) + .toPromise() + + expect(record).toEqual(new Record(keys, rawRecord1)) + + await waitFor(1000) + + expect(resume.calls.mostRecent().invocationOrder).toBeLessThan( + pause.calls.mostRecent().invocationOrder + ) + + await rxResult.resume() + + await waitFor(1000) + + expect(resume.calls.mostRecent().invocationOrder).toBeGreaterThan( + pause.calls.mostRecent().invocationOrder + ) + }) + + it('should be able to maual control the stream', async () => { + const fetchSize = 3 + const stream = new ResultStreamObserverMock() + const result = new Result( + Promise.resolve(stream), + 'query', + undefined, + undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const rawRecord3 = [5, 6] + const rawRecord4 = [7, 8] + const rawRecord5 = [9, 10] + const rawRecord6 = [11, 12] + const queue = [rawRecord3, rawRecord4, rawRecord5, rawRecord6] + + const simuatedStream = simulateStream(queue, stream, fetchSize, 1) + + const resume = spyOn(stream, 'resume').and.callFake( + simuatedStream.resume.bind(simuatedStream) + ) + + const pause = spyOn(stream, 'pause').and.callFake( + simuatedStream.pause.bind(simuatedStream) + ) + + stream.onKeys(keys) + stream.onNext(rawRecord1) + stream.onNext(rawRecord2) + + const rxResult = new RxResult( + new Observable(observer => { + observer.next(result) + observer.complete({}) + }) + ) + + const records = await new Promise(resolve => { + const list = [] + rxResult.pause() + rxResult + .records() + .pipe(take(2)) + .subscribe({ + next: async record => { + list.push(record) + await rxResult.push() + 
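+            // push() pauses the automatic streaming and asks the stream for exactly one more record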
}, + complete: () => resolve(list) + }) + }) + + expect(records).toEqual([ + new Record(keys, rawRecord1), + new Record(keys, rawRecord2) + ]) + + await waitFor(1000) + + expect(resume.calls.mostRecent().invocationOrder).toBeLessThan( + pause.calls.mostRecent().invocationOrder + ) + }) + + it('should be able to capture errors during the stream', async () => { + const fetchSize = 3 + const stream = new ResultStreamObserverMock() + const result = new Result( + Promise.resolve(stream), + 'query', + undefined, + undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + const expectedError = newError('the error') + + stream.onError(expectedError) + + try { + await new RxResult( + new Observable(observer => { + observer.next(result) + observer.complete({}) + }) + ) + .records() + .pipe(take(1)) + .toPromise() + expect('should not reach here').toBe('') + } catch (error) { + expect(error).toEqual(expectedError) + } + }) + + describe('and then result.consume()', () => { + it('should be able to get a summary of a consumed stream', async () => { + const fetchSize = 3 + const metadata = { + resultConsumedAfter: 20, + resultAvailableAfter: 124, + extraInfo: 'extra' + } + const query = 'query' + const parameters = { a: 1, b: 2 } + const stream = new ResultStreamObserverMock() + const result = new Result( + Promise.resolve(stream), + query, + parameters, + undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const rawRecord3 = [5, 6] + const rawRecord4 = [7, 8] + const rawRecord5 = [9, 10] + const rawRecord6 = [11, 12] + const queue = [rawRecord3, rawRecord4, rawRecord5, rawRecord6] + + const simuatedStream = simulateStream( + queue, + stream, + fetchSize, + 2, + metadata + ) + + spyOn(stream, 'resume').and.callFake( + simuatedStream.resume.bind(simuatedStream) + ) + + spyOn(stream, 'pause').and.callFake( + simuatedStream.pause.bind(simuatedStream) + ) + + stream.onKeys(keys) + stream.onNext(rawRecord1) + stream.onNext(rawRecord2) + + const rxResult = new RxResult( + new Observable(observer => { + observer.next(result) + observer.complete({}) + }) + ) + + await rxResult + .records() + .pipe(toArray()) + .toPromise() + + const summary = await rxResult.consume().toPromise() + + const expectedSummary = new ResultSummary(query, parameters, metadata) + + expect(summary).toEqual(expectedSummary) + }) + + it('should be able to get a summary of a partially consumed stream', async () => { + const fetchSize = 3 + const metadata = { + resultConsumedAfter: 20, + resultAvailableAfter: 124, + extraInfo: 'extra' + } + const query = 'query' + const parameters = { a: 1, b: 2 } + const stream = new ResultStreamObserverMock() + const result = new Result( + Promise.resolve(stream), + query, + parameters, + undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const rawRecord3 = [5, 6] + const rawRecord4 = [7, 8] + const rawRecord5 = [9, 10] + const rawRecord6 = [11, 12] + const queue = [rawRecord3, rawRecord4, rawRecord5, rawRecord6] + + const simuatedStream = simulateStream( + queue, + stream, + fetchSize, + 2, + metadata + ) + + spyOn(stream, 'resume').and.callFake( + simuatedStream.resume.bind(simuatedStream) + ) + + spyOn(stream, 'pause').and.callFake( + 
simuatedStream.pause.bind(simuatedStream) + ) + + stream.onKeys(keys) + stream.onNext(rawRecord1) + stream.onNext(rawRecord2) + + const rxResult = new RxResult( + new Observable(observer => { + observer.next(result) + observer.complete({}) + }) + ) + + await rxResult + .records() + .pipe(take(1)) + .toPromise() + + const summary = await rxResult.consume().toPromise() + + const expectedSummary = new ResultSummary(query, parameters, metadata) + + expect(summary).toEqual(expectedSummary) + }) + + it('should get an error if the stream has failed', async () => { + const fetchSize = 3 + const stream = new ResultStreamObserverMock() + const result = new Result( + Promise.resolve(stream), + 'query', + undefined, + undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + const expectedError = newError('the error') + + stream.onError(expectedError) + + const resultRx = new RxResult( + new Observable(observer => { + observer.next(result) + observer.complete({}) + }) + ) + + try { + await resultRx + .records() + .pipe(take(1)) + .toPromise() + expect('should not reach here').toBe('') + } catch (error) { + expect(error).toEqual(expectedError) + } + + try { + await resultRx + .consume() + .pipe(take(1)) + .toPromise() + expect('should not reach here').toBe('') + } catch (error) { + expect(error).toEqual(expectedError) + } + }) + }) + }) + + describe('.consume()', () => { + it('should be able to get the summary', async () => { + const fetchSize = 3 + const metadata = { + resultConsumedAfter: 20, + resultAvailableAfter: 124, + extraInfo: 'extra' + } + const query = 'query' + const parameters = { a: 1, b: 2 } + const stream = new ResultStreamObserverMock() + const result = new Result( + Promise.resolve(stream), + query, + parameters, + undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const rawRecord3 = [5, 6] + const rawRecord4 = [7, 8] + const rawRecord5 = [9, 10] + const rawRecord6 = [11, 12] + const queue = [rawRecord3, rawRecord4, rawRecord5, rawRecord6] + + const simuatedStream = simulateStream( + queue, + stream, + fetchSize, + 2, + metadata + ) + + spyOn(stream, 'resume').and.callFake( + simuatedStream.resume.bind(simuatedStream) + ) + + spyOn(stream, 'pause').and.callFake( + simuatedStream.pause.bind(simuatedStream) + ) + + stream.onKeys(keys) + stream.onNext(rawRecord1) + stream.onNext(rawRecord2) + + const rxResult = new RxResult( + new Observable(observer => { + observer.next(result) + observer.complete({}) + }) + ) + + const summary = await rxResult.consume().toPromise() + + const expectedSummary = new ResultSummary(query, parameters, metadata) + + expect(summary).toEqual(expectedSummary) + }) + + it('should cancel the observer for discarding elements', async () => { + const fetchSize = 3 + const metadata = { + resultConsumedAfter: 20, + resultAvailableAfter: 124, + extraInfo: 'extra' + } + const query = 'query' + const parameters = { a: 1, b: 2 } + const stream = new ResultStreamObserverMock() + const result = new Result( + Promise.resolve(stream), + query, + parameters, + undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const rawRecord3 = [5, 6] + const rawRecord4 = [7, 8] + const rawRecord5 = [9, 10] + const rawRecord6 = [11, 12] + const queue = 
[rawRecord3, rawRecord4, rawRecord5, rawRecord6] + + const simuatedStream = simulateStream( + queue, + stream, + fetchSize, + 2, + metadata + ) + + spyOn(stream, 'resume').and.callFake( + simuatedStream.resume.bind(simuatedStream) + ) + + spyOn(stream, 'pause').and.callFake( + simuatedStream.pause.bind(simuatedStream) + ) + + const cancel = spyOn(stream, 'cancel').and.returnValue(undefined) + + stream.onKeys(keys) + stream.onNext(rawRecord1) + stream.onNext(rawRecord2) + + const rxResult = new RxResult( + new Observable(observer => { + observer.next(result) + observer.complete({}) + }) + ) + + await rxResult.consume().toPromise() + + expect(cancel).toHaveBeenCalled() + }) + + it('should get an error if the stream has failed', async () => { + const fetchSize = 3 + const stream = new ResultStreamObserverMock() + const result = new Result( + Promise.resolve(stream), + 'query', + undefined, + undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + const expectedError = newError('the error') + + stream.onError(expectedError) + + const resultRx = new RxResult( + new Observable(observer => { + observer.next(result) + observer.complete({}) + }) + ) + + try { + await resultRx + .consume() + .pipe(take(1)) + .toPromise() + expect('should not reach here').toBe('') + } catch (error) { + expect(error).toEqual(expectedError) + } + }) + }) +}) + +class ResultStreamObserverMock { + constructor () { + this._queuedRecords = [] + this._observers = [] + } + + cancel () {} + + prepareToHandleSingleResponse () {} + + markCompleted () {} + + subscribe (observer) { + this._observers.push(observer) + + if (observer.onError && this._error) { + observer.onError(this._error) + return + } + + if (observer.onKeys && this._fieldKeys) { + observer.onKeys(this._fieldKeys) + } + + if (observer.onNext) { + this._queuedRecords.forEach(record => observer.onNext(record)) + } + + if (observer.onCompleted && this._meta) { + observer.onCompleted(this._meta) + } + } + + onKeys (keys) { + this._fieldKeys = keys + this._observers.forEach(o => { + if (o.onKeys) { + o.onKeys(keys) + } + }) + } + + onNext (rawRecord) { + const record = new Record(this._fieldKeys, rawRecord) + const streamed = this._observers + .filter(o => o.onNext) + .map(o => o.onNext(record)) + .reduce(() => true, false) + + if (!streamed) { + this._queuedRecords.push(record) + } + } + + onError (error) { + this._error = error + this._observers.filter(o => o.onError).forEach(o => o.onError(error)) + } + + onCompleted (meta) { + this._meta = meta + this._observers.filter(o => o.onCompleted).forEach(o => o.onCompleted(meta)) + } + + pause () { + // do nothing + } + + resume () { + // do nothing + } +} + +function simulateStream ( + records, + observer, + fetchSize, + timeout = 1, + metadata = {} +) { + const state = { + paused: false, + streaming: false, + finished: false, + consumed: 0 + } + + const streaming = () => { + if (state.streaming || state.finished) { + return + } + state.streaming = true + state.consumed = 0 + + const interval = setInterval(() => { + state.streaming = state.consumed < fetchSize + state.finished = records.length === 0 + + if (state.finished) { + observer.onCompleted(metadata) + clearInterval(interval) + return + } + + if (!state.streaming) { + clearInterval(interval) + if (!state.paused) { + streaming() + } + return + } + + const record = records.shift() + if (record !== undefined) { + observer.onNext(record) + } + state.consumed++ + }, timeout) + } + + return { + pause: () => { + 
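+      // the simulated producer checks this flag and stops scheduling further batches while paused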
+      state.paused = true
+    },
+    resume: () => {
+      state.paused = false
+      streaming()
+    }
+  }
+}
+
+function waitFor (ms) {
+  return new Promise(resolve => setTimeout(resolve, ms))
+}
diff --git a/packages/neo4j-driver/test/types/result-rx.test.ts b/packages/neo4j-driver/test/types/result-rx.test.ts
index d93b3c68b..701df4654 100644
--- a/packages/neo4j-driver/test/types/result-rx.test.ts
+++ b/packages/neo4j-driver/test/types/result-rx.test.ts
@@ -23,6 +23,10 @@ const dummy: any = null
 
 const res: RxResult = dummy
 
+const pushed: Promise<void> = res.push()
+const paused: void = res.pause()
+const resumed: Promise<void> = res.resume()
+
 res.keys().subscribe({
   next: value => console.log(`keys: ${value}`),
   complete: () => console.log('keys complete'),
diff --git a/packages/neo4j-driver/types/result-rx.d.ts b/packages/neo4j-driver/types/result-rx.d.ts
index 117b71377..0d7136326 100644
--- a/packages/neo4j-driver/types/result-rx.d.ts
+++ b/packages/neo4j-driver/types/result-rx.d.ts
@@ -25,6 +25,12 @@ declare interface RxResult {
   records(): Observable<Record>
 
   consume(): Observable<ResultSummary>
+
+  pause(): void
+
+  resume(): Promise<void>
+
+  push(): Promise<void>
 }
 
 export default RxResult
diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js
index ca224c8f7..09a41b3ee 100644
--- a/packages/testkit-backend/src/skipped-tests/common.js
+++ b/packages/testkit-backend/src/skipped-tests/common.js
@@ -32,6 +32,8 @@ const skippedTests = [
   skip(
     'Flaky in TeamCity',
     ifEndsWith('test_should_fail_when_writing_to_unexpectedly_interrupting_writers_on_run_using_tx_function'),
+    ifEndsWith('test_should_read_successfully_from_reachable_db_after_trying_unreachable_db'),
+    ifEndsWith('test_should_fail_when_reading_from_unexpectedly_interrupting_readers_using_tx_function')
   ),
   skip(
     'ResultSummary.notifications defaults to empty array instead of return null/undefined',