diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 01bdba701..bc011e701 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -19,7 +19,7 @@ import ResultSummary from './result-summary' import Record from './record' -import { Query } from './types' +import { Query, PeekableAsyncIterator } from './types' import { observer, util, connectionHolder } from './internal' const { EMPTY_CONNECTION_HOLDER } = connectionHolder @@ -87,17 +87,6 @@ interface ResultObserver { onError?: (error: Error) => void } -/** - * Defines the elements in the queue result observer - * @access private - */ - type QueuedResultElement = { - done: false - record: Record -} | { - done: true - summary: ResultSummary -} /** * Defines a ResultObserver interface which can be used to enqueue records and dequeue @@ -105,7 +94,8 @@ interface ResultObserver { * @access private */ interface QueuedResultObserver extends ResultObserver { - dequeue (): Promise + dequeue (): Promise> + head (): Promise> get size (): number } @@ -239,35 +229,60 @@ class Result implements Promise { * *Should not be combined with {@link Result#subscribe} or ${@link Result#then} functions.* * * @public - * @returns {AsyncIterator} The async iterator for the Results + * @returns {PeekableAsyncIterator} The async iterator for the Results */ - async* [Symbol.asyncIterator](): AsyncIterator { - const queuedObserver = this._createQueuedResultObserver() - - const status = { paused: true, firstRun: true } - - const streaming: observer.ResultStreamObserver | null = - // the error will be send to the onError callback - await this._subscribe(queuedObserver, true).catch(() => null) - - const controlFlow = () => { - if (queuedObserver.size >= this._watermarks.high && !status.paused) { - status.paused = true - streaming?.pause() - } else if (queuedObserver.size <= this._watermarks.low && status.paused || status.firstRun) { - status.firstRun = false - status.paused = false - streaming?.resume() + [Symbol.asyncIterator](): PeekableAsyncIterator { + const state: { + paused: boolean, + firstRun: boolean, + finished: boolean, + queuedObserver?: QueuedResultObserver, + streaming?: observer.ResultStreamObserver, + summary?: ResultSummary, + } = { paused: true, firstRun: true, finished: false } + + + const controlFlow = async () => { + if (state.queuedObserver === undefined) { + state.queuedObserver = this._createQueuedResultObserver() + state.streaming = await this._subscribe(state.queuedObserver, true).catch(() => undefined) + } + if (state.queuedObserver.size >= this._watermarks.high && !state.paused) { + state.paused = true + state.streaming?.pause() + } else if (state.queuedObserver.size <= this._watermarks.low && state.paused || state.firstRun) { + state.firstRun = false + state.paused = false + state.streaming?.resume() } } - while(true) { - controlFlow() - const next = await queuedObserver.dequeue() - if (next.done) { - return next.summary + return { + next: async () => { + if (state.finished) { + return { done: true, value: state.summary! } + } + await controlFlow() + const next = await state.queuedObserver!.dequeue() + if (next.done) { + state.finished = next.done + state.summary = next.value + } + return next + }, + return: async (value: ResultSummary) => { + state.finished = true + state.summary = value + state.streaming?.cancel() + return { done: true, value: value } + }, + peek: async () => { + if (state.finished) { + return { done: true, value: state.summary! } + } + await controlFlow() + return await state.queuedObserver!.head() } - yield next.record } } @@ -453,7 +468,7 @@ class Result implements Promise { reject: (arg: Error) => any | undefined } - function createResolvablePromise (): ResolvablePromise { + function createResolvablePromise (): ResolvablePromise> { const resolvablePromise: any = {} resolvablePromise.promise = new Promise((resolve, reject) => { resolvablePromise.resolve = resolve @@ -462,22 +477,23 @@ class Result implements Promise { return resolvablePromise; } - type QueuedResultElementOrError = QueuedResultElement | Error + type QueuedResultElementOrError = IteratorResult | Error function isError(elementOrError: QueuedResultElementOrError): elementOrError is Error { return elementOrError instanceof Error } const buffer: QueuedResultElementOrError[] = [] - const promiseHolder: { resolvable: ResolvablePromise | null } = { resolvable: null } - + const promiseHolder: { + resolvable: ResolvablePromise> | null + } = { resolvable: null } const observer = { onNext: (record: Record) => { - observer._push({ done: false, record }) + observer._push({ done: false, value: record }) }, onCompleted: (summary: ResultSummary) => { - observer._push({ done: true, summary }) + observer._push({ done: true, value: summary }) }, onError: (error: Error) => { observer._push(error) @@ -506,6 +522,24 @@ class Result implements Promise { promiseHolder.resolvable = createResolvablePromise() return await promiseHolder.resolvable.promise }, + head: async () => { + if (buffer.length > 0) { + const element = buffer[0] + if (isError(element)) { + throw element + } + return element + } + promiseHolder.resolvable = createResolvablePromise() + try { + const element = await promiseHolder.resolvable.promise + buffer.unshift(element) + return element + } catch (error) { + buffer.unshift(error) + throw error + } + }, get size (): number { return buffer.length } diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index c8bdc659e..54889ade7 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -65,3 +65,17 @@ export interface Config { resolver?: (address: string) => string[] | Promise userAgent?: string } + +/** + * Extension interface for {@link AsyncIterator} with peek capabilities. + * + * @public + */ +export interface PeekableAsyncIterator extends AsyncIterator { + /** + * Returns the next element in the iteration without advancing the iterator. + * + * @return {IteratorResult> +} diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index 85de34bb2..db60b50d0 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -19,7 +19,6 @@ import { observer, connectionHolder } from '../src/internal' import { Connection, - internal, newError, Record, ResultObserver, @@ -409,7 +408,7 @@ describe('Result', () => { it('should call finally on error', done => { streamObserverMock.onError(expectedError) - result.catch(() => {}).finally(done) + result.catch(() => { }).finally(done) }) describe.each([ @@ -702,12 +701,53 @@ describe('Result', () => { ]) }) + it('should return summary when it finishes', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + streamObserverMock.onCompleted({}) + + const it = result[Symbol.asyncIterator]() + await it.next() + await it.next() + const { value, done } = await it.next() + + expect(value).toEqual(new ResultSummary('query', {}, {})) + expect(done).toEqual(true) + }) + + it('should return summary value when it gets called second time after finish', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + streamObserverMock.onCompleted({}) + + const it = result[Symbol.asyncIterator]() + await it.next() + await it.next() + await it.next() + const { value, done } = await it.next() + + expect(value).toEqual(new ResultSummary('query', {}, {})) + expect(done).toEqual(true) + }) + it('should end full batch', async () => { const fetchSize = 3 const observer = new ResultStreamObserverMock() const res = new Result( - Promise.resolve(observer), - 'query', undefined, undefined, + Promise.resolve(observer), + 'query', undefined, undefined, { low: fetchSize * 0.3, // Same as calculate in the session.ts high: fetchSize * 0.7 @@ -729,10 +769,10 @@ describe('Result', () => { ] const simuatedStream = simulateStream(queue, observer, fetchSize, 2) - + jest.spyOn(observer, 'resume') .mockImplementation(simuatedStream.resume.bind(simuatedStream)) - + jest.spyOn(observer, 'pause') .mockImplementation(simuatedStream.pause.bind(simuatedStream)) @@ -741,7 +781,7 @@ describe('Result', () => { observer.onNext(rawRecord2) const records = [] - + for await (const record of res) { records.push(record) await new Promise(r => setTimeout(r, 0.1)) @@ -757,6 +797,256 @@ describe('Result', () => { ]) }) + describe('.return()', () => { + it('should finished the operator when it get called', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const summary = new ResultSummary('query', {}, {}) + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + const it = result[Symbol.asyncIterator]() + await it.next() + const { value, done } = await it.return!(summary) + + expect(value).toEqual(summary) + expect(done).toEqual(true) + }) + + it('should return value in the next call', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + const it = result[Symbol.asyncIterator]() + + await it.next() + await it.return!(new ResultSummary('query', {}, {})) + + const { value, done } = await it.next() + + expect(value).toEqual(new ResultSummary('query', {}, {})) + expect(done).toEqual(true) + }) + + it('should not subscribe to the observer when it is the first api called', async () => { + const subscribe = jest.spyOn(streamObserverMock, 'subscribe') + + const it = result[Symbol.asyncIterator]() + + await it.return!(new ResultSummary('query', {}, {})) + + await it.next() + + expect(subscribe).not.toBeCalled() + }) + + it('should not canceld stream when it is the first api called', async () => { + const cancel = jest.spyOn(streamObserverMock, 'cancel') + + const it = result[Symbol.asyncIterator]() + + await it.return!(new ResultSummary('query', {}, {})) + + await it.next() + + expect(cancel).not.toBeCalled() + }) + + it('should not cancel stream when the stream is already initialized ', async () => { + const cancel = jest.spyOn(streamObserverMock, 'cancel') + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + + const it = result[Symbol.asyncIterator]() + + await it.next() + await it.return!(new ResultSummary('query', {}, {})) + + + expect(cancel).toBeCalled() + }) + + it('should prevent following next requests to subscribe to the stream', async () => { + const subscribe = jest.spyOn(streamObserverMock, 'subscribe') + + const it = result[Symbol.asyncIterator]() + + await it.return!(new ResultSummary('query', {}, {})) + await it.next() + + expect(subscribe).not.toBeCalled() + }) + + it('should prevent following peek requests to subscribe to the stream', async () => { + const subscribe = jest.spyOn(streamObserverMock, 'subscribe') + + const it = result[Symbol.asyncIterator]() + + await it.return!(new ResultSummary('query', {}, {})) + await it.peek() + + expect(subscribe).not.toBeCalled() + }) + }) + + describe('.peek()', () => { + it('should pause the stream and then resume the stream', async () => { + const pause = jest.spyOn(streamObserverMock, 'pause') + const resume = jest.spyOn(streamObserverMock, 'resume') + streamObserverMock.onCompleted({}) + + const it = result[Symbol.asyncIterator]() + await it.peek() + + expect(pause).toHaveBeenCalledTimes(1) + expect(resume).toHaveBeenCalledTimes(1) + expect(pause.mock.invocationCallOrder[0]) + .toBeLessThan(resume.mock.invocationCallOrder[0]) + }) + + it('should pause the stream before subscribe', async () => { + const subscribe = jest.spyOn(streamObserverMock, 'subscribe') + const pause = jest.spyOn(streamObserverMock, 'pause') + streamObserverMock.onCompleted({}) + + const it = result[Symbol.asyncIterator]() + await it.peek() + + expect(pause.mock.invocationCallOrder[0]) + .toBeLessThan(subscribe.mock.invocationCallOrder[0]) + }) + + it('should pause the stream if queue is bigger than high watermark', async () => { + const pause = jest.spyOn(streamObserverMock, 'pause') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i <= watermarks.high; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + await it.peek() + + expect(pause).toBeCalledTimes(1) + }) + + it('should call resume if queue is smaller than low watermark', async () => { + const resume = jest.spyOn(streamObserverMock, 'resume') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i < watermarks.low - 1; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + await it.peek() + + expect(resume).toBeCalledTimes(1) + }) + + + it('should return the first record', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + streamObserverMock.onCompleted({}) + + const it = result[Symbol.asyncIterator]() + const { value: record } = await it.peek() + + expect(record).toEqual(new Record(keys, rawRecord1)) + }) + + it('should not move the cursor ', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + streamObserverMock.onCompleted({}) + + const it = result[Symbol.asyncIterator]() + const { value: record } = await it.peek() + const { value: nextRecord } = await it.next() + + expect(record).toEqual(new Record(keys, rawRecord1)) + expect(record).toEqual(nextRecord) + }) + + it('should not move the cursor when buffer is empty ', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + + setTimeout(() => { + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + streamObserverMock.onCompleted({}) + }, 100) + + const it = result[Symbol.asyncIterator]() + const { value: record } = await it.peek() + const { value: nextRecord } = await it.next() + + expect(record).toEqual(new Record(keys, rawRecord1)) + expect(record).toEqual(nextRecord) + }) + + it('should not move the cursor when buffer is empty next element is error ', async () => { + const keys = ['a', 'b'] + + streamObserverMock.onKeys(keys) + + setTimeout(() => { + streamObserverMock.onError(expectedError) + }, 100) + + const it = result[Symbol.asyncIterator]() + let peekError: Error | null = null + let nextError: Error | null = null + + try { + await it.peek() + } catch (e) { + peekError = e + } + + try { + await it.next() + } catch (e) { + nextError = e + } + + expect(peekError).toEqual(expectedError) + expect(peekError).toEqual(nextError) + }) + }) + describe('onError', () => { it('should throws an exception while iterate over records', async () => { const keys = ['a', 'b'] @@ -955,6 +1245,16 @@ describe('Result', () => { expect(next.done).toBe(true) expect(next.value).toStrictEqual(expectedResultSummary) }) + + describe('.peek()', () => { + it('should be resolved with expected result summary', async () => { + const it = result[Symbol.asyncIterator]() + const next = await it.peek() + + expect(next.done).toBe(true) + expect(next.value).toStrictEqual(expectedResultSummary) + }) + }) }) }) @@ -1009,6 +1309,13 @@ describe('Result', () => { // do nothing } }) + + describe('.peek()', () => { + shouldReturnRejectedPromiseWithTheExpectedError(async () => { + const it = result[Symbol.asyncIterator]() + await it.peek() + }) + }) }) function shouldReturnRejectedPromiseWithTheExpectedError( @@ -1122,7 +1429,7 @@ function simulateStream( } state.streaming = true state.consumed = 0 - + const interval = setInterval(() => { state.streaming = state.consumed < fetchSize state.finished = records.length === 0 @@ -1139,14 +1446,14 @@ function simulateStream( streaming() } return - } + } const record = records.shift() if (record !== undefined) { observer.onNext(record) } state.consumed++ - + }, timeout) } @@ -1159,7 +1466,7 @@ function simulateStream( streaming() } } - + /* return () => { diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 4686d67c5..ab7b21caf 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -182,6 +182,27 @@ export function ResultNext (context, data, wire) { }); } +export function ResultPeek (context, data, wire) { + const { resultId } = data + const result = context.getResult(resultId) + if (!("recordIt" in result)) { + result.recordIt = result[Symbol.asyncIterator]() + } + return result.recordIt.peek().then(({ value, done }) => { + if (done) { + wire.writeResponse('NullRecord', null) + } else { + const values = Array.from(value.values()).map(nativeToCypher) + wire.writeResponse('Record', { + values: values + }) + } + }).catch(e => { + console.log('got some err: ' + JSON.stringify(e)) + wire.writeError(e) + }); +} + export function ResultConsume (context, data, wire) { const { resultId } = data const result = context.getResult(resultId) @@ -337,6 +358,7 @@ export function GetFeatures (_context, _params, wire) { 'Feature:Bolt:4.3', 'Feature:Bolt:4.4', 'Feature:API:Result.List', + 'Feature:API:Result.Peek', 'Temporary:ConnectionAcquisitionTimeout', ...SUPPORTED_TLS ]