From 5a8f2fe0be3f3838d9d1d5c61372638bba5f3588 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 25 Jan 2022 17:40:26 +0100 Subject: [PATCH 1/3] Introduce `.peek()` to the Async Iterator API This feature allows the user peek the element on the head of the cursor without moving the cursor forward. The implementation of this feature has as side effect a new implementation `.return` method in the iterator. This new version of the method does most the same thing as the previous one, but it also cancels the stream since this method moves the cursor to the end. The `next` method also get tweaked. --- packages/core/src/result.ts | 102 ++++-- packages/core/src/types.ts | 14 + packages/core/test/result.test.ts | 329 +++++++++++++++++- .../testkit-backend/src/request-handlers.js | 22 ++ 4 files changed, 431 insertions(+), 36 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 01bdba701..7f4b29f1d 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -19,7 +19,7 @@ import ResultSummary from './result-summary' import Record from './record' -import { Query } from './types' +import { Query, PeekableAsyncIterator } from './types' import { observer, util, connectionHolder } from './internal' const { EMPTY_CONNECTION_HOLDER } = connectionHolder @@ -106,6 +106,7 @@ interface ResultObserver { */ interface QueuedResultObserver extends ResultObserver { dequeue (): Promise + head (): Promise get size (): number } @@ -239,35 +240,68 @@ class Result implements Promise { * *Should not be combined with {@link Result#subscribe} or ${@link Result#then} functions.* * * @public - * @returns {AsyncIterator} The async iterator for the Results + * @returns {PeekableAsyncIterator} The async iterator for the Results */ - async* [Symbol.asyncIterator](): AsyncIterator { - const queuedObserver = this._createQueuedResultObserver() - - const status = { paused: true, firstRun: true } - - const streaming: observer.ResultStreamObserver | null = - // the error will be send to the onError callback - await this._subscribe(queuedObserver, true).catch(() => null) - - const controlFlow = () => { - if (queuedObserver.size >= this._watermarks.high && !status.paused) { - status.paused = true - streaming?.pause() - } else if (queuedObserver.size <= this._watermarks.low && status.paused || status.firstRun) { - status.firstRun = false - status.paused = false - streaming?.resume() + [Symbol.asyncIterator](): PeekableAsyncIterator { + const state: { + paused: boolean, + firstRun: boolean, + finished: boolean, + queuedObserver?: QueuedResultObserver, + streaming?: observer.ResultStreamObserver, + summary?: ResultSummary, + } = { paused: true, firstRun: true, finished: false } + + + const controlFlow = async () => { + if (state.queuedObserver === undefined) { + state.queuedObserver = this._createQueuedResultObserver() + state.streaming = await this._subscribe(state.queuedObserver, true).catch(() => undefined) + } + if (state.queuedObserver.size >= this._watermarks.high && !state.paused) { + state.paused = true + state.streaming?.pause() + } else if (state.queuedObserver.size <= this._watermarks.low && state.paused || state.firstRun) { + state.firstRun = false + state.paused = false + state.streaming?.resume() } } - while(true) { - controlFlow() - const next = await queuedObserver.dequeue() - if (next.done) { - return next.summary + const toIterableResult = (element: QueuedResultElement): IteratorResult => { + if (element.done) { + return { done: true, value: element.summary } + } + return { done: false, value: element.record } + } + + return { + next: async () => { + if (state.finished) { + return { done: true, value: state.summary!! } + } + await controlFlow() + const next = await state.queuedObserver!!.dequeue() + if (next.done) { + state.finished = next.done + state.summary = next.summary + } + return toIterableResult(next) + }, + return: async (value: ResultSummary) => { + state.finished = true + state.summary = value + state.streaming?.cancel() + return { done: true, value: value } + }, + peek: async () => { + if (state.finished) { + return { done: true, value: state.summary!! } + } + await controlFlow() + const head = await state.queuedObserver!!.head() + return toIterableResult(head) } - yield next.record } } @@ -506,6 +540,24 @@ class Result implements Promise { promiseHolder.resolvable = createResolvablePromise() return await promiseHolder.resolvable.promise }, + head: async () => { + if (buffer.length > 0) { + const element = buffer[0] + if (isError(element)) { + throw element + } + return element + } + promiseHolder.resolvable = createResolvablePromise() + try { + const element = await promiseHolder.resolvable.promise + buffer.unshift(element) + return element + } catch (error) { + buffer.unshift(error) + throw error + } + }, get size (): number { return buffer.length } diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index c8bdc659e..54889ade7 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -65,3 +65,17 @@ export interface Config { resolver?: (address: string) => string[] | Promise userAgent?: string } + +/** + * Extension interface for {@link AsyncIterator} with peek capabilities. + * + * @public + */ +export interface PeekableAsyncIterator extends AsyncIterator { + /** + * Returns the next element in the iteration without advancing the iterator. + * + * @return {IteratorResult> +} diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index 85de34bb2..909d00f50 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -19,7 +19,6 @@ import { observer, connectionHolder } from '../src/internal' import { Connection, - internal, newError, Record, ResultObserver, @@ -409,7 +408,7 @@ describe('Result', () => { it('should call finally on error', done => { streamObserverMock.onError(expectedError) - result.catch(() => {}).finally(done) + result.catch(() => { }).finally(done) }) describe.each([ @@ -702,12 +701,53 @@ describe('Result', () => { ]) }) + it('should return summary when it finishes', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + streamObserverMock.onCompleted({}) + + const it = result[Symbol.asyncIterator]() + await it.next() + await it.next() + const { value, done } = await it.next() + + expect(value).toEqual(new ResultSummary('query', {}, {})) + expect(done).toEqual(true) + }) + + it('should return summary value when it gets called second time after finish', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + streamObserverMock.onCompleted({}) + + const it = result[Symbol.asyncIterator]() + await it.next() + await it.next() + await it.next() + const { value, done } = await it.next() + + expect(value).toEqual(new ResultSummary('query', {}, {})) + expect(done).toEqual(true) + }) + it('should end full batch', async () => { const fetchSize = 3 const observer = new ResultStreamObserverMock() const res = new Result( - Promise.resolve(observer), - 'query', undefined, undefined, + Promise.resolve(observer), + 'query', undefined, undefined, { low: fetchSize * 0.3, // Same as calculate in the session.ts high: fetchSize * 0.7 @@ -729,10 +769,10 @@ describe('Result', () => { ] const simuatedStream = simulateStream(queue, observer, fetchSize, 2) - + jest.spyOn(observer, 'resume') .mockImplementation(simuatedStream.resume.bind(simuatedStream)) - + jest.spyOn(observer, 'pause') .mockImplementation(simuatedStream.pause.bind(simuatedStream)) @@ -741,7 +781,7 @@ describe('Result', () => { observer.onNext(rawRecord2) const records = [] - + for await (const record of res) { records.push(record) await new Promise(r => setTimeout(r, 0.1)) @@ -757,6 +797,256 @@ describe('Result', () => { ]) }) + describe('.return()', () => { + it('should finished the operator when it get called', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const summary = new ResultSummary('query', {}, {}) + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + const it = result[Symbol.asyncIterator]() + await it.next() + const { value, done } = await it.return!!(summary) + + expect(value).toEqual(summary) + expect(done).toEqual(true) + }) + + it('should return value in the next call', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + const it = result[Symbol.asyncIterator]() + + await it.next() + await it.return!!(new ResultSummary('query', {}, {})) + + const { value, done } = await it.next() + + expect(value).toEqual(new ResultSummary('query', {}, {})) + expect(done).toEqual(true) + }) + + it('should not subscribe to the observer when it is the first api called', async () => { + const subscribe = jest.spyOn(streamObserverMock, 'subscribe') + + const it = result[Symbol.asyncIterator]() + + await it.return!!(new ResultSummary('query', {}, {})) + + await it.next() + + expect(subscribe).not.toBeCalled() + }) + + it('should not canceld stream when it is the first api called', async () => { + const cancel = jest.spyOn(streamObserverMock, 'cancel') + + const it = result[Symbol.asyncIterator]() + + await it.return!!(new ResultSummary('query', {}, {})) + + await it.next() + + expect(cancel).not.toBeCalled() + }) + + it('should not cancel stream when the stream is already initialized ', async () => { + const cancel = jest.spyOn(streamObserverMock, 'cancel') + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + + const it = result[Symbol.asyncIterator]() + + await it.next() + await it.return!!(new ResultSummary('query', {}, {})) + + + expect(cancel).toBeCalled() + }) + + it('should prevent following next requests to subscribe to the stream', async () => { + const subscribe = jest.spyOn(streamObserverMock, 'subscribe') + + const it = result[Symbol.asyncIterator]() + + await it.return!!(new ResultSummary('query', {}, {})) + await it.next() + + expect(subscribe).not.toBeCalled() + }) + + it('should prevent following peek requests to subscribe to the stream', async () => { + const subscribe = jest.spyOn(streamObserverMock, 'subscribe') + + const it = result[Symbol.asyncIterator]() + + await it.return!!(new ResultSummary('query', {}, {})) + await it.peek() + + expect(subscribe).not.toBeCalled() + }) + }) + + describe('.peek()', () => { + it('should pause the stream and then resume the stream', async () => { + const pause = jest.spyOn(streamObserverMock, 'pause') + const resume = jest.spyOn(streamObserverMock, 'resume') + streamObserverMock.onCompleted({}) + + const it = result[Symbol.asyncIterator]() + await it.peek() + + expect(pause).toHaveBeenCalledTimes(1) + expect(resume).toHaveBeenCalledTimes(1) + expect(pause.mock.invocationCallOrder[0]) + .toBeLessThan(resume.mock.invocationCallOrder[0]) + }) + + it('should pause the stream before subscribe', async () => { + const subscribe = jest.spyOn(streamObserverMock, 'subscribe') + const pause = jest.spyOn(streamObserverMock, 'pause') + streamObserverMock.onCompleted({}) + + const it = result[Symbol.asyncIterator]() + await it.peek() + + expect(pause.mock.invocationCallOrder[0]) + .toBeLessThan(subscribe.mock.invocationCallOrder[0]) + }) + + it('should pause the stream if queue is bigger than high watermark', async () => { + const pause = jest.spyOn(streamObserverMock, 'pause') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i <= watermarks.high; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + await it.peek() + + expect(pause).toBeCalledTimes(1) + }) + + it('should call resume if queue is smaller than low watermark', async () => { + const resume = jest.spyOn(streamObserverMock, 'resume') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i < watermarks.low - 1; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + await it.peek() + + expect(resume).toBeCalledTimes(1) + }) + + + it('should return the first record', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + streamObserverMock.onCompleted({}) + + const it = result[Symbol.asyncIterator]() + const { value: record } = await it.peek() + + expect(record).toEqual(new Record(keys, rawRecord1)) + }) + + it('should not move the cursor ', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + streamObserverMock.onCompleted({}) + + const it = result[Symbol.asyncIterator]() + const { value: record } = await it.peek() + const { value: nextRecord } = await it.next() + + expect(record).toEqual(new Record(keys, rawRecord1)) + expect(record).toEqual(nextRecord) + }) + + it('should not move the cursor when buffer is empty ', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + + setTimeout(() => { + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + streamObserverMock.onCompleted({}) + }, 100) + + const it = result[Symbol.asyncIterator]() + const { value: record } = await it.peek() + const { value: nextRecord } = await it.next() + + expect(record).toEqual(new Record(keys, rawRecord1)) + expect(record).toEqual(nextRecord) + }) + + it('should not move the cursor when buffer is empty next element is error ', async () => { + const keys = ['a', 'b'] + + streamObserverMock.onKeys(keys) + + setTimeout(() => { + streamObserverMock.onError(expectedError) + }, 100) + + const it = result[Symbol.asyncIterator]() + let peekError: Error | null = null + let nextError: Error | null = null + + try { + await it.peek() + } catch (e) { + peekError = e + } + + try { + await it.next() + } catch (e) { + nextError = e + } + + expect(peekError).toEqual(expectedError) + expect(peekError).toEqual(nextError) + }) + }) + describe('onError', () => { it('should throws an exception while iterate over records', async () => { const keys = ['a', 'b'] @@ -955,6 +1245,16 @@ describe('Result', () => { expect(next.done).toBe(true) expect(next.value).toStrictEqual(expectedResultSummary) }) + + describe('.peek()', () => { + it('should be resolved with expected result summary', async () => { + const it = result[Symbol.asyncIterator]() + const next = await it.peek() + + expect(next.done).toBe(true) + expect(next.value).toStrictEqual(expectedResultSummary) + }) + }) }) }) @@ -1009,6 +1309,13 @@ describe('Result', () => { // do nothing } }) + + describe('.peek()', () => { + shouldReturnRejectedPromiseWithTheExpectedError(async () => { + const it = result[Symbol.asyncIterator]() + await it.peek() + }) + }) }) function shouldReturnRejectedPromiseWithTheExpectedError( @@ -1122,7 +1429,7 @@ function simulateStream( } state.streaming = true state.consumed = 0 - + const interval = setInterval(() => { state.streaming = state.consumed < fetchSize state.finished = records.length === 0 @@ -1139,14 +1446,14 @@ function simulateStream( streaming() } return - } + } const record = records.shift() if (record !== undefined) { observer.onNext(record) } state.consumed++ - + }, timeout) } @@ -1159,7 +1466,7 @@ function simulateStream( streaming() } } - + /* return () => { diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 4686d67c5..ab7b21caf 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -182,6 +182,27 @@ export function ResultNext (context, data, wire) { }); } +export function ResultPeek (context, data, wire) { + const { resultId } = data + const result = context.getResult(resultId) + if (!("recordIt" in result)) { + result.recordIt = result[Symbol.asyncIterator]() + } + return result.recordIt.peek().then(({ value, done }) => { + if (done) { + wire.writeResponse('NullRecord', null) + } else { + const values = Array.from(value.values()).map(nativeToCypher) + wire.writeResponse('Record', { + values: values + }) + } + }).catch(e => { + console.log('got some err: ' + JSON.stringify(e)) + wire.writeError(e) + }); +} + export function ResultConsume (context, data, wire) { const { resultId } = data const result = context.getResult(resultId) @@ -337,6 +358,7 @@ export function GetFeatures (_context, _params, wire) { 'Feature:Bolt:4.3', 'Feature:Bolt:4.4', 'Feature:API:Result.List', + 'Feature:API:Result.Peek', 'Temporary:ConnectionAcquisitionTimeout', ...SUPPORTED_TLS ] From b56de392cedf07eb098c51b3e456f6fea4d7a119 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 25 Jan 2022 19:44:29 +0100 Subject: [PATCH 2/3] Improving code design --- packages/core/src/result.ts | 42 +++++++++++-------------------------- 1 file changed, 12 insertions(+), 30 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 7f4b29f1d..978077831 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -87,17 +87,6 @@ interface ResultObserver { onError?: (error: Error) => void } -/** - * Defines the elements in the queue result observer - * @access private - */ - type QueuedResultElement = { - done: false - record: Record -} | { - done: true - summary: ResultSummary -} /** * Defines a ResultObserver interface which can be used to enqueue records and dequeue @@ -105,8 +94,8 @@ interface ResultObserver { * @access private */ interface QueuedResultObserver extends ResultObserver { - dequeue (): Promise - head (): Promise + dequeue (): Promise> + head (): Promise> get size (): number } @@ -268,13 +257,6 @@ class Result implements Promise { } } - const toIterableResult = (element: QueuedResultElement): IteratorResult => { - if (element.done) { - return { done: true, value: element.summary } - } - return { done: false, value: element.record } - } - return { next: async () => { if (state.finished) { @@ -284,9 +266,9 @@ class Result implements Promise { const next = await state.queuedObserver!!.dequeue() if (next.done) { state.finished = next.done - state.summary = next.summary + state.summary = next.value } - return toIterableResult(next) + return next }, return: async (value: ResultSummary) => { state.finished = true @@ -299,8 +281,7 @@ class Result implements Promise { return { done: true, value: state.summary!! } } await controlFlow() - const head = await state.queuedObserver!!.head() - return toIterableResult(head) + return await state.queuedObserver!!.head() } } } @@ -487,7 +468,7 @@ class Result implements Promise { reject: (arg: Error) => any | undefined } - function createResolvablePromise (): ResolvablePromise { + function createResolvablePromise (): ResolvablePromise> { const resolvablePromise: any = {} resolvablePromise.promise = new Promise((resolve, reject) => { resolvablePromise.resolve = resolve @@ -496,22 +477,23 @@ class Result implements Promise { return resolvablePromise; } - type QueuedResultElementOrError = QueuedResultElement | Error + type QueuedResultElementOrError = IteratorResult | Error function isError(elementOrError: QueuedResultElementOrError): elementOrError is Error { return elementOrError instanceof Error } const buffer: QueuedResultElementOrError[] = [] - const promiseHolder: { resolvable: ResolvablePromise | null } = { resolvable: null } - + const promiseHolder: { + resolvable: ResolvablePromise> | null + } = { resolvable: null } const observer = { onNext: (record: Record) => { - observer._push({ done: false, record }) + observer._push({ done: false, value: record }) }, onCompleted: (summary: ResultSummary) => { - observer._push({ done: true, summary }) + observer._push({ done: true, value: summary }) }, onError: (error: Error) => { observer._push(error) From b71dd85a60fb845081fa4d766b79e89880b885da Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 26 Jan 2022 11:12:21 +0100 Subject: [PATCH 3/3] Removing happiness --- packages/core/src/result.ts | 8 ++++---- packages/core/test/result.test.ts | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 978077831..bc011e701 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -260,10 +260,10 @@ class Result implements Promise { return { next: async () => { if (state.finished) { - return { done: true, value: state.summary!! } + return { done: true, value: state.summary! } } await controlFlow() - const next = await state.queuedObserver!!.dequeue() + const next = await state.queuedObserver!.dequeue() if (next.done) { state.finished = next.done state.summary = next.value @@ -278,10 +278,10 @@ class Result implements Promise { }, peek: async () => { if (state.finished) { - return { done: true, value: state.summary!! } + return { done: true, value: state.summary! } } await controlFlow() - return await state.queuedObserver!!.head() + return await state.queuedObserver!.head() } } } diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index 909d00f50..db60b50d0 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -810,7 +810,7 @@ describe('Result', () => { const it = result[Symbol.asyncIterator]() await it.next() - const { value, done } = await it.return!!(summary) + const { value, done } = await it.return!(summary) expect(value).toEqual(summary) expect(done).toEqual(true) @@ -828,7 +828,7 @@ describe('Result', () => { const it = result[Symbol.asyncIterator]() await it.next() - await it.return!!(new ResultSummary('query', {}, {})) + await it.return!(new ResultSummary('query', {}, {})) const { value, done } = await it.next() @@ -841,7 +841,7 @@ describe('Result', () => { const it = result[Symbol.asyncIterator]() - await it.return!!(new ResultSummary('query', {}, {})) + await it.return!(new ResultSummary('query', {}, {})) await it.next() @@ -853,7 +853,7 @@ describe('Result', () => { const it = result[Symbol.asyncIterator]() - await it.return!!(new ResultSummary('query', {}, {})) + await it.return!(new ResultSummary('query', {}, {})) await it.next() @@ -874,7 +874,7 @@ describe('Result', () => { const it = result[Symbol.asyncIterator]() await it.next() - await it.return!!(new ResultSummary('query', {}, {})) + await it.return!(new ResultSummary('query', {}, {})) expect(cancel).toBeCalled() @@ -885,7 +885,7 @@ describe('Result', () => { const it = result[Symbol.asyncIterator]() - await it.return!!(new ResultSummary('query', {}, {})) + await it.return!(new ResultSummary('query', {}, {})) await it.next() expect(subscribe).not.toBeCalled() @@ -896,7 +896,7 @@ describe('Result', () => { const it = result[Symbol.asyncIterator]() - await it.return!!(new ResultSummary('query', {}, {})) + await it.return!(new ResultSummary('query', {}, {})) await it.peek() expect(subscribe).not.toBeCalled()