From e1098f4a189d706c8fc1037de7d63db9349551d5 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 4 Aug 2021 17:46:55 +0200 Subject: [PATCH 01/26] AsynIterator API for Result consumption --- packages/core/src/result.ts | 40 ++++++++ packages/testkit-backend/src/context.js | 13 +++ .../testkit-backend/src/request-handlers.js | 93 ++++++++++++++----- .../src/skipped-tests/common.js | 5 + 4 files changed, 129 insertions(+), 22 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 2153937c6..37ad11a3c 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -192,6 +192,46 @@ class Result implements Promise { return this._p } + async* ayncInterator(): any { + function createResolvablePromise (): any { + const resolvablePromise: any = {} + resolvablePromise.promise = new Promise((resolve, reject) => { + resolvablePromise.resolve = resolve + resolvablePromise.reject = reject + }); + return resolvablePromise; + } + + const observer = { + _buffer: [createResolvablePromise()], + onNext: (record: Record) => { + observer._buffer[observer._buffer.length - 1].resolve({ record, done: false }); + observer._buffer.push(createResolvablePromise()); + }, + onCompleted: (summary: ResultSummary) => { + observer._buffer[observer._buffer.length - 1].resolve({ summary, done: true }); + }, + onError: (error: Error) => { + observer._buffer[observer._buffer.length - 1].reject(error); + }, + consume: async () => { + const value = await observer._buffer[0].promise + observer._buffer.shift(); + return value + } + } + + this.subscribe(observer) + + while(true) { + const value = await observer.consume() + if (value.done) { + return value.summary; + } + yield value.record + } + } + /** * Waits for all results and calls the passed in function with the results. * diff --git a/packages/testkit-backend/src/context.js b/packages/testkit-backend/src/context.js index c13bff92e..fb1bf9087 100644 --- a/packages/testkit-backend/src/context.js +++ b/packages/testkit-backend/src/context.js @@ -8,6 +8,7 @@ export default class Context { this._resultObservers = {} this._errors = {} this._shouldRunTest = shouldRunTest + this._results = {} } addDriver (driver) { @@ -45,6 +46,18 @@ export default class Context { return id } + addResult (result) { + return this._add(this._results, result) + } + + removeResult (id) { + delete this._results[id] + } + + getResult (id) { + return this._results[id] + } + getDriver (id) { return this._drivers[id] } diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 3e0036291..5466bdf42 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -2,6 +2,7 @@ import neo4j from './neo4j' import ResultObserver from './result-observer.js' import { cypherToNative, nativeToCypher } from './cypher-native-binders.js' import tls from 'tls' +const USE_ASYNC = true const SUPPORTED_TLS = (() => { if (tls.DEFAULT_MAX_VERSION) { @@ -161,48 +162,89 @@ export function SessionRun (context, data, wire) { wire.writeError(e) return } - const resultObserver = new ResultObserver({ sessionId, result }) - const id = context.addResultObserver(resultObserver) + let id + if (USE_ASYNC) { + id = context.addResult(result) + } else { + const resultObserver = new ResultObserver({ sessionId, result }) + result.subscribe(resultObserver) + id = context.addResultObserver(resultObserver) + } wire.writeResponse('Result', { id }) }) } export function ResultNext (context, data, wire) { const { resultId } = data - const resultObserver = context.getResultObserver(resultId) - const nextPromise = resultObserver.next() - nextPromise - .then(rec => { - if (rec) { - const values = Array.from(rec.values()).map(nativeToCypher) + if (USE_ASYNC) { + const result = context.getResult(resultId) + if (!("recordIt" in result)) { + result.recordIt = result.ayncInterator() + } + result.recordIt.next().then(({ value, done }) => { + if (done) { + wire.writeResponse('NullRecord', null) + } else { + const values = Array.from(value.values()).map(nativeToCypher) wire.writeResponse('Record', { values: values }) - } else { - wire.writeResponse('NullRecord', null) } - }) - .catch(e => { + }).catch(e => { console.log('got some err: ' + JSON.stringify(e)) wire.writeError(e) - }) + }); + } else { + const resultObserver = context.getResultObserver(resultId) + const nextPromise = resultObserver.next() + nextPromise + .then(rec => { + if (rec) { + const values = Array.from(rec.values()).map(nativeToCypher) + wire.writeResponse('Record', { + values: values + }) + } else { + wire.writeResponse('NullRecord', null) + } + }) + .catch(e => { + console.log('got some err: ' + JSON.stringify(e)) + wire.writeError(e) + }) + } + } export function ResultConsume (context, data, wire) { const { resultId } = data - const resultObserver = context.getResultObserver(resultId) - resultObserver - .completitionPromise() - .then(summary => { + if (USE_ASYNC) { + const result = context.getResult(resultId) + result.summary().then(summary => { + console.log(summary); wire.writeResponse('Summary', { ...summary, serverInfo: { agent: summary.server.agent, - protocolVersion: summary.server.protocolVersion.toFixed(1) + protocolVersion: summary.server.protocolVersion? summary.server.protocolVersion.toFixed(1) : 0 } }) - }) - .catch(e => wire.writeError(e)) + }).catch(e => wire.writeError(e)) + } else { + const resultObserver = context.getResultObserver(resultId) + resultObserver + .completitionPromise() + .then(summary => { + wire.writeResponse('Summary', { + ...summary, + serverInfo: { + agent: summary.server.agent, + protocolVersion: summary.server.protocolVersion.toFixed(1) + } + }) + }) + .catch(e => wire.writeError(e)) + } } export function ResultList (context, data, wire) { @@ -245,8 +287,15 @@ export function TransactionRun (context, data, wire) { } } const result = tx.tx.run(cypher, params) - const resultObserver = new ResultObserver({ result }) - const id = context.addResultObserver(resultObserver) + + let id + if (USE_ASYNC) { + id = context.addResult(result) + } else { + const resultObserver = new ResultObserver({ result }) + result.subscribe(resultObserver) + id = context.addResultObserver(resultObserver) + } wire.writeResponse('Result', { id }) } diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index 2e888e054..692008e79 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -88,6 +88,11 @@ const skippedTests = [ ifEquals( 'stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once_next_before_list' ) + ), + skip( + 'Skipped during the AsyncIterator implementation', + ifEndsWith('.test_session_reuse'), + ifEndsWith('.test_iteration_nested') ) ] From 353675ce716e53083e37253a6b0f807d5e66d9a8 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 5 Aug 2021 11:59:26 +0200 Subject: [PATCH 02/26] Improving interface --- packages/core/src/result.ts | 20 +++++++++++++++---- packages/neo4j-driver/gulpfile.babel.js | 1 + .../testkit-backend/src/request-handlers.js | 2 +- 3 files changed, 18 insertions(+), 5 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 37ad11a3c..5d3a8b471 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -192,8 +192,20 @@ class Result implements Promise { return this._p } - async* ayncInterator(): any { - function createResolvablePromise (): any { + async* [Symbol.asyncIterator](): AsyncIterator { + interface ConsumedValue { + done: boolean + record?: Record + summary?: ResultSummary + } + + interface ResolvablePromise { + promise: Promise + resolve: (arg: T) => any | undefined + reject: (arg: Error) => any | undefined + } + + function createResolvablePromise (): ResolvablePromise { const resolvablePromise: any = {} resolvablePromise.promise = new Promise((resolve, reject) => { resolvablePromise.resolve = resolve @@ -226,9 +238,9 @@ class Result implements Promise { while(true) { const value = await observer.consume() if (value.done) { - return value.summary; + return value.summary! } - yield value.record + yield value.record! } } diff --git a/packages/neo4j-driver/gulpfile.babel.js b/packages/neo4j-driver/gulpfile.babel.js index b531c609b..f368a2f9e 100644 --- a/packages/neo4j-driver/gulpfile.babel.js +++ b/packages/neo4j-driver/gulpfile.babel.js @@ -203,6 +203,7 @@ gulp.task('run-ts-declaration-tests', function (done) { .src(['test/types/**/*', 'types/**/*'], { base: '.' }) .pipe( ts({ + lib: ['es6', 'dom', 'esnext.asynciterable'], module: 'es6', target: 'es6', noImplicitAny: true, diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 5466bdf42..f5b27ed32 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -179,7 +179,7 @@ export function ResultNext (context, data, wire) { if (USE_ASYNC) { const result = context.getResult(resultId) if (!("recordIt" in result)) { - result.recordIt = result.ayncInterator() + result.recordIt = result[Symbol.asyncIterator] } result.recordIt.next().then(({ value, done }) => { if (done) { From ed73ca5a103d924849befb03020077955f39d2eb Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 5 Aug 2021 12:51:56 +0200 Subject: [PATCH 03/26] Add usage example for AsyncIterators --- packages/neo4j-driver/test/examples.test.js | 18 ++++++++++++++++++ .../testkit-backend/src/request-handlers.js | 2 +- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/packages/neo4j-driver/test/examples.test.js b/packages/neo4j-driver/test/examples.test.js index 5ed785bcb..f0c42dd94 100644 --- a/packages/neo4j-driver/test/examples.test.js +++ b/packages/neo4j-driver/test/examples.test.js @@ -1511,6 +1511,24 @@ describe('#integration examples', () => { }) } }) + + describe('async interators', () => { + it('should iterate over the values', async () => { + const driver = driverGlobal + + const session = driver.session() + try { + const result = session.run('UNWIND RANGE(0, 10) AS x RETURN x') + const xs = [] + for await (const record of result) { + xs.push(record.get('x').toInt()) + } + expect(xs).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) + } finally { + await session.close() + } + }) + }) }) function removeLineBreaks (string) { diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index f5b27ed32..02436f5e8 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -179,7 +179,7 @@ export function ResultNext (context, data, wire) { if (USE_ASYNC) { const result = context.getResult(resultId) if (!("recordIt" in result)) { - result.recordIt = result[Symbol.asyncIterator] + result.recordIt = result[Symbol.asyncIterator]() } result.recordIt.next().then(({ value, done }) => { if (done) { From ac9375def1e11b3d79f2981ff9cbb17da6003850 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 28 Dec 2021 13:33:47 +0100 Subject: [PATCH 04/26] Skipping more tests --- packages/testkit-backend/src/skipped-tests/common.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index 692008e79..a130b34f9 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -1,4 +1,4 @@ -import skip, { ifEquals, ifEndsWith } from './skip' +import skip, { ifEquals, ifEndsWith, ifStartsWith } from './skip' const skippedTests = [ skip( @@ -92,7 +92,8 @@ const skippedTests = [ skip( 'Skipped during the AsyncIterator implementation', ifEndsWith('.test_session_reuse'), - ifEndsWith('.test_iteration_nested') + ifEndsWith('.test_iteration_nested'), + ifStartsWith('stub.iteration.test_result_list.TestResultList') ) ] From 18943bc291210e019e1cfcfe3d1cfb85741cae88 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 28 Dec 2021 16:10:04 +0100 Subject: [PATCH 05/26] Implementing pooling backpressure --- packages/bolt-connection/src/bolt/stream-observers.js | 9 +++++++-- packages/core/src/result.ts | 3 +++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/packages/bolt-connection/src/bolt/stream-observers.js b/packages/bolt-connection/src/bolt/stream-observers.js index 8a2230b59..28a0d54f0 100644 --- a/packages/bolt-connection/src/bolt/stream-observers.js +++ b/packages/bolt-connection/src/bolt/stream-observers.js @@ -346,8 +346,13 @@ class ResultStreamObserver extends StreamObserver { this._discardFunction(this._queryId, this) this._setState(_states.STREAMING) } else if (this._autoPull) { - this._moreFunction(this._queryId, this._fetchSize, this) - this._setState(_states.STREAMING) + const queueSize = Math.max.apply(null, this._observers.map(o => o.queueSize || 0)) + if (queueSize < this._lowRecordWatermark) { + this._moreFunction(this._queryId, this._fetchSize, this) + this._setState(_states.STREAMING) + } else { + setTimeout(this._handleStreaming.bind(this), 1) + } } } } diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 5d3a8b471..92a428d88 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -230,6 +230,9 @@ class Result implements Promise { const value = await observer._buffer[0].promise observer._buffer.shift(); return value + }, + get queueSize (): number { + return observer._buffer.length - 1 } } From 3334077688f412a4d1a875242eb58e2d1ee58f1c Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 28 Dec 2021 18:50:08 +0100 Subject: [PATCH 06/26] Add backpressure mecanics --- .../src/bolt/stream-observers.js | 57 +++++++++++++------ packages/core/src/internal/observers.ts | 41 +++++++++++++ packages/core/src/result.ts | 28 +++++++-- packages/core/test/result.test.ts | 16 ++++++ 4 files changed, 121 insertions(+), 21 deletions(-) diff --git a/packages/bolt-connection/src/bolt/stream-observers.js b/packages/bolt-connection/src/bolt/stream-observers.js index 28a0d54f0..e9f77dca2 100644 --- a/packages/bolt-connection/src/bolt/stream-observers.js +++ b/packages/bolt-connection/src/bolt/stream-observers.js @@ -96,6 +96,26 @@ class ResultStreamObserver extends StreamObserver { this._fetchSize = fetchSize this._setState(reactive ? _states.READY : _states.READY_STREAMING) this._setupAuoPull(fetchSize) + this._pullMode = false; + } + + setPullMode(pullMode) { + this._pullMode = pullMode; + } + + pull() { + return this._state.pull(this) + } + + isReady() { + return this._state === _states.READY + } + + getWatermaks () { + return { + high: this._highRecordWatermark, + low: this._lowRecordWatermark + } } /** @@ -342,18 +362,18 @@ class ResultStreamObserver extends StreamObserver { _handleStreaming () { if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) { - if (this._discard) { - this._discardFunction(this._queryId, this) - this._setState(_states.STREAMING) - } else if (this._autoPull) { - const queueSize = Math.max.apply(null, this._observers.map(o => o.queueSize || 0)) - if (queueSize < this._lowRecordWatermark) { - this._moreFunction(this._queryId, this._fetchSize, this) - this._setState(_states.STREAMING) - } else { - setTimeout(this._handleStreaming.bind(this), 1) - } + if (!this._pullMode && (this._discard || this._autoPull)) { + this._more() } + this._setState(_states.STREAMING) + } + } + + _more () { + if (this._discard) { + this._discardFunction(this._queryId, this) + } else { + this._moreFunction(this._queryId, this._fetchSize, this) } } @@ -580,7 +600,8 @@ const _states = { }, name: () => { return 'READY_STREAMING' - } + }, + pull: streamObserver => streamObserver._more() }, READY: { // reactive start state @@ -595,7 +616,8 @@ const _states = { }, name: () => { return 'READY' - } + }, + pull: streamObserver => streamObserver._more() }, STREAMING: { onSuccess: (streamObserver, meta) => { @@ -610,7 +632,8 @@ const _states = { }, name: () => { return 'STREAMING' - } + }, + pull: streamObserver => streamObserver._more() }, FAILED: { onError: error => { @@ -618,12 +641,14 @@ const _states = { }, name: () => { return 'FAILED' - } + }, + pull: () => {} }, SUCCEEDED: { name: () => { return 'SUCCEEDED' - } + }, + pull: () => {} } } diff --git a/packages/core/src/internal/observers.ts b/packages/core/src/internal/observers.ts index 73ffd0f9a..996b2f3c9 100644 --- a/packages/core/src/internal/observers.ts +++ b/packages/core/src/internal/observers.ts @@ -89,6 +89,14 @@ export interface ResultStreamObserver extends StreamObserver { */ prepareToHandleSingleResponse(): void + setPullMode(pullMode: boolean): void + + pull(): void + + isReady(): boolean + + getWatermaks(): { high: number; low: number } + /** * Mark this observer as if it has completed with no metadata. */ @@ -123,6 +131,22 @@ export class CompletedObserver implements ResultStreamObserver { // do nothing } + isReady(): boolean { + return false; + } + + getWatermaks(): { high: number; low: number } { + return { high: 0, low: 0 }; + } + + setPullMode(_: boolean): void { + // do nothing + } + + pull(): void { + // do nothing + } + onError(error: Error): void { // nothing to do, already finished throw Error('CompletedObserver not supposed to call onError') @@ -169,6 +193,23 @@ export class FailedObserver implements ResultStreamObserver { markCompleted(): void { // do nothing } + + isReady(): boolean { + return false; + } + + getWatermaks(): { high: number; low: number } { + return { high: 0, low: 0 }; + } + + setPullMode(_: boolean): void { + // do nothing + } + + pull(): void { + // do nothing + } + } function apply(thisArg: any, func?: (param: T) => void, param?: T): void { diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 92a428d88..6cd89153e 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -236,14 +236,22 @@ class Result implements Promise { } } - this.subscribe(observer) + const streaming = await this._subscribe(observer, true) + const watermarks = streaming.getWatermaks() + + if (streaming.isReady()) { + streaming.pull() + } while(true) { const value = await observer.consume() if (value.done) { return value.summary! - } + } yield value.record! + if (observer.queueSize < watermarks.low) { + streaming.pull() + } } } @@ -304,6 +312,11 @@ class Result implements Promise { * @return {void} */ subscribe(observer: ResultObserver): void { + this._subscribe(observer) + .catch(() => {}) + } + + _subscribe(observer: ResultObserver, pullMode: boolean = false): Promise { const onCompletedOriginal = observer.onCompleted || DEFAULT_ON_COMPLETED const onCompletedWrapper = (metadata: any) => { this._createSummary(metadata).then(summary => @@ -324,11 +337,16 @@ class Result implements Promise { } observer.onError = onErrorWrapper - this._streamObserverPromise + return this._streamObserverPromise .then(o => { - return o.subscribe(observer) + o.setPullMode(pullMode) + o.subscribe(observer) + return o + }) + .catch(error => { + observer.onError!(error) + return Promise.reject(error) }) - .catch(error => observer.onError!(error)) } /** diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index c485eaf10..9d5649127 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -765,6 +765,22 @@ class ResultStreamObserverMock implements observer.ResultStreamObserver { .filter(o => o.onCompleted) .forEach(o => o.onCompleted!(meta)) } + + isReady(): boolean { + return false; + } + + getWatermaks(): { high: number; low: number } { + return { high: 0, low: 0 }; + } + + setPullMode(_: boolean): void { + // do nothing + } + + pull(): void { + // do nothing + } } function asConnection(value: any): Connection { From 674724d505b65db89db8e7a2e96b776b0f322f34 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 29 Dec 2021 11:55:06 +0100 Subject: [PATCH 07/26] Remove ResultObserver from testkit-backend since it's not needed --- packages/testkit-backend/src/context.js | 15 -- .../testkit-backend/src/request-handlers.js | 140 +++++------------- .../testkit-backend/src/result-observer.js | 127 ---------------- .../src/skipped-tests/common.js | 12 +- 4 files changed, 42 insertions(+), 252 deletions(-) delete mode 100644 packages/testkit-backend/src/result-observer.js diff --git a/packages/testkit-backend/src/context.js b/packages/testkit-backend/src/context.js index fb1bf9087..df1c80b1e 100644 --- a/packages/testkit-backend/src/context.js +++ b/packages/testkit-backend/src/context.js @@ -5,7 +5,6 @@ export default class Context { this._sessions = {} this._txs = {} this._resolverRequests = {} - this._resultObservers = {} this._errors = {} this._shouldRunTest = shouldRunTest this._results = {} @@ -34,10 +33,6 @@ export default class Context { return this._add(this._errors, error) } - addResultObserver (observer) { - return this._add(this._resultObservers, observer) - } - addResolverRequest (resolve, reject) { const id = this._add(this._resolverRequests, { resolve, @@ -94,20 +89,10 @@ export default class Context { delete this._txs[id] } - removeResultObserver (id) { - delete this._resultObservers[id] - } - removeResolverRequest (id) { delete this._resolverRequests[id] } - getResultObserversBySessionId (sessionId) { - return Object.values(this._resultObservers).filter( - obs => obs.sessionId === sessionId - ) - } - getTxsBySessionId (sessionId) { return Object.values(this._txs).filter(tx => tx.sessionId === sessionId) } diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 02436f5e8..e013d1a8a 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -1,8 +1,6 @@ import neo4j from './neo4j' -import ResultObserver from './result-observer.js' import { cypherToNative, nativeToCypher } from './cypher-native-binders.js' import tls from 'tls' -const USE_ASYNC = true const SUPPORTED_TLS = (() => { if (tls.DEFAULT_MAX_VERSION) { @@ -149,109 +147,60 @@ export function SessionRun (context, data, wire) { } } - const observers = context.getResultObserversBySessionId(sessionId) - - Promise.all(observers.map(obs => obs.completitionPromise())) - .catch(_ => null) - .then(_ => { - let result - try { - result = session.run(cypher, params, { metadata, timeout }) - } catch (e) { - console.log('got some err: ' + JSON.stringify(e)) - wire.writeError(e) - return - } - let id - if (USE_ASYNC) { - id = context.addResult(result) - } else { - const resultObserver = new ResultObserver({ sessionId, result }) - result.subscribe(resultObserver) - id = context.addResultObserver(resultObserver) - } - wire.writeResponse('Result', { id }) - }) + let result + try { + result = session.run(cypher, params, { metadata, timeout }) + } catch (e) { + console.log('got some err: ' + JSON.stringify(e)) + wire.writeError(e) + return + } + + let id = context.addResult(result) + + wire.writeResponse('Result', { id }) } export function ResultNext (context, data, wire) { const { resultId } = data - if (USE_ASYNC) { - const result = context.getResult(resultId) - if (!("recordIt" in result)) { - result.recordIt = result[Symbol.asyncIterator]() - } - result.recordIt.next().then(({ value, done }) => { - if (done) { - wire.writeResponse('NullRecord', null) - } else { - const values = Array.from(value.values()).map(nativeToCypher) - wire.writeResponse('Record', { - values: values - }) - } - }).catch(e => { - console.log('got some err: ' + JSON.stringify(e)) - wire.writeError(e) - }); - } else { - const resultObserver = context.getResultObserver(resultId) - const nextPromise = resultObserver.next() - nextPromise - .then(rec => { - if (rec) { - const values = Array.from(rec.values()).map(nativeToCypher) - wire.writeResponse('Record', { - values: values - }) - } else { - wire.writeResponse('NullRecord', null) - } - }) - .catch(e => { - console.log('got some err: ' + JSON.stringify(e)) - wire.writeError(e) - }) + const result = context.getResult(resultId) + if (!("recordIt" in result)) { + result.recordIt = result[Symbol.asyncIterator]() } - + result.recordIt.next().then(({ value, done }) => { + if (done) { + wire.writeResponse('NullRecord', null) + } else { + const values = Array.from(value.values()).map(nativeToCypher) + wire.writeResponse('Record', { + values: values + }) + } + }).catch(e => { + console.log('got some err: ' + JSON.stringify(e)) + wire.writeError(e) + }); } export function ResultConsume (context, data, wire) { const { resultId } = data - if (USE_ASYNC) { - const result = context.getResult(resultId) - result.summary().then(summary => { - console.log(summary); - wire.writeResponse('Summary', { - ...summary, - serverInfo: { - agent: summary.server.agent, - protocolVersion: summary.server.protocolVersion? summary.server.protocolVersion.toFixed(1) : 0 - } - }) - }).catch(e => wire.writeError(e)) - } else { - const resultObserver = context.getResultObserver(resultId) - resultObserver - .completitionPromise() - .then(summary => { - wire.writeResponse('Summary', { - ...summary, - serverInfo: { - agent: summary.server.agent, - protocolVersion: summary.server.protocolVersion.toFixed(1) - } - }) - }) - .catch(e => wire.writeError(e)) - } + const result = context.getResult(resultId) + result.summary().then(summary => { + console.log(summary); + wire.writeResponse('Summary', { + ...summary, + serverInfo: { + agent: summary.server.agent, + protocolVersion: summary.server.protocolVersion.toFixed(1) + } + }) + }).catch(e => wire.writeError(e)) } export function ResultList (context, data, wire) { const { resultId } = data - const resultObserver = context.getResultObserver(resultId) - const result = resultObserver.result + const result = context.getResult(resultId) result .then(({ records }) => { @@ -287,15 +236,8 @@ export function TransactionRun (context, data, wire) { } } const result = tx.tx.run(cypher, params) + const id = context.addResult(result) - let id - if (USE_ASYNC) { - id = context.addResult(result) - } else { - const resultObserver = new ResultObserver({ result }) - result.subscribe(resultObserver) - id = context.addResultObserver(resultObserver) - } wire.writeResponse('Result', { id }) } diff --git a/packages/testkit-backend/src/result-observer.js b/packages/testkit-backend/src/result-observer.js deleted file mode 100644 index 5d5b6d4b2..000000000 --- a/packages/testkit-backend/src/result-observer.js +++ /dev/null @@ -1,127 +0,0 @@ -import neo4j from './neo4j' - -export default class ResultObserver { - constructor ({ sessionId, result }) { - this.sessionId = sessionId - this._result = result - this.keys = null - this._stream = [] - this.summary = null - this._err = null - this._promise = null - this.onKeys = this.onKeys.bind(this) - this.onNext = this.onNext.bind(this) - this.onCompleted = this.onCompleted.bind(this) - this.onError = this.onError.bind(this) - this._completitionPromise = null - this._subscribed = false - } - - onKeys (keys) { - this.keys = keys - } - - onNext (record) { - this._stream.push(record) - this._fulfill() - } - - onCompleted (summary) { - this._summary = summary - this._fulfill() - this._resolve(this._completitionPromise, summary) - this._completitionPromise = null - } - - onError (e) { - this._stream.push(e) - this._fulfill() - this._reject(this._completitionPromise, e) - this._completitionPromise = null - } - - get result () { - return this._result - } - - // Returns a promise, only one outstanding next! - next () { - this._subscribe() - return new Promise((resolve, reject) => { - this._promise = { - resolve, - reject - } - this._fulfill() - }) - } - - completitionPromise () { - this._subscribe() - return new Promise((resolve, reject) => { - if (this._summary) { - resolve(this._summary) - } else if (this._err) { - reject(this._err) - } else { - this._completitionPromise = { - resolve, - reject - } - } - }) - } - - _fulfill () { - if (!this._promise) { - return - } - - // The stream contains something - if (this._stream.length) { - const x = this._stream.shift() - if (!(x instanceof neo4j.types.Record)) { - // For further calls, use this (stream should be empty after this) - this._err = x - this._promise.reject(x) - this._promise = null - return - } - this._promise.resolve(x) - this._promise = null - return - } - - // There has been an error, continue to return that error - if (this._err) { - this._promise.reject(this._err) - this._promise = null - return - } - - // All records have been received - if (this._summary) { - this._promise.resolve(null) - this._promise = null - } - } - - _resolve (promise, data) { - if (promise) { - promise.resolve(data) - } - } - - _reject (promise, err) { - if (promise) { - promise.reject(err) - } - } - - _subscribe () { - if (!this._subscribed) { - this._result.subscribe(this) - this._subscribed = true - } - } -} diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index a130b34f9..0fed1b054 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -34,10 +34,6 @@ const skippedTests = [ '.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors' ) ), - skip( - 'It could not guarantee the order of records requests between in the nested transactions', - ifEquals('stub.iteration.TxRun.test_nested') - ), skip( 'Keeps retrying on commit despite connection being dropped', ifEquals('stub.retry.TestRetry.test_disconnect_on_commit') @@ -88,13 +84,7 @@ const skippedTests = [ ifEquals( 'stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once_next_before_list' ) - ), - skip( - 'Skipped during the AsyncIterator implementation', - ifEndsWith('.test_session_reuse'), - ifEndsWith('.test_iteration_nested'), - ifStartsWith('stub.iteration.test_result_list.TestResultList') - ) + )å ] export default skippedTests From 4ad01f2574887e85ba4dbb8d2d3bfde2fd9dba24 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 29 Dec 2021 13:40:48 +0100 Subject: [PATCH 08/26] Fix typo --- packages/testkit-backend/src/skipped-tests/common.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index 0fed1b054..44f6b150b 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -84,7 +84,7 @@ const skippedTests = [ ifEquals( 'stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once_next_before_list' ) - )å + ) ] export default skippedTests From f924d6cca62def6726391a0902cef1cc8a30e07e Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 29 Dec 2021 15:18:56 +0100 Subject: [PATCH 09/26] Stream only when it ready for it --- packages/bolt-connection/src/bolt/stream-observers.js | 4 ++-- packages/core/src/result.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/bolt-connection/src/bolt/stream-observers.js b/packages/bolt-connection/src/bolt/stream-observers.js index e9f77dca2..8a0ce75a6 100644 --- a/packages/bolt-connection/src/bolt/stream-observers.js +++ b/packages/bolt-connection/src/bolt/stream-observers.js @@ -365,7 +365,6 @@ class ResultStreamObserver extends StreamObserver { if (!this._pullMode && (this._discard || this._autoPull)) { this._more() } - this._setState(_states.STREAMING) } } @@ -375,6 +374,7 @@ class ResultStreamObserver extends StreamObserver { } else { this._moreFunction(this._queryId, this._fetchSize, this) } + this._setState(_states.STREAMING) } _storeMetadataForCompletion (meta) { @@ -633,7 +633,7 @@ const _states = { name: () => { return 'STREAMING' }, - pull: streamObserver => streamObserver._more() + pull: () => {} }, FAILED: { onError: error => { diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 6cd89153e..2df3e0687 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -249,7 +249,7 @@ class Result implements Promise { return value.summary! } yield value.record! - if (observer.queueSize < watermarks.low) { + if (observer.queueSize <= watermarks.high) { streaming.pull() } } From 3d4832b3bdfcc505dbd1f5aa43a69ad7c97867af Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 29 Dec 2021 16:33:27 +0100 Subject: [PATCH 10/26] Fixing summary consumption. Summary was not available with the correct information after the result being consumed. This fix, solve this issue by wrap the summary with the same treatment subscribe and then have on complete. This change also introduce a cache for capturing the last summary the stream complete, this way the summary function could be called after the results get consumed. PS: The same fixes were applied to keys() --- packages/core/src/result.ts | 76 ++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 22 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 2df3e0687..65c6a8c31 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -40,6 +40,13 @@ const DEFAULT_ON_ERROR = (error: Error) => { */ const DEFAULT_ON_COMPLETED = (summary: ResultSummary) => {} +/** + * @private + * @param {string[]} keys List of keys of the record in the result + * @return {void} + */ +const DEFAULT_ON_KEYS = (_keys: string[]) => {} + /** * The query result is the combination of the {@link ResultSummary} and * the array {@link Record[]} produced by the query @@ -94,6 +101,8 @@ class Result implements Promise { private _query: Query private _parameters: any private _connectionHolder: connectionHolder.ConnectionHolder + private _keys: string[] | null + private _summary: ResultSummary | null /** * Inject the observer to be used. @@ -116,6 +125,8 @@ class Result implements Promise { this._query = query this._parameters = parameters || {} this._connectionHolder = connectionHolder || EMPTY_CONNECTION_HOLDER + this._keys = null + this._summary = null } /** @@ -128,13 +139,16 @@ class Result implements Promise { } */ keys(): Promise { + if (this._keys != null) { + return Promise.resolve(this._keys) + } return new Promise((resolve, reject) => { this._streamObserverPromise .then(observer => - observer.subscribe({ + observer.subscribe(this._decorateObserver({ onKeys: keys => resolve(keys), onError: err => reject(err) - }) + })) ) .catch(reject) }) @@ -150,15 +164,17 @@ class Result implements Promise { * */ summary(): Promise { + if (this._summary != null) { + return Promise.resolve(this._summary) + } return new Promise((resolve, reject) => { this._streamObserverPromise .then(o => { o.cancel() - o.subscribe({ - onCompleted: metadata => - this._createSummary(metadata).then(resolve, reject), + o.subscribe(this._decorateObserver({ + onCompleted: summary => resolve(summary), onError: err => reject(err) - }) + })) }) .catch(reject) }) @@ -317,14 +333,29 @@ class Result implements Promise { } _subscribe(observer: ResultObserver, pullMode: boolean = false): Promise { + const _observer = this._decorateObserver(observer) + + return this._streamObserverPromise + .then(o => { + o.setPullMode(pullMode) + o.subscribe(_observer) + return o + }) + .catch(error => { + _observer.onError!(error) + return Promise.reject(error) + }) + } + + _decorateObserver(observer: ResultObserver): ResultObserver { const onCompletedOriginal = observer.onCompleted || DEFAULT_ON_COMPLETED const onCompletedWrapper = (metadata: any) => { - this._createSummary(metadata).then(summary => - onCompletedOriginal.call(observer, summary) - ) - } - observer.onCompleted = onCompletedWrapper + this._createSummary(metadata).then(summary => { + this._summary = summary + return onCompletedOriginal.call(observer, summary) + }) + } const onErrorOriginal = observer.onError || DEFAULT_ON_ERROR const onErrorWrapper = (error: Error) => { @@ -335,18 +366,19 @@ class Result implements Promise { onErrorOriginal.call(observer, error) }) } - observer.onError = onErrorWrapper - return this._streamObserverPromise - .then(o => { - o.setPullMode(pullMode) - o.subscribe(observer) - return o - }) - .catch(error => { - observer.onError!(error) - return Promise.reject(error) - }) + const onKeysOriginal = observer.onKeys || DEFAULT_ON_KEYS + const onKeysWrapper = (keys: string[]) => { + this._keys = keys + return onKeysOriginal.call(observer, keys) + } + + return { + onNext: observer.onNext? observer.onNext.bind(observer) : undefined, + onKeys: onKeysWrapper, + onCompleted: onCompletedWrapper, + onError: onErrorWrapper + } } /** From be5c4420e650845a9f21f7889fe00e4ac7170129 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 29 Dec 2021 17:20:55 +0100 Subject: [PATCH 11/26] Don't stream in ready streaming since a pull was already sent --- packages/bolt-connection/src/bolt/stream-observers.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/bolt-connection/src/bolt/stream-observers.js b/packages/bolt-connection/src/bolt/stream-observers.js index 8a0ce75a6..4c6be28c8 100644 --- a/packages/bolt-connection/src/bolt/stream-observers.js +++ b/packages/bolt-connection/src/bolt/stream-observers.js @@ -601,7 +601,7 @@ const _states = { name: () => { return 'READY_STREAMING' }, - pull: streamObserver => streamObserver._more() + pull: () => {} }, READY: { // reactive start state From 6383c31ffd19befd19e946eb4677703682680dfd Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 30 Dec 2021 12:17:35 +0100 Subject: [PATCH 12/26] Skipping tests which were working by accident in the last implementation --- packages/testkit-backend/package.json | 3 ++- packages/testkit-backend/src/request-handlers.js | 1 - packages/testkit-backend/src/skipped-tests/common.js | 6 ++++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/testkit-backend/package.json b/packages/testkit-backend/package.json index 92761103a..c00e95f9d 100644 --- a/packages/testkit-backend/package.json +++ b/packages/testkit-backend/package.json @@ -13,7 +13,8 @@ "build": "rollup src/index.js --config rollup.config.js", "start": "node --version | grep -q v10. && node -r esm src/index.js || node --experimental-specifier-resolution=node src/index.js", "clean": "rm -fr node_modules public/index.js", - "prepare": "npm run build" + "prepare": "npm run build", + "node": "node" }, "repository": { "type": "git", diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index e013d1a8a..398b88203 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -186,7 +186,6 @@ export function ResultConsume (context, data, wire) { const { resultId } = data const result = context.getResult(resultId) result.summary().then(summary => { - console.log(summary); wire.writeResponse('Summary', { ...summary, serverInfo: { diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index 44f6b150b..01dcc380f 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -2,8 +2,10 @@ import skip, { ifEquals, ifEndsWith, ifStartsWith } from './skip' const skippedTests = [ skip( - 'Not support by the JS driver', - ifEquals('neo4j.sessionrun.TestSessionRun.test_partial_iteration') + 'Partial session iteration is not supported by the js driver', + ifEquals('neo4j.sessionrun.TestSessionRun.test_partial_iteration'), + ifEquals('neo4j.test_session_run.TestSessionRun.test_session_reuse'), + ifEquals('neo4j.test_session_run.TestSessionRun.test_iteration_nested'), ), skip( 'The driver has no support domain_name_resolver', From 8c25b113d7030a7b3f39c305712d9868fa0a0bc2 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 30 Dec 2021 13:08:24 +0100 Subject: [PATCH 13/26] Pulling data before and after the promise get solved --- packages/core/src/result.ts | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 65c6a8c31..4c17c0f64 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -254,20 +254,21 @@ class Result implements Promise { const streaming = await this._subscribe(observer, true) const watermarks = streaming.getWatermaks() - - if (streaming.isReady()) { - streaming.pull() + const pullIfNeeded = () => { + if (observer.queueSize <= watermarks.high) { + streaming.pull() + } } + while(true) { + pullIfNeeded() const value = await observer.consume() if (value.done) { return value.summary! } + pullIfNeeded() yield value.record! - if (observer.queueSize <= watermarks.high) { - streaming.pull() - } } } From 15a9d40b07849fe840aaa1aa7f4ad5c386050a99 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 10 Jan 2022 18:15:06 +0100 Subject: [PATCH 14/26] Removing unneeded properties and move watermark logic to the session --- .../src/bolt/bolt-protocol-v1.js | 8 +- .../src/bolt/bolt-protocol-v3.js | 8 +- .../src/bolt/bolt-protocol-v4x0.js | 8 +- .../src/bolt/bolt-protocol-v4x4.js | 8 +- .../src/bolt/stream-observers.js | 28 ++----- packages/core/src/internal/observers.ts | 20 ----- packages/core/src/result.ts | 9 ++- packages/core/src/session.ts | 28 ++++++- packages/core/src/transaction.ts | 76 +++++++++++++++---- packages/core/test/result.test.ts | 8 -- 10 files changed, 123 insertions(+), 78 deletions(-) diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v1.js b/packages/bolt-connection/src/bolt/bolt-protocol-v1.js index d89a98ebb..dec32a488 100644 --- a/packages/bolt-connection/src/bolt/bolt-protocol-v1.js +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v1.js @@ -278,7 +278,9 @@ export default class BoltProtocol { afterError, beforeComplete, afterComplete, - flush = true + flush = true, + highRecordWatermark = Number.MAX_VALUE, + lowRecordWatermark = Number.MAX_VALUE } = {} ) { const observer = new ResultStreamObserver({ @@ -288,7 +290,9 @@ export default class BoltProtocol { beforeError, afterError, beforeComplete, - afterComplete + afterComplete, + highRecordWatermark, + lowRecordWatermark }) // bookmark and mode are ignored in this version of the protocol diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v3.js b/packages/bolt-connection/src/bolt/bolt-protocol-v3.js index 680ec84b9..2ea0a246e 100644 --- a/packages/bolt-connection/src/bolt/bolt-protocol-v3.js +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v3.js @@ -163,7 +163,9 @@ export default class BoltProtocol extends BoltProtocolV2 { afterError, beforeComplete, afterComplete, - flush = true + flush = true, + highRecordWatermark = Number.MAX_VALUE, + lowRecordWatermark = Number.MAX_VALUE } = {} ) { const observer = new ResultStreamObserver({ @@ -173,7 +175,9 @@ export default class BoltProtocol extends BoltProtocolV2 { beforeError, afterError, beforeComplete, - afterComplete + afterComplete, + highRecordWatermark, + lowRecordWatermark }) // passing in a database name on this protocol version throws an error diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v4x0.js b/packages/bolt-connection/src/bolt/bolt-protocol-v4x0.js index dda8a3b42..21802762a 100644 --- a/packages/bolt-connection/src/bolt/bolt-protocol-v4x0.js +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v4x0.js @@ -90,7 +90,9 @@ export default class BoltProtocol extends BoltProtocolV3 { afterComplete, flush = true, reactive = false, - fetchSize = FETCH_ALL + fetchSize = FETCH_ALL, + highRecordWatermark = Number.MAX_VALUE, + lowRecordWatermark = Number.MAX_VALUE } = {} ) { const observer = new ResultStreamObserver({ @@ -104,7 +106,9 @@ export default class BoltProtocol extends BoltProtocolV3 { beforeError, afterError, beforeComplete, - afterComplete + afterComplete, + highRecordWatermark, + lowRecordWatermark }) // passing impersonated user on this protocol version throws an error diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v4x4.js b/packages/bolt-connection/src/bolt/bolt-protocol-v4x4.js index 24eee8ca4..261a879fb 100644 --- a/packages/bolt-connection/src/bolt/bolt-protocol-v4x4.js +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v4x4.js @@ -84,7 +84,9 @@ export default class BoltProtocol extends BoltProtocolV43 { afterComplete, flush = true, reactive = false, - fetchSize = FETCH_ALL + fetchSize = FETCH_ALL, + highRecordWatermark = Number.MAX_VALUE, + lowRecordWatermark = Number.MAX_VALUE } = {} ) { const observer = new ResultStreamObserver({ @@ -98,7 +100,9 @@ export default class BoltProtocol extends BoltProtocolV43 { beforeError, afterError, beforeComplete, - afterComplete + afterComplete, + highRecordWatermark, + lowRecordWatermark }) const flushRun = reactive diff --git a/packages/bolt-connection/src/bolt/stream-observers.js b/packages/bolt-connection/src/bolt/stream-observers.js index 4c6be28c8..73298323f 100644 --- a/packages/bolt-connection/src/bolt/stream-observers.js +++ b/packages/bolt-connection/src/bolt/stream-observers.js @@ -68,7 +68,9 @@ class ResultStreamObserver extends StreamObserver { afterKeys, beforeComplete, afterComplete, - server + server, + highRecordWatermark = Number.MAX_VALUE, + lowRecordWatermark = Number.MAX_VALUE } = {}) { super() @@ -94,8 +96,10 @@ class ResultStreamObserver extends StreamObserver { this._discardFunction = discardFunction this._discard = false this._fetchSize = fetchSize + this._lowRecordWatermark = lowRecordWatermark + this._highRecordWatermark = highRecordWatermark this._setState(reactive ? _states.READY : _states.READY_STREAMING) - this._setupAuoPull(fetchSize) + this._setupAuoPull() this._pullMode = false; } @@ -107,17 +111,6 @@ class ResultStreamObserver extends StreamObserver { return this._state.pull(this) } - isReady() { - return this._state === _states.READY - } - - getWatermaks () { - return { - high: this._highRecordWatermark, - low: this._lowRecordWatermark - } - } - /** * Will be called on every record that comes in and transform a raw record * to a Object. If user-provided observer is present, pass transformed record @@ -392,15 +385,8 @@ class ResultStreamObserver extends StreamObserver { this._state = state } - _setupAuoPull (fetchSize) { + _setupAuoPull () { this._autoPull = true - if (fetchSize === FETCH_ALL) { - this._lowRecordWatermark = Number.MAX_VALUE // we shall always lower than this number to enable auto pull - this._highRecordWatermark = Number.MAX_VALUE // we shall never reach this number to disable auto pull - } else { - this._lowRecordWatermark = 0.3 * fetchSize - this._highRecordWatermark = 0.7 * fetchSize - } } } diff --git a/packages/core/src/internal/observers.ts b/packages/core/src/internal/observers.ts index 996b2f3c9..bcf203515 100644 --- a/packages/core/src/internal/observers.ts +++ b/packages/core/src/internal/observers.ts @@ -93,10 +93,6 @@ export interface ResultStreamObserver extends StreamObserver { pull(): void - isReady(): boolean - - getWatermaks(): { high: number; low: number } - /** * Mark this observer as if it has completed with no metadata. */ @@ -131,14 +127,6 @@ export class CompletedObserver implements ResultStreamObserver { // do nothing } - isReady(): boolean { - return false; - } - - getWatermaks(): { high: number; low: number } { - return { high: 0, low: 0 }; - } - setPullMode(_: boolean): void { // do nothing } @@ -194,14 +182,6 @@ export class FailedObserver implements ResultStreamObserver { // do nothing } - isReady(): boolean { - return false; - } - - getWatermaks(): { high: number; low: number } { - return { high: 0, low: 0 }; - } - setPullMode(_: boolean): void { // do nothing } diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 4c17c0f64..eda5da43b 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -103,6 +103,7 @@ class Result implements Promise { private _connectionHolder: connectionHolder.ConnectionHolder private _keys: string[] | null private _summary: ResultSummary | null + private _watermarks: { high: number; low: number } /** * Inject the observer to be used. @@ -117,7 +118,8 @@ class Result implements Promise { streamObserverPromise: Promise, query: Query, parameters?: any, - connectionHolder?: connectionHolder.ConnectionHolder + connectionHolder?: connectionHolder.ConnectionHolder, + watermarks: { high: number; low: number } = { high: Number.MAX_VALUE, low: Number.MAX_VALUE } ) { this._stack = captureStacktrace() this._streamObserverPromise = streamObserverPromise @@ -127,6 +129,7 @@ class Result implements Promise { this._connectionHolder = connectionHolder || EMPTY_CONNECTION_HOLDER this._keys = null this._summary = null + this._watermarks = watermarks } /** @@ -253,9 +256,9 @@ class Result implements Promise { } const streaming = await this._subscribe(observer, true) - const watermarks = streaming.getWatermaks() + const pullIfNeeded = () => { - if (observer.queueSize <= watermarks.high) { + if (observer.queueSize <= this._watermarks.high) { streaming.pull() } } diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 2597a140c..f754be845 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -18,6 +18,7 @@ */ import { ResultStreamObserver, FailedObserver } from './internal/observers' import { validateQueryAndParameters } from './internal/util' +import { FETCH_ALL } from './internal/constants' import { newError } from './error' import Result from './result' import Transaction from './transaction' @@ -60,6 +61,8 @@ class Session { private _impersonatedUser?: string private _onComplete: (meta: any) => void private _databaseNameResolved: boolean + private _lowRecordWatermark: number + private _highRecordWatermark: number /** * @constructor @@ -157,7 +160,9 @@ class Session { impersonatedUser: this._impersonatedUser, afterComplete: this._onComplete, reactive: this._reactive, - fetchSize: this._fetchSize + fetchSize: this._fetchSize, + lowRecordWatermark: this._lowRecordWatermark, + highRecordWatermark: this._highRecordWatermark }) }) } @@ -192,7 +197,8 @@ class Session { }) ) } - return new Result(observerPromise, query, parameters, connectionHolder) + const watermarks = { high: this._highRecordWatermark, low: this._lowRecordWatermark } + return new Result(observerPromise, query, parameters, connectionHolder, watermarks) } async _acquireConnection(connectionConsumer: ConnectionConsumer) { @@ -269,7 +275,9 @@ class Session { onBookmark: this._updateBookmark.bind(this), onConnection: this._assertSessionIsOpen.bind(this), reactive: this._reactive, - fetchSize: this._fetchSize + fetchSize: this._fetchSize, + lowRecordWatermark: this._lowRecordWatermark, + highRecordWatermark: this._highRecordWatermark }) tx._begin(this._lastBookmark, txConfig) return tx @@ -418,6 +426,20 @@ class Session { this._updateBookmark(new Bookmark(meta.bookmark)) } + /** + * @private + * @returns {void} + */ + private _setupWatermark(): void { + if (this._fetchSize === FETCH_ALL) { + this._lowRecordWatermark = Number.MAX_VALUE // we shall always lower than this number to enable auto pull + this._highRecordWatermark = Number.MAX_VALUE // we shall never reach this number to disable auto pull + } else { + this._lowRecordWatermark = 0.3 * this._fetchSize + this._highRecordWatermark = 0.7 * this._fetchSize + } + } + /** * @protected */ diff --git a/packages/core/src/transaction.ts b/packages/core/src/transaction.ts index f48a63a11..3ac232191 100644 --- a/packages/core/src/transaction.ts +++ b/packages/core/src/transaction.ts @@ -53,6 +53,8 @@ class Transaction { private _fetchSize: number private _results: any[] private _impersonatedUser?: string + private _lowRecordWatermak: number + private _highRecordWatermark: number /** * @constructor @@ -72,7 +74,9 @@ class Transaction { onConnection, reactive, fetchSize, - impersonatedUser + impersonatedUser, + highRecordWatermark, + lowRecordWatermark }: { connectionHolder: ConnectionHolder onClose: () => void @@ -80,7 +84,9 @@ class Transaction { onConnection: () => void reactive: boolean fetchSize: number - impersonatedUser?: string + impersonatedUser?: string, + highRecordWatermark: number, + lowRecordWatermark: number }) { this._connectionHolder = connectionHolder this._reactive = reactive @@ -93,6 +99,8 @@ class Transaction { this._fetchSize = fetchSize this._results = [] this._impersonatedUser = impersonatedUser + this._lowRecordWatermak = lowRecordWatermark + this._highRecordWatermark = highRecordWatermark } /** @@ -243,6 +251,8 @@ interface StateTransitionParams { pendingResults: any[] reactive: boolean fetchSize: number + highRecordWatermark: number + lowRecordWatermark: number } const _states = { @@ -296,6 +306,8 @@ const _states = { onConnection, reactive, fetchSize, + highRecordWatermark, + lowRecordWatermark }: StateTransitionParams ): any => { // RUN in explicit transaction can't contain bookmarks and transaction configuration @@ -312,6 +324,8 @@ const _states = { afterComplete: onComplete, reactive: reactive, fetchSize: fetchSize, + highRecordWatermark: highRecordWatermark, + lowRecordWatermark: lowRecordWatermark }) } else { throw newError('No connection available') @@ -323,7 +337,9 @@ const _states = { observerPromise, query, parameters, - connectionHolder + connectionHolder, + highRecordWatermark, + lowRecordWatermark ) } }, @@ -346,7 +362,9 @@ const _states = { }), 'COMMIT', {}, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ), state: _states.FAILED } @@ -361,7 +379,9 @@ const _states = { new CompletedObserver(), 'ROLLBACK', {}, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ), state: _states.FAILED } @@ -380,7 +400,9 @@ const _states = { }), query, parameters, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ) } }, @@ -401,7 +423,10 @@ const _states = { onError }), 'COMMIT', - {} + {}, + EMPTY_CONNECTION_HOLDER, + 0, // high watermark + 0 // low watermark ), state: _states.SUCCEEDED, connectionHolder @@ -421,7 +446,10 @@ const _states = { onError }), 'ROLLBACK', - {} + {}, + EMPTY_CONNECTION_HOLDER, + 0, // high watermark + 0 // low watermark ), state: _states.SUCCEEDED, connectionHolder @@ -441,7 +469,9 @@ const _states = { }), query, parameters, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ) } }, @@ -463,7 +493,9 @@ const _states = { }), 'COMMIT', {}, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ), state: _states.ROLLED_BACK } @@ -482,7 +514,9 @@ const _states = { }), 'ROLLBACK', {}, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ), state: _states.ROLLED_BACK } @@ -501,7 +535,9 @@ const _states = { }), query, parameters, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ) } } @@ -555,7 +591,11 @@ function finishTransaction( observerPromise, commit ? 'COMMIT' : 'ROLLBACK', {}, - connectionHolder + connectionHolder, + { + high: Number.MAX_VALUE, + low: Number.MAX_VALUE + }, ) } @@ -574,13 +614,19 @@ function newCompletedResult( observerPromise: ResultStreamObserver | Promise, query: Query, parameters: any, - connectionHolder: ConnectionHolder = EMPTY_CONNECTION_HOLDER + connectionHolder: ConnectionHolder = EMPTY_CONNECTION_HOLDER, + highRecordWatermark: number, + lowRecordWatermark: number ): Result { return new Result( Promise.resolve(observerPromise), query, parameters, - new ReadOnlyConnectionHolder(connectionHolder || EMPTY_CONNECTION_HOLDER) + new ReadOnlyConnectionHolder(connectionHolder || EMPTY_CONNECTION_HOLDER), + { + low: lowRecordWatermark, + high: highRecordWatermark + } ) } diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index 9d5649127..cdc6671f7 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -766,14 +766,6 @@ class ResultStreamObserverMock implements observer.ResultStreamObserver { .forEach(o => o.onCompleted!(meta)) } - isReady(): boolean { - return false; - } - - getWatermaks(): { high: number; low: number } { - return { high: 0, low: 0 }; - } - setPullMode(_: boolean): void { // do nothing } From 3dffb32f1c58636a9764708047420692f37d1cd4 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 11 Jan 2022 12:37:16 +0100 Subject: [PATCH 15/26] Add tests to ResultStreamObserver.pull() and change StreamObserver.setPullMode to setExplicityPull --- .../src/bolt/stream-observers.js | 12 +- .../test/bolt/stream-observer.test.js | 270 ++++++++++++++++++ packages/core/src/internal/observers.ts | 6 +- packages/core/src/result.ts | 4 +- packages/core/test/result.test.ts | 2 +- 5 files changed, 282 insertions(+), 12 deletions(-) diff --git a/packages/bolt-connection/src/bolt/stream-observers.js b/packages/bolt-connection/src/bolt/stream-observers.js index 73298323f..bfd086134 100644 --- a/packages/bolt-connection/src/bolt/stream-observers.js +++ b/packages/bolt-connection/src/bolt/stream-observers.js @@ -99,12 +99,12 @@ class ResultStreamObserver extends StreamObserver { this._lowRecordWatermark = lowRecordWatermark this._highRecordWatermark = highRecordWatermark this._setState(reactive ? _states.READY : _states.READY_STREAMING) - this._setupAuoPull() - this._pullMode = false; + this._setupAutoPull() + this._explicityPull = false; } - setPullMode(pullMode) { - this._pullMode = pullMode; + setExplicityPull(explicityPull) { + this._explicityPull = explicityPull; } pull() { @@ -355,7 +355,7 @@ class ResultStreamObserver extends StreamObserver { _handleStreaming () { if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) { - if (!this._pullMode && (this._discard || this._autoPull)) { + if (!this._explicityPull && (this._discard || this._autoPull)) { this._more() } } @@ -385,7 +385,7 @@ class ResultStreamObserver extends StreamObserver { this._state = state } - _setupAuoPull () { + _setupAutoPull () { this._autoPull = true } } diff --git a/packages/bolt-connection/test/bolt/stream-observer.test.js b/packages/bolt-connection/test/bolt/stream-observer.test.js index dc15ce435..6b7d63410 100644 --- a/packages/bolt-connection/test/bolt/stream-observer.test.js +++ b/packages/bolt-connection/test/bolt/stream-observer.test.js @@ -198,6 +198,276 @@ describe('#unit ResultStreamObserver', () => { } }) }) + + describe('when is not explicity pull (default)', () => { + it('should ask for more records when the stream is completed and has more', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: 2000 + }) + + // action + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + + streamObserver.onNext([111, 222, 333]) + streamObserver.onCompleted({ has_more: false }) + + // verification + expect(more).toBeCalledTimes(1) + expect(more).toBeCalledWith(queryId, fetchSize, streamObserver) + }) + }) + + describe('when is explicity pull enabled', () => { + it('should not ask for more records when the stream is completed and has more', () => { + // Setup + const queryId = 123 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: 2000 + }) + streamObserver.setExplicityPull(true) + + // action + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + + // verification + expect(more).toBeCalledTimes(0) + }) + + describe('pull()', () => { + it('should ask for more records when the stream is completed and has more', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize + }) + streamObserver.setExplicityPull(true) + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + + // Action + streamObserver.pull() + + // verification + expect(more).toBeCalledTimes(1) + expect(more).toBeCalledWith(queryId, fetchSize, streamObserver) + }) + + it('should ask for more records when the stream is a new reactive stream', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize, + reactive: true + }) + streamObserver.setExplicityPull(true) + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + // Action + streamObserver.pull() + + // verification + expect(more).toBeCalledTimes(1) + expect(more).toBeCalledWith(queryId, fetchSize, streamObserver) + }) + + + it('should ask for more records when the stream is a new reactive stream and not run success come yet', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize, + reactive: true + }) + streamObserver.setExplicityPull(true) + + // Action + streamObserver.pull() + + // verification + expect(more).toBeCalledTimes(1) + expect(more).toBeCalledWith(null, fetchSize, streamObserver) + }) + + it('should not ask for more records when the stream is a new stream', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize, + reactive: false + }) + streamObserver.setExplicityPull(true) + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + // Action + streamObserver.pull() + + // verification + expect(more).toBeCalledTimes(0) + }) + + + it('should not ask for more records when the stream is a new stream', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize + }) + + streamObserver.setExplicityPull(true) + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + // Action + streamObserver.pull() + + // verification + expect(more).toBeCalledTimes(0) + }) + + it('should not ask for more records when it is streaming', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize + }) + + streamObserver.setExplicityPull(true) + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + + streamObserver.pull() // should actual call + + streamObserver.onNext([111, 222, 333]) + + // Action + streamObserver.pull() + + // verification + expect(more).toBeCalledTimes(1) + }) + + it('should not ask for more records when result is completed', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize + }) + + streamObserver.setExplicityPull(true) + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: false }) + + // Action + streamObserver.pull() + + // verification + expect(more).toBeCalledTimes(0) + }) + + it('should not ask for more records when stream failed', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize + }) + + streamObserver.setExplicityPull(true) + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onError(new Error('error')) + + // Action + streamObserver.pull() + + // verification + expect(more).toBeCalledTimes(0) + }) + }) + }) }) describe('#unit RouteObserver', () => { diff --git a/packages/core/src/internal/observers.ts b/packages/core/src/internal/observers.ts index bcf203515..a2d5c3766 100644 --- a/packages/core/src/internal/observers.ts +++ b/packages/core/src/internal/observers.ts @@ -89,7 +89,7 @@ export interface ResultStreamObserver extends StreamObserver { */ prepareToHandleSingleResponse(): void - setPullMode(pullMode: boolean): void + setExplicityPull(explicityPull: boolean): void pull(): void @@ -127,7 +127,7 @@ export class CompletedObserver implements ResultStreamObserver { // do nothing } - setPullMode(_: boolean): void { + setExplicityPull(_: boolean): void { // do nothing } @@ -182,7 +182,7 @@ export class FailedObserver implements ResultStreamObserver { // do nothing } - setPullMode(_: boolean): void { + setExplicityPull(_: boolean): void { // do nothing } diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index eda5da43b..a334ddd50 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -336,12 +336,12 @@ class Result implements Promise { .catch(() => {}) } - _subscribe(observer: ResultObserver, pullMode: boolean = false): Promise { + _subscribe(observer: ResultObserver, explicityPull: boolean = false): Promise { const _observer = this._decorateObserver(observer) return this._streamObserverPromise .then(o => { - o.setPullMode(pullMode) + o.setExplicityPull(explicityPull) o.subscribe(_observer) return o }) diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index cdc6671f7..417e95345 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -766,7 +766,7 @@ class ResultStreamObserverMock implements observer.ResultStreamObserver { .forEach(o => o.onCompleted!(meta)) } - setPullMode(_: boolean): void { + setExplicityPull(_: boolean): void { // do nothing } From 49a3c902e67e4d328d4a4a6c2c3939ac1f912988 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 11 Jan 2022 15:46:47 +0100 Subject: [PATCH 16/26] Test Result async iterator consumption with watermark control --- packages/core/src/result.ts | 21 +++- packages/core/test/result.test.ts | 170 +++++++++++++++++++++++++++++- 2 files changed, 185 insertions(+), 6 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index a334ddd50..b52daa994 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -255,22 +255,33 @@ class Result implements Promise { } } - const streaming = await this._subscribe(observer, true) - + const status = { paused: false } + + let streaming: observer.ResultStreamObserver | null = null + + try { + streaming = await this._subscribe(observer, true) + } catch (e) { + // ignore, we will handle it in consume since the error is notifies in the onError callback + } + const pullIfNeeded = () => { - if (observer.queueSize <= this._watermarks.high) { + if (observer.queueSize >= this._watermarks.high) { + status.paused = true + } else if (observer.queueSize <= this._watermarks.low) { + status.paused = false + } + if (!status.paused && observer.queueSize < this._watermarks.high && streaming) { streaming.pull() } } - while(true) { pullIfNeeded() const value = await observer.consume() if (value.done) { return value.summary! } - pullIfNeeded() yield value.record! } } diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index 417e95345..af51790ae 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -33,10 +33,11 @@ describe('Result', () => { describe('new Result(Promise.resolve(new ResultStreamObserverMock()), query, parameters, connectionHolder)', () => { let streamObserverMock: ResultStreamObserverMock let result: Result + const watermarks = { high: 10, low: 3 } beforeEach(() => { streamObserverMock = new ResultStreamObserverMock() - result = new Result(Promise.resolve(streamObserverMock), 'query') + result = new Result(Promise.resolve(streamObserverMock), 'query', undefined, undefined, watermarks) }) describe('.keys()', () => { @@ -554,6 +555,145 @@ describe('Result', () => { ) }) }) + + describe('asyncIterator', () => { + it('should subscribe to the observer', async () => { + const subscribe = jest.spyOn(streamObserverMock, 'subscribe') + streamObserverMock.onCompleted({}) + + for await (const _ of result) { + // do nothing + } + + expect(subscribe).toHaveBeenCalled() + }) + + it('should set the observer to explicity pull', async () => { + const setExplicityPull = jest.spyOn(streamObserverMock, 'setExplicityPull') + streamObserverMock.onCompleted({}) + + for await (const _ of result) { + // do nothing + } + + expect(setExplicityPull).toHaveBeenCalledWith(true) + }) + + it('should set the observer to explicity pull before subscribe', async () => { + const subscribe = jest.spyOn(streamObserverMock, 'subscribe') + const setExplicityPull = jest.spyOn(streamObserverMock, 'setExplicityPull') + streamObserverMock.onCompleted({}) + + for await (const _ of result) { + // do nothing + } + + expect(setExplicityPull.mock.invocationCallOrder[0]) + .toBeLessThan(subscribe.mock.invocationCallOrder[0]) + }) + + it('should not call pull if queue is bigger than high watermark', async () => { + const pull = jest.spyOn(streamObserverMock, 'pull') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i <= watermarks.high; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + await it.next() + + expect(pull).toBeCalledTimes(0) + }) + + it('should call pull if queue is smaller than low watermark', async () => { + const pull = jest.spyOn(streamObserverMock, 'pull') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i < watermarks.low - 1; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + await it.next() + + expect(pull).toBeCalledTimes(1) + }) + + it('should call pull if queue is between lower and high if it never highter then high watermark', async () => { + const pull = jest.spyOn(streamObserverMock, 'pull') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i < watermarks.high - 1; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + for (let i = 0; i < watermarks.high - watermarks.low; i++) { + await it.next() + } + + expect(pull).toBeCalledTimes(watermarks.high - watermarks.low) + }) + + it('should call pull if queue is between lower and high if it get highter then high watermark', async () => { + const pull = jest.spyOn(streamObserverMock, 'pull') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i < watermarks.high; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + for (let i = 0; i < watermarks.high - watermarks.low; i++) { + await it.next() + } + + expect(pull).toBeCalledTimes(0) + }) + + it('should recover from high watermark limit after went to low watermark', async () => { + const pull = jest.spyOn(streamObserverMock, 'pull') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i < watermarks.high; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + for (let i = 0; i < watermarks.high - watermarks.low; i++) { + await it.next() + } + + for (let i = 0; i < 2; i++) { + await it.next() + } + + expect(pull).toBeCalledTimes(2) + }) + + it('should iterate over record', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + streamObserverMock.onCompleted({}) + + const records = [] + for await (const record of result) { + records.push(record) + } + + expect(records).toEqual([ + new Record(keys, rawRecord1), + new Record(keys, rawRecord2) + ]) + }) + }) }) describe.each([ @@ -638,6 +778,26 @@ describe('Result', () => { expect(Object.keys(queryResult).sort()).toEqual(['records', 'summary']) }) }) + + describe('asyncIterator', () => { + it('should be resolved with an empty array of records', async () => { + const records = [] + + for await (const record of result) { + records.push(record) + } + + expect(records).toStrictEqual([]) + }) + + it('should be resolved with expected result summary', async () => { + const it = result[Symbol.asyncIterator]() + const next = await it.next() + + expect(next.done).toBe(true) + expect(next.value).toStrictEqual(expectedResultSummary) + }) + }) }) describe.each([ @@ -685,6 +845,14 @@ describe('Result', () => { }) }) + describe('asyncIterator', () => { + shouldReturnRejectedPromiseWithTheExpectedError(async () => { + for await (const _ of result) { + // do nothing + } + }) + }) + function shouldReturnRejectedPromiseWithTheExpectedError( supplier: () => Promise ) { From 7cd5f12d408f933717c01e4201c76a2170e20b35 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 11 Jan 2022 16:42:39 +0100 Subject: [PATCH 17/26] Moving Session unit tests to core and fix connection interface --- packages/core/src/connection.ts | 24 ++- packages/core/test/session.test.ts | 100 ++++++++++++ packages/core/test/utils/connection.fake.ts | 160 ++++++++++++++++++++ packages/neo4j-driver/test/session.test.js | 67 -------- 4 files changed, 279 insertions(+), 72 deletions(-) create mode 100644 packages/core/test/session.test.ts create mode 100644 packages/core/test/utils/connection.fake.ts diff --git a/packages/core/src/connection.ts b/packages/core/src/connection.ts index 3ce6e1f06..d5ffff685 100644 --- a/packages/core/src/connection.ts +++ b/packages/core/src/connection.ts @@ -17,24 +17,38 @@ * limitations under the License. */ +import { ServerAddress } from "./internal/server-address" + /** * Interface which defines the raw connection with the database * @private */ class Connection { - id: string = "" - databaseId: string = "" - server: any + get id (): string { + return "" + } + + get databaseId(): string { + return "" + } + + get server(): any { + return {} + } /** * @property {ServerAddress} the server address this connection is opened against */ - address: any + get address(): ServerAddress | undefined { + return undefined + } /** * @property {ServerVersion} the version of the server this connection is connected to */ - version: any + get version(): any { + return undefined + } /** * @returns {boolean} whether this connection is in a working condition diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts new file mode 100644 index 000000000..77f340c8c --- /dev/null +++ b/packages/core/test/session.test.ts @@ -0,0 +1,100 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { ConnectionProvider, Session, Connection } from '../src' +import { ACCESS_MODE_READ } from '../src/internal/constants' +import FakeConnection from './utils/connection.fake' + +describe('session', () => { + it('close should return promise', done => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection) + + session.close().then(() => done()) + }, 70000) + + it('close should return promise even when already closed ', done => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection) + + session.close().then(() => { + session.close().then(() => { + session.close().then(() => { + done() + }) + }) + }) + }, 70000) + + it('close should be idempotent ', done => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection) + + session.close().then(() => { + expect(connection.isReleasedOnce()).toBeTruthy() + + session.close().then(() => { + expect(connection.isReleasedOnce()).toBeTruthy() + + session.close().then(() => { + expect(connection.isReleasedOnce()).toBeTruthy() + done() + }) + }) + }) + }, 70000) + + it('should close transaction executor', done => { + const session = newSessionWithConnection(newFakeConnection()) + + let closeCalledTimes = 0 + // @ts-ignore + const transactionExecutor = session._transactionExecutor + const originalClose = transactionExecutor.close + transactionExecutor.close = () => { + closeCalledTimes++ + originalClose.call(transactionExecutor) + } + + session.close().then(() => { + expect(closeCalledTimes).toEqual(1) + done() + }) + }, 70000) +}) + +function newSessionWithConnection(connection: Connection, fetchSize: number = 1000): Session { + const connectionProvider = new ConnectionProvider() + connectionProvider.acquireConnection = () => Promise.resolve(connection) + connectionProvider.close = () => Promise.resolve() + + const session = new Session({ + mode: ACCESS_MODE_READ, + connectionProvider, + database: "", + fetchSize, + config: {}, + reactive: false + }) + session.beginTransaction() // force session to acquire new connection + return session +} + +function newFakeConnection(): FakeConnection { + return new FakeConnection() +} diff --git a/packages/core/test/utils/connection.fake.ts b/packages/core/test/utils/connection.fake.ts new file mode 100644 index 000000000..e43914f24 --- /dev/null +++ b/packages/core/test/utils/connection.fake.ts @@ -0,0 +1,160 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Connection } from '../../src' + + +/** + * This class is like a mock of {@link Connection} that tracks invocations count. + * It tries to maintain same "interface" as {@link Connection}. + * It could be replaced with a proper mock by a library like testdouble. + * At the time of writing such libraries require {@link Proxy} support but browser tests execute in + * PhantomJS which does not support proxies. + */ +export default class FakeConnection extends Connection { + private _open: boolean + private _id: number + private _databaseId: string | null + private _requestRoutingInformationMock: ((params: any) => void) | null + private creationTimestamp: number + private resetInvoked: number + private releaseInvoked: number + private seenQueries: string[] + private seenParameters: any[] + private seenProtocolOptions: any[] + private _server: any + private protocolVersion: number | undefined + private protocolErrorsHandled: number + private seenProtocolErrors: string[] + private seenRequestRoutingInformation: any[] + + constructor() { + super() + + this._open = true + this._id = 0 + this._databaseId = null + this._requestRoutingInformationMock = null + this.creationTimestamp = Date.now() + + this.resetInvoked = 0 + this.releaseInvoked = 0 + this.seenQueries = [] + this.seenParameters = [] + this.seenProtocolOptions = [] + this._server = {} + this.protocolVersion = undefined + this.protocolErrorsHandled = 0 + this.seenProtocolErrors = [] + this.seenRequestRoutingInformation = [] + } + + get id(): string { + return this._id.toString() + } + + get databaseId(): string { + return this._databaseId!! + } + + set databaseId(value) { + this._databaseId = value + } + + get server() { + return this._server + } + + get version() { + return this._server.version + } + + set version(value) { + this._server.version = value + } + + protocol() { + // return fake protocol object that simply records seen queries and parameters + return { + run: (query: string, parameters: any | undefined, protocolOptions: any | undefined) => { + this.seenQueries.push(query) + this.seenParameters.push(parameters) + this.seenProtocolOptions.push(protocolOptions) + }, + requestRoutingInformation: (params: any | undefined) => { + this.seenRequestRoutingInformation.push(params) + if (this._requestRoutingInformationMock) { + this._requestRoutingInformationMock(params) + } + }, + version: this.protocolVersion + } + } + + resetAndFlush() { + this.resetInvoked++ + return Promise.resolve() + } + + _release() { + this.releaseInvoked++ + return Promise.resolve() + } + + isOpen() { + return this._open + } + + isNeverReleased() { + return this.isReleasedTimes(0) + } + + isReleasedOnce() { + return this.isReleasedTimes(1) + } + + isReleasedTimes(times: number) { + return this.resetInvoked === times && this.releaseInvoked === times + } + + _handleProtocolError(message: string) { + this.protocolErrorsHandled++ + this.seenProtocolErrors.push(message) + } + + withProtocolVersion(version: number) { + this.protocolVersion = version + return this + } + + withCreationTimestamp(value: number) { + this.creationTimestamp = value + return this + } + + withRequestRoutingInformationMock(requestRoutingInformationMock: (params: any) => void) { + this._requestRoutingInformationMock = requestRoutingInformationMock + return this + } + + closed() { + this._open = false + return this + } +} diff --git a/packages/neo4j-driver/test/session.test.js b/packages/neo4j-driver/test/session.test.js index 0cf5047a4..21e97eaf4 100644 --- a/packages/neo4j-driver/test/session.test.js +++ b/packages/neo4j-driver/test/session.test.js @@ -20,7 +20,6 @@ import neo4j from '../src' import { READ } from '../src/driver' import SingleConnectionProvider from '../../bolt-connection/lib/connection-provider/connection-provider-single' -import FakeConnection from './internal/fake-connection' import sharedNeo4j from './internal/shared-neo4j' import _ from 'lodash' import testUtils from './internal/test-utils' @@ -40,63 +39,6 @@ const { const { PROTOCOL_ERROR, SESSION_EXPIRED } = error -describe('#unit session', () => { - it('close should return promise', done => { - const connection = new FakeConnection() - const session = newSessionWithConnection(connection) - - session.close().then(() => done()) - }, 70000) - - it('close should return promise even when already closed ', done => { - const connection = new FakeConnection() - const session = newSessionWithConnection(connection) - - session.close().then(() => { - session.close().then(() => { - session.close().then(() => { - done() - }) - }) - }) - }, 70000) - - it('close should be idempotent ', done => { - const connection = new FakeConnection() - const session = newSessionWithConnection(connection) - - session.close().then(() => { - expect(connection.isReleasedOnce()).toBeTruthy() - - session.close().then(() => { - expect(connection.isReleasedOnce()).toBeTruthy() - - session.close().then(() => { - expect(connection.isReleasedOnce()).toBeTruthy() - done() - }) - }) - }) - }, 70000) - - it('should close transaction executor', done => { - const session = newSessionWithConnection(new FakeConnection()) - - let closeCalledTimes = 0 - const transactionExecutor = session._transactionExecutor - const originalClose = transactionExecutor.close - transactionExecutor.close = () => { - closeCalledTimes++ - originalClose.call(transactionExecutor) - } - - session.close().then(() => { - expect(closeCalledTimes).toEqual(1) - done() - }) - }, 70000) -}) - describe('#integration session', () => { let driver let session @@ -1304,12 +1246,3 @@ describe('#integration session', () => { }) } }) - -function newSessionWithConnection (connection) { - const connectionProvider = new SingleConnectionProvider( - Promise.resolve(connection) - ) - const session = new Session({ mode: READ, connectionProvider }) - session.beginTransaction() // force session to acquire new connection - return session -} From 453521a9236afa66ba4fff804278a968c9bf88c8 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 11 Jan 2022 17:35:17 +0100 Subject: [PATCH 18/26] Test watermark treatment in the Session --- packages/core/src/session.ts | 1 + packages/core/test/session.test.ts | 137 +++++++++++++++++++- packages/core/test/utils/connection.fake.ts | 51 ++++++-- 3 files changed, 173 insertions(+), 16 deletions(-) diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index f754be845..c4ff56658 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -124,6 +124,7 @@ class Session { this._transactionExecutor = _createTransactionExecutor(config) this._onComplete = this._onCompleteCallback.bind(this) this._databaseNameResolved = this._database !== '' + this._setupWatermark() } /** diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts index 77f340c8c..8bdd36921 100644 --- a/packages/core/test/session.test.ts +++ b/packages/core/test/session.test.ts @@ -17,14 +17,14 @@ * limitations under the License. */ import { ConnectionProvider, Session, Connection } from '../src' -import { ACCESS_MODE_READ } from '../src/internal/constants' +import { ACCESS_MODE_READ, FETCH_ALL } from '../src/internal/constants' import FakeConnection from './utils/connection.fake' describe('session', () => { it('close should return promise', done => { const connection = newFakeConnection() const session = newSessionWithConnection(connection) - + session.close().then(() => done()) }, 70000) @@ -41,6 +41,132 @@ describe('session', () => { }) }, 70000) + it('run should send watermarks to Result when fetchsize if defined', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, 1000) + + const result = session.run('RETURN 1') + await result; + + expect(connection.seenProtocolOptions[0]).toMatchObject({ + fetchSize: 1000, + lowRecordWatermark: 300, + highRecordWatermark: 700 + }) + // @ts-ignore + expect(result._watermarks).toEqual({ high: 700, low: 300 }) + }) + + it('run should send watermarks to Result when fetchsize is fetch all', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, FETCH_ALL) + + const result = session.run('RETURN 1') + await result; + + expect(connection.seenProtocolOptions[0]).toMatchObject({ + fetchSize: FETCH_ALL, + lowRecordWatermark: Number.MAX_VALUE, + highRecordWatermark: Number.MAX_VALUE + }) + + // @ts-ignore + expect(result._watermarks).toEqual({ high: Number.MAX_VALUE, low: Number.MAX_VALUE }) + }) + + it('run should send watermarks to Transaction when fetchsize if defined (begin)', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, 1000) + + const tx = session.beginTransaction() + + // @ts-ignore + expect(tx._lowRecordWatermak).toEqual(300) + // @ts-ignore + expect(tx._highRecordWatermark).toEqual(700) + }) + + it('run should send watermarks to Transaction when fetchsize is fetch all (begin)', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, FETCH_ALL) + + + const tx = session.beginTransaction() + + // @ts-ignore + expect(tx._lowRecordWatermak).toEqual(Number.MAX_VALUE) + // @ts-ignore + expect(tx._highRecordWatermark).toEqual(Number.MAX_VALUE) + }) + + it('run should send watermarks to Transaction when fetchsize if defined (writeTransaction)', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, 1000) + const status = { functionCalled: false } + + await session.writeTransaction(tx => { + // @ts-ignore + expect(tx._lowRecordWatermak).toEqual(300) + // @ts-ignore + expect(tx._highRecordWatermark).toEqual(700) + + status.functionCalled = true + }) + + expect(status.functionCalled).toEqual(true) + }) + + it('run should send watermarks to Transaction when fetchsize is fetch all (writeTransaction)', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, FETCH_ALL) + const status = { functionCalled: false } + + await session.writeTransaction(tx => { + // @ts-ignore + expect(tx._lowRecordWatermak).toEqual(Number.MAX_VALUE) + // @ts-ignore + expect(tx._highRecordWatermark).toEqual(Number.MAX_VALUE) + + status.functionCalled = true + }) + + expect(status.functionCalled).toEqual(true) + }) + + it('run should send watermarks to Transaction when fetchsize if defined (readTransaction)', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, 1000) + const status = { functionCalled: false } + + await session.readTransaction(tx => { + // @ts-ignore + expect(tx._lowRecordWatermak).toEqual(300) + // @ts-ignore + expect(tx._highRecordWatermark).toEqual(700) + + status.functionCalled = true + }) + + expect(status.functionCalled).toEqual(true) + }) + + it('run should send watermarks to Transaction when fetchsize is fetch all (readTransaction)', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, FETCH_ALL) + const status = { functionCalled: false } + + await session.readTransaction(tx => { + // @ts-ignore + expect(tx._lowRecordWatermak).toEqual(Number.MAX_VALUE) + // @ts-ignore + expect(tx._highRecordWatermark).toEqual(Number.MAX_VALUE) + + status.functionCalled = true + }) + + expect(status.functionCalled).toEqual(true) + }) + it('close should be idempotent ', done => { const connection = newFakeConnection() const session = newSessionWithConnection(connection) @@ -78,7 +204,7 @@ describe('session', () => { }, 70000) }) -function newSessionWithConnection(connection: Connection, fetchSize: number = 1000): Session { +function newSessionWithConnection(connection: Connection, beginTx: boolean = true, fetchSize: number = 1000): Session { const connectionProvider = new ConnectionProvider() connectionProvider.acquireConnection = () => Promise.resolve(connection) connectionProvider.close = () => Promise.resolve() @@ -91,7 +217,10 @@ function newSessionWithConnection(connection: Connection, fetchSize: number = 10 config: {}, reactive: false }) - session.beginTransaction() // force session to acquire new connection + + if (beginTx) { + session.beginTransaction() // force session to acquire new connection + } return session } diff --git a/packages/core/test/utils/connection.fake.ts b/packages/core/test/utils/connection.fake.ts index e43914f24..4928a42ac 100644 --- a/packages/core/test/utils/connection.fake.ts +++ b/packages/core/test/utils/connection.fake.ts @@ -17,7 +17,8 @@ * limitations under the License. */ -import { Connection } from '../../src' +import { Connection, ResultObserver, Record, ResultSummary } from '../../src' +import { ResultStreamObserver } from '../../src/internal/observers' /** @@ -32,17 +33,17 @@ export default class FakeConnection extends Connection { private _id: number private _databaseId: string | null private _requestRoutingInformationMock: ((params: any) => void) | null - private creationTimestamp: number - private resetInvoked: number - private releaseInvoked: number - private seenQueries: string[] - private seenParameters: any[] - private seenProtocolOptions: any[] + public creationTimestamp: number + public resetInvoked: number + public releaseInvoked: number + public seenQueries: string[] + public seenParameters: any[] + public seenProtocolOptions: any[] private _server: any - private protocolVersion: number | undefined - private protocolErrorsHandled: number - private seenProtocolErrors: string[] - private seenRequestRoutingInformation: any[] + public protocolVersion: number | undefined + public protocolErrorsHandled: number + public seenProtocolErrors: string[] + public seenRequestRoutingInformation: any[] constructor() { super() @@ -92,10 +93,17 @@ export default class FakeConnection extends Connection { protocol() { // return fake protocol object that simply records seen queries and parameters return { - run: (query: string, parameters: any | undefined, protocolOptions: any | undefined) => { + run: (query: string, parameters: any | undefined, protocolOptions: any | undefined): ResultStreamObserver => { this.seenQueries.push(query) this.seenParameters.push(parameters) this.seenProtocolOptions.push(protocolOptions) + return mockResultStreamObserver(query, parameters) + }, + commitTransaction: () => { + return mockResultStreamObserver('COMMIT', {}) + }, + beginTransaction: () => { + return Promise.resolve() }, requestRoutingInformation: (params: any | undefined) => { this.seenRequestRoutingInformation.push(params) @@ -158,3 +166,22 @@ export default class FakeConnection extends Connection { return this } } + +function mockResultStreamObserver(query: string, parameters: any | undefined): ResultStreamObserver { + return { + onError: (error: any) => { }, + onCompleted: () => { }, + onNext: (result: any) => { }, + cancel: () => { }, + prepareToHandleSingleResponse: () => { }, + pull: () => { }, + markCompleted: () => { }, + setExplicityPull: (_: boolean) => { }, + subscribe: (observer: ResultObserver) => { + if (observer && observer.onCompleted) { + observer.onCompleted(new ResultSummary(query, parameters, {})) + } + + } + } +} From 9c5666faec8f0299aa4e9e3fd00c273c2e55a8b0 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 11 Jan 2022 18:05:50 +0100 Subject: [PATCH 19/26] Add tests to Transaction --- packages/core/src/transaction.ts | 4 +- packages/core/test/transaction.test.ts | 100 +++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 packages/core/test/transaction.test.ts diff --git a/packages/core/src/transaction.ts b/packages/core/src/transaction.ts index 3ac232191..5c11262a1 100644 --- a/packages/core/src/transaction.ts +++ b/packages/core/src/transaction.ts @@ -151,7 +151,9 @@ class Transaction { onComplete: this._onComplete, onConnection: this._onConnection, reactive: this._reactive, - fetchSize: this._fetchSize + fetchSize: this._fetchSize, + highRecordWatermark: this._highRecordWatermark, + lowRecordWatermark: this._lowRecordWatermak }) this._results.push(result) return result diff --git a/packages/core/test/transaction.test.ts b/packages/core/test/transaction.test.ts new file mode 100644 index 000000000..d4a8546dc --- /dev/null +++ b/packages/core/test/transaction.test.ts @@ -0,0 +1,100 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { ConnectionProvider, Transaction } from "../src"; +import { Bookmark } from "../src/internal/bookmark"; +import { ConnectionHolder } from "../src/internal/connection-holder"; +import FakeConnection from "./utils/connection.fake"; + +describe('Transaction', () => { + + describe('.run()', () => { + it('should call run with watermarks', async () => { + const connection = newFakeConnection() + const tx = newTransaction({ + connection, + fetchSize: 1000, + highRecordWatermark: 700, + lowRecordWatermark: 300 + }) + + await tx.run('RETURN 1') + + expect(connection.seenProtocolOptions[0]).toMatchObject({ + fetchSize: 1000, + lowRecordWatermark: 300, + highRecordWatermark: 700 + }) + }) + + it('should configure result with watermarks', async () => { + const connection = newFakeConnection() + const tx = newTransaction({ + connection, + fetchSize: 1000, + highRecordWatermark: 700, + lowRecordWatermark: 300 + }) + + var result = tx.run('RETURN 1') + + // @ts-ignore + expect(result._watermarks).toEqual({ high: 700, low: 300 }) + }) + + }) + +}) + +function newTransaction({ + connection, + fetchSize = 1000, + highRecordWatermark = 700, + lowRecordWatermark = 300 +}: { + connection: FakeConnection + fetchSize: number + highRecordWatermark: number, + lowRecordWatermark: number +}): Transaction { + const connectionProvider = new ConnectionProvider() + connectionProvider.acquireConnection = () => Promise.resolve(connection) + connectionProvider.close = () => Promise.resolve() + + const connectionHolder = new ConnectionHolder({ connectionProvider }) + connectionHolder.initializeConnection() + + const transaction = new Transaction({ + connectionHolder, + onClose: () => { }, + onBookmark: (_: Bookmark) => { }, + onConnection: () => { }, + reactive: false, + fetchSize, + impersonatedUser: "", + highRecordWatermark, + lowRecordWatermark + }) + + return transaction +} + +function newFakeConnection(): FakeConnection { + return new FakeConnection() +} From fca75c4341763bab1b2d638c34aea90f1a57bd1d Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 11 Jan 2022 18:18:42 +0100 Subject: [PATCH 20/26] Add tests in the bolt-protocol level --- .../test/bolt/bolt-protocol-v1.test.js | 22 +++++++++++++++ .../test/bolt/bolt-protocol-v2.test.js | 19 +++++++++++++ .../test/bolt/bolt-protocol-v3.test.js | 21 +++++++++++++++ .../test/bolt/bolt-protocol-v4x0.test.js | 21 +++++++++++++++ .../test/bolt/bolt-protocol-v4x1.test.js | 27 +++++++++++++++++++ .../test/bolt/bolt-protocol-v4x2.test.js | 27 +++++++++++++++++++ .../test/bolt/bolt-protocol-v4x3.test.js | 21 +++++++++++++++ .../test/bolt/bolt-protocol-v4x4.test.js | 21 +++++++++++++++ 8 files changed, 179 insertions(+) diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v1.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v1.test.js index 70702c9c9..fd48dce46 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v1.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v1.test.js @@ -338,4 +338,26 @@ describe('#unit BoltProtocolV1', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + mode: WRITE, + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v2.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v2.test.js index c8d5596a2..d62878445 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v2.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v2.test.js @@ -92,4 +92,23 @@ describe('#unit BoltProtocolV2', () => { }) }) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV2(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v3.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v3.test.js index 513886451..78842e96b 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v3.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v3.test.js @@ -295,6 +295,27 @@ describe('#unit BoltProtocolV3', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV3(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) class SpiedBoltProtocolV3 extends BoltProtocolV3 { diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v4x0.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v4x0.test.js index 662f220f4..75bcde5d5 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v4x0.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v4x0.test.js @@ -214,6 +214,27 @@ describe('#unit BoltProtocolV4x0', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV4x0(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) class SpiedBoltProtocolV4x0 extends BoltProtocolV4x0 { diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v4x1.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v4x1.test.js index c70ed33c4..50164756a 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v4x1.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v4x1.test.js @@ -19,6 +19,12 @@ import BoltProtocolV4x1 from '../../src/bolt/bolt-protocol-v4x1' import utils from '../test-utils' +import { internal } from 'neo4j-driver-core' + +const { + txConfig: { TxConfig }, + bookmark: { Bookmark } +} = internal describe('#unit BoltProtocolV4x1', () => { describe('Bolt v4.4', () => { @@ -82,4 +88,25 @@ describe('#unit BoltProtocolV4x1', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV4x1(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v4x2.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v4x2.test.js index b0cbcdde8..982911ccf 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v4x2.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v4x2.test.js @@ -19,6 +19,12 @@ import BoltProtocolV4x2 from '../../src/bolt/bolt-protocol-v4x2' import utils from '../test-utils' +import { internal } from 'neo4j-driver-core' + +const { + txConfig: { TxConfig }, + bookmark: { Bookmark } +} = internal describe('#unit BoltProtocolV4x2', () => { describe('Bolt v4.4', () => { @@ -81,4 +87,25 @@ describe('#unit BoltProtocolV4x2', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV4x2(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v4x3.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v4x3.test.js index 41c5f4de1..0845ca616 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v4x3.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v4x3.test.js @@ -299,4 +299,25 @@ describe('#unit BoltProtocolV4x3', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV4x3(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v4x4.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v4x4.test.js index 2314922ed..788b88d45 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v4x4.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v4x4.test.js @@ -332,4 +332,25 @@ describe('#unit BoltProtocolV4x4', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV4x4(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) From 89c3dac8fd074e85c0cb9691c6baab9402a2af3d Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 12 Jan 2022 11:34:37 +0100 Subject: [PATCH 21/26] Add docs and improve code design --- packages/core/src/result.ts | 154 ++++++++++++++++++++---------- packages/core/test/result.test.ts | 95 ++++++++++++++++++ 2 files changed, 199 insertions(+), 50 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index b52daa994..7b6e36068 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -87,6 +87,28 @@ interface ResultObserver { onError?: (error: Error) => void } +/** + * Defines the elements in the queue result observer + * @access private + */ + type QueuedResultElement = { + done: false + record: Record +} | { + done: true + summary: ResultSummary +} + +/** + * Defines a ResultObserver interface which can be used to enqueue records and dequeue + * them until the result is fully received. + * @access private + */ + interface QueuedResultObserver extends ResultObserver { + dequeue (): Promise + get size (): number +} + /** * A stream of {@link Record} representing the result of a query. * Can be consumed eagerly as {@link Promise} resolved with array of records and {@link ResultSummary} @@ -211,78 +233,45 @@ class Result implements Promise { return this._p } + /** + * Provides a async iterator over the records in the result. + * + * *Should not be combined with {@link Result#subscribe} or ${@link Result#then} functions.* + * + * @public + * @returns {AsyncIterator} The async iterator for the Results + */ async* [Symbol.asyncIterator](): AsyncIterator { - interface ConsumedValue { - done: boolean - record?: Record - summary?: ResultSummary - } - - interface ResolvablePromise { - promise: Promise - resolve: (arg: T) => any | undefined - reject: (arg: Error) => any | undefined - } - - function createResolvablePromise (): ResolvablePromise { - const resolvablePromise: any = {} - resolvablePromise.promise = new Promise((resolve, reject) => { - resolvablePromise.resolve = resolve - resolvablePromise.reject = reject - }); - return resolvablePromise; - } - - const observer = { - _buffer: [createResolvablePromise()], - onNext: (record: Record) => { - observer._buffer[observer._buffer.length - 1].resolve({ record, done: false }); - observer._buffer.push(createResolvablePromise()); - }, - onCompleted: (summary: ResultSummary) => { - observer._buffer[observer._buffer.length - 1].resolve({ summary, done: true }); - }, - onError: (error: Error) => { - observer._buffer[observer._buffer.length - 1].reject(error); - }, - consume: async () => { - const value = await observer._buffer[0].promise - observer._buffer.shift(); - return value - }, - get queueSize (): number { - return observer._buffer.length - 1 - } - } + const queuedObserver = this._createQueuedResultObserver() const status = { paused: false } let streaming: observer.ResultStreamObserver | null = null try { - streaming = await this._subscribe(observer, true) + streaming = await this._subscribe(queuedObserver, true) } catch (e) { // ignore, we will handle it in consume since the error is notifies in the onError callback } const pullIfNeeded = () => { - if (observer.queueSize >= this._watermarks.high) { + if (queuedObserver.size >= this._watermarks.high) { status.paused = true - } else if (observer.queueSize <= this._watermarks.low) { + } else if (queuedObserver.size <= this._watermarks.low) { status.paused = false } - if (!status.paused && observer.queueSize < this._watermarks.high && streaming) { + if (!status.paused && queuedObserver.size < this._watermarks.high && streaming) { streaming.pull() } } while(true) { pullIfNeeded() - const value = await observer.consume() - if (value.done) { - return value.summary! + const next = await queuedObserver.dequeue() + if (next.done) { + return next.summary } - yield value.record! + yield next.record } } @@ -347,6 +336,15 @@ class Result implements Promise { .catch(() => {}) } + /** + * Stream records to observer as they come in, this is a more efficient method + * of handling the results, and allows you to handle arbitrarily large results. + * + * @access private + * @param {ResultObserver} observer The observer to send records to. + * @param {boolean} explicityPull The flag to indicate if the pull should be called explicitly. + * @returns {Promise} The result stream observer. + */ _subscribe(observer: ResultObserver, explicityPull: boolean = false): Promise { const _observer = this._decorateObserver(observer) @@ -362,6 +360,13 @@ class Result implements Promise { }) } + /** + * Decorates the ResultObserver with the necessary methods. + * + * @access private + * @param {ResultObserver} observer The ResultObserver to decorate. + * @returns The decorated result observer + */ _decorateObserver(observer: ResultObserver): ResultObserver { const onCompletedOriginal = observer.onCompleted || DEFAULT_ON_COMPLETED const onCompletedWrapper = (metadata: any) => { @@ -407,6 +412,11 @@ class Result implements Promise { this._streamObserverPromise.then(o => o.cancel()) } + /** + * @access private + * @param metadata + * @returns + */ private _createSummary(metadata: any): Promise { const { validatedQuery: query, @@ -434,6 +444,50 @@ class Result implements Promise { new ResultSummary(query, parameters, metadata, protocolVersion) ) } + + /** + * @access private + */ + private _createQueuedResultObserver (): QueuedResultObserver { + interface ResolvablePromise { + promise: Promise + resolve: (arg: T) => any | undefined + reject: (arg: Error) => any | undefined + } + + function createResolvablePromise (): ResolvablePromise { + const resolvablePromise: any = {} + resolvablePromise.promise = new Promise((resolve, reject) => { + resolvablePromise.resolve = resolve + resolvablePromise.reject = reject + }); + return resolvablePromise; + } + + const observer = { + _buffer: [createResolvablePromise()], + onNext: (record: Record) => { + observer._buffer[observer._buffer.length - 1].resolve({ done: false, record }); + observer._buffer.push(createResolvablePromise()); + }, + onCompleted: (summary: ResultSummary) => { + observer._buffer[observer._buffer.length - 1].resolve({ done: true, summary }); + }, + onError: (error: Error) => { + observer._buffer[observer._buffer.length - 1].reject(error); + }, + dequeue: async () => { + const value = await observer._buffer[0].promise + observer._buffer.shift(); + return value + }, + get size (): number { + return observer._buffer.length - 1 + } + } + + return observer; + } } function captureStacktrace(): string | null { diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index af51790ae..192d1806a 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -693,6 +693,101 @@ describe('Result', () => { new Record(keys, rawRecord2) ]) }) + + describe('onError', () => { + it('should throws an exception while iterate over records', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const expectedError = new Error('test') + let observedError: Error | undefined + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + const records = [] + + try { + for await (const record of result) { + records.push(record) + streamObserverMock.onError(expectedError) + } + } catch (err) { + observedError = err + } + + expect(observedError).toEqual(expectedError) + }) + + it('should resolve the already received records', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const expectedError = new Error('test') + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + const records = [] + + try { + for await (const record of result) { + records.push(record) + streamObserverMock.onError(expectedError) + } + } catch (err) { + // do nothing + } + + expect(records).toEqual([ + new Record(keys, rawRecord1), + new Record(keys, rawRecord2) + ]) + + }) + + it('should throws it when it is the event after onKeys', async () => { + const keys = ['a', 'b'] + const expectedError = new Error('test') + let observedError: Error | undefined + + streamObserverMock.onKeys(keys) + streamObserverMock.onError(expectedError) + + const records = [] + + try { + for await (const record of result) { + records.push(record) + } + } catch (err) { + observedError = err + } + + expect(observedError).toEqual(expectedError) + }) + + it('should throws it when it is the first and unique event', async () => { + const expectedError = new Error('test') + let observedError: Error | undefined + + streamObserverMock.onError(expectedError) + + const records = [] + + try { + for await (const record of result) { + records.push(record) + } + } catch (err) { + observedError = err + } + + expect(observedError).toEqual(expectedError) + }) + }) }) }) From c9f1f8bd99307ea317746c41825f61f4f99f3cf5 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Fri, 14 Jan 2022 11:40:59 +0100 Subject: [PATCH 22/26] Adjusting DenoJS compatibility --- packages/core/src/connection.ts | 2 +- packages/core/src/session.ts | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/packages/core/src/connection.ts b/packages/core/src/connection.ts index d5ffff685..4fb8eb9b4 100644 --- a/packages/core/src/connection.ts +++ b/packages/core/src/connection.ts @@ -17,7 +17,7 @@ * limitations under the License. */ -import { ServerAddress } from "./internal/server-address" +import { ServerAddress } from './internal/server-address' /** * Interface which defines the raw connection with the database diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index c4ff56658..5253abb1b 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -124,7 +124,9 @@ class Session { this._transactionExecutor = _createTransactionExecutor(config) this._onComplete = this._onCompleteCallback.bind(this) this._databaseNameResolved = this._database !== '' - this._setupWatermark() + const calculatedWatermaks = this._calculateWatermaks() + this._lowRecordWatermark = calculatedWatermaks.low + this._highRecordWatermark = calculatedWatermaks.high } /** @@ -431,13 +433,16 @@ class Session { * @private * @returns {void} */ - private _setupWatermark(): void { + private _calculateWatermaks(): { low: number; high: number } { if (this._fetchSize === FETCH_ALL) { - this._lowRecordWatermark = Number.MAX_VALUE // we shall always lower than this number to enable auto pull - this._highRecordWatermark = Number.MAX_VALUE // we shall never reach this number to disable auto pull - } else { - this._lowRecordWatermark = 0.3 * this._fetchSize - this._highRecordWatermark = 0.7 * this._fetchSize + return { + low: Number.MAX_VALUE, // we shall always lower than this number to enable auto pull + high: Number.MAX_VALUE // we shall never reach this number to disable auto pull + } + } + return { + low: 0.3 * this._fetchSize, + high: 0.7 * this._fetchSize } } From 449e51547dccc8729b3b8d8013b1e898d25ac67c Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 19 Jan 2022 12:12:40 +0100 Subject: [PATCH 23/26] Checking if return the promise helps with the timeouts --- packages/testkit-backend/src/backend.js | 4 +-- .../src/controller/interface.js | 2 +- .../testkit-backend/src/controller/local.js | 4 +-- .../testkit-backend/src/controller/remote.js | 2 +- .../testkit-backend/src/request-handlers.js | 28 +++++++++---------- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/packages/testkit-backend/src/backend.js b/packages/testkit-backend/src/backend.js index 80fc63265..f84e432ee 100644 --- a/packages/testkit-backend/src/backend.js +++ b/packages/testkit-backend/src/backend.js @@ -21,9 +21,9 @@ export default class Backend { this._channel.on('contextOpen', ({ contextId }) => this._controller.openContext(contextId)) this._channel.on('contextClose', ({ contextId }) => this._controller.closeContext(contextId)) - this._channel.on('request', ({ contextId, request }) => { + this._channel.on('request', async ({ contextId, request }) => { try { - this._controller.handle(contextId, request) + await this._controller.handle(contextId, request) } catch (e) { this._channel.writeBackendError(contextId, e.message) } diff --git a/packages/testkit-backend/src/controller/interface.js b/packages/testkit-backend/src/controller/interface.js index 3de0cb961..94e4fd9f6 100644 --- a/packages/testkit-backend/src/controller/interface.js +++ b/packages/testkit-backend/src/controller/interface.js @@ -24,7 +24,7 @@ export default class Controller extends EventEmitter { throw new Error('not implemented') } - handle(contextId, request) { + async handle(contextId, request) { throw new Error('not implemented') } } diff --git a/packages/testkit-backend/src/controller/local.js b/packages/testkit-backend/src/controller/local.js index ae2638e51..a4faddaee 100644 --- a/packages/testkit-backend/src/controller/local.js +++ b/packages/testkit-backend/src/controller/local.js @@ -25,7 +25,7 @@ export default class LocalController extends Controller { this._contexts.delete(contextId) } - handle (contextId, { name, data }) { + async handle (contextId, { name, data }) { if (!this._contexts.has(contextId)) { throw new Error(`Context ${contextId} does not exist`) } else if (!(name in this._requestHandlers)) { @@ -34,7 +34,7 @@ export default class LocalController extends Controller { throw new Error(`Unknown request: ${name}`) } - this._requestHandlers[name](this._contexts.get(contextId), data, { + return await this._requestHandlers[name](this._contexts.get(contextId), data, { writeResponse: (name, data) => this._writeResponse(contextId, name, data), writeError: (e) => this._writeError(contextId, e), writeBackendError: (msg) => this._writeBackendError(contextId, msg) diff --git a/packages/testkit-backend/src/controller/remote.js b/packages/testkit-backend/src/controller/remote.js index fc13fedf1..535801b4f 100644 --- a/packages/testkit-backend/src/controller/remote.js +++ b/packages/testkit-backend/src/controller/remote.js @@ -66,7 +66,7 @@ export default class RemoteController extends Controller { this._forwardToConnectedClient('contextClose', contextId, { contextId }) } - handle (contextId, request) { + async handle (contextId, request) { this._forwardToConnectedClient('request', contextId, request) } diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 398b88203..4686d67c5 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -93,13 +93,13 @@ export function NewDriver (context, data, wire) { export function DriverClose (context, data, wire) { const { driverId } = data const driver = context.getDriver(driverId) - driver + return driver .close() .then(() => { wire.writeResponse('Driver', { id: driverId }) }) - .catch(err => wire.writeError(err)) - context.removeDriver(driverId) + .catch(err => wire.writeError(err)) + .finally(() => context.removeDriver(driverId)) } export function NewSession (context, data, wire) { @@ -130,7 +130,7 @@ export function NewSession (context, data, wire) { export function SessionClose (context, data, wire) { const { sessionId } = data const session = context.getSession(sessionId) - session + return session .close() .then(() => { wire.writeResponse('Session', { id: sessionId }) @@ -167,7 +167,7 @@ export function ResultNext (context, data, wire) { if (!("recordIt" in result)) { result.recordIt = result[Symbol.asyncIterator]() } - result.recordIt.next().then(({ value, done }) => { + return result.recordIt.next().then(({ value, done }) => { if (done) { wire.writeResponse('NullRecord', null) } else { @@ -185,7 +185,7 @@ export function ResultNext (context, data, wire) { export function ResultConsume (context, data, wire) { const { resultId } = data const result = context.getResult(resultId) - result.summary().then(summary => { + return result.summary().then(summary => { wire.writeResponse('Summary', { ...summary, serverInfo: { @@ -201,7 +201,7 @@ export function ResultList (context, data, wire) { const result = context.getResult(resultId) - result + return result .then(({ records }) => { const cypherRecords = records.map(rec => { return { values: Array.from(rec.values()).map(nativeToCypher) } @@ -214,7 +214,7 @@ export function ResultList (context, data, wire) { export function SessionReadTransaction (context, data, wire) { const { sessionId, txMeta: metadata } = data const session = context.getSession(sessionId) - session + return session .readTransaction( tx => new Promise((resolve, reject) => { @@ -273,7 +273,7 @@ export function SessionBeginTransaction (context, data, wire) { export function TransactionCommit (context, data, wire) { const { txId: id } = data const { tx } = context.getTx(id) - tx.commit() + return tx.commit() .then(() => wire.writeResponse('Transaction', { id })) .catch(e => { console.log('got some err: ' + JSON.stringify(e)) @@ -284,7 +284,7 @@ export function TransactionCommit (context, data, wire) { export function TransactionRollback (context, data, wire) { const { txId: id } = data const { tx } = context.getTx(id) - tx.rollback() + return tx.rollback() .then(() => wire.writeResponse('Transaction', { id })) .catch(e => wire.writeError(e)) } @@ -299,7 +299,7 @@ export function SessionLastBookmarks (context, data, wire) { export function SessionWriteTransaction (context, data, wire) { const { sessionId, txMeta: metadata } = data const session = context.getSession(sessionId) - session + return session .writeTransaction( tx => new Promise((resolve, reject) => { @@ -345,7 +345,7 @@ export function GetFeatures (_context, _params, wire) { export function VerifyConnectivity (context, { driverId }, wire) { const driver = context.getDriver(driverId) - driver + return driver .verifyConnectivity() .then(() => wire.writeResponse('Driver', { id: driverId })) .catch(error => wire.writeError(error)) @@ -353,7 +353,7 @@ export function VerifyConnectivity (context, { driverId }, wire) { export function CheckMultiDBSupport (context, { driverId }, wire) { const driver = context.getDriver(driverId) - driver + return driver .supportsMultiDb() .then(available => wire.writeResponse('MultiDBSupport', { id: driverId, available }) @@ -407,7 +407,7 @@ export function ForcedRoutingTableUpdate (context, { driverId, database, bookmar if (provider._freshRoutingTable) { // Removing database from the routing table registry provider._routingTableRegistry._remove(database) - provider._freshRoutingTable ({ + return provider._freshRoutingTable ({ accessMode: 'READ', database, bookmark: bookmarks, From d8f4ca4b633d908566e463f627724b883a6de075 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 24 Jan 2022 10:43:58 +0100 Subject: [PATCH 24/26] Emulating error in a unit test scenario --- packages/core/src/internal/observers.ts | 8 +- packages/core/src/result.ts | 19 ++-- packages/core/test/result.test.ts | 105 +++++++++++++++++++- packages/core/test/utils/connection.fake.ts | 2 +- 4 files changed, 114 insertions(+), 20 deletions(-) diff --git a/packages/core/src/internal/observers.ts b/packages/core/src/internal/observers.ts index a2d5c3766..9a40563a2 100644 --- a/packages/core/src/internal/observers.ts +++ b/packages/core/src/internal/observers.ts @@ -91,7 +91,7 @@ export interface ResultStreamObserver extends StreamObserver { setExplicityPull(explicityPull: boolean): void - pull(): void + pull(): boolean /** * Mark this observer as if it has completed with no metadata. @@ -131,8 +131,9 @@ export class CompletedObserver implements ResultStreamObserver { // do nothing } - pull(): void { + pull(): boolean { // do nothing + return false } onError(error: Error): void { @@ -186,8 +187,9 @@ export class FailedObserver implements ResultStreamObserver { // do nothing } - pull(): void { + pull(): boolean { // do nothing + return false } } diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 7b6e36068..a910d109c 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -246,13 +246,9 @@ class Result implements Promise { const status = { paused: false } - let streaming: observer.ResultStreamObserver | null = null - - try { - streaming = await this._subscribe(queuedObserver, true) - } catch (e) { - // ignore, we will handle it in consume since the error is notifies in the onError callback - } + const streaming: observer.ResultStreamObserver | null = + // the error will be send to the onError callback + await this._subscribe(queuedObserver, true).catch(() => null) const pullIfNeeded = () => { if (queuedObserver.size >= this._watermarks.high) { @@ -466,15 +462,16 @@ class Result implements Promise { const observer = { _buffer: [createResolvablePromise()], + _completedCalls: 0, onNext: (record: Record) => { - observer._buffer[observer._buffer.length - 1].resolve({ done: false, record }); - observer._buffer.push(createResolvablePromise()); + observer._buffer[observer._buffer.length - 1].resolve({ done: false, record }) + observer._buffer.push(createResolvablePromise()) }, onCompleted: (summary: ResultSummary) => { - observer._buffer[observer._buffer.length - 1].resolve({ done: true, summary }); + observer._buffer[observer._buffer.length - 1].resolve({ done: true, summary }) }, onError: (error: Error) => { - observer._buffer[observer._buffer.length - 1].reject(error); + observer._buffer[observer._buffer.length - 1].reject(error) }, dequeue: async () => { const value = await observer._buffer[0].promise diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index 192d1806a..139664508 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -19,6 +19,7 @@ import { observer, connectionHolder } from '../src/internal' import { Connection, + internal, newError, Record, ResultObserver, @@ -694,6 +695,56 @@ describe('Result', () => { ]) }) + it('should end full batch', async () => { + const fetchSize = 3 + const observer = new ResultStreamObserverMock() + const res = new Result( + Promise.resolve(observer), + 'query', undefined, undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const rawRecord3 = [5, 6] + const rawRecord4 = [7, 8] + const rawRecord5 = [9, 10] + const rawRecord6 = [11, 12] + const queue: any[][] = [ + rawRecord3, + rawRecord4, + rawRecord5, + rawRecord6 + ] + + jest.spyOn(observer, 'pull') + .mockImplementation(simulatePull(queue, observer, fetchSize, 2)) + + observer.onKeys(keys) + observer.onNext(rawRecord1) + observer.onNext(rawRecord2) + + const records = [] + + for await (const record of res) { + records.push(record) + await new Promise(r => setTimeout(r, 0.1)) + } + + expect(records).toEqual([ + new Record(keys, rawRecord1), + new Record(keys, rawRecord2), + new Record(keys, rawRecord3), + new Record(keys, rawRecord4), + new Record(keys, rawRecord5), + new Record(keys, rawRecord6) + ]) + }) + describe('onError', () => { it('should throws an exception while iterate over records', async () => { const keys = ['a', 'b'] @@ -898,14 +949,14 @@ describe('Result', () => { describe.each([ [ 'Promise.resolve(new observer.FailedObserver({ error: expectedError }))', - Promise.resolve(new observer.FailedObserver({ error: expectedError })) + () => Promise.resolve(new observer.FailedObserver({ error: expectedError })) ], - ['Promise.reject(expectedError)', Promise.reject(expectedError)] - ])('new Result(%s, "query") ', (_, promise) => { + ['Promise.reject(expectedError)', () => Promise.reject(expectedError)] + ])('new Result(%s, "query") ', (_, getPromise) => { let result: Result beforeEach(() => { - result = new Result(promise, 'query') + result = new Result(getPromise(), 'query') }) describe('.keys()', () => { @@ -1033,8 +1084,52 @@ class ResultStreamObserverMock implements observer.ResultStreamObserver { // do nothing } - pull(): void { + pull(): boolean { // do nothing + return true + } +} + +function simulatePull( + records: any[][], + observer: ResultStreamObserverMock, + fetchSize: number, + timeout: number = 1): () => boolean { + const state = { + streaming: false, + finished: false, + consumed: 0 + } + return () => { + + if (state.streaming || state.finished) { + return false + } + state.streaming = true + state.consumed = 0 + const interval = setInterval(() => { + state.streaming = state.consumed < fetchSize + state.finished = records.length === 0 + + if (state.finished) { + observer.onCompleted({}) + clearInterval(interval) + return + } + + if (!state.streaming) { + clearInterval(interval) + return + } + + const record = records.shift() + if (record !== undefined) { + observer.onNext(record) + } + state.consumed++ + + }, timeout) + return true } } diff --git a/packages/core/test/utils/connection.fake.ts b/packages/core/test/utils/connection.fake.ts index 4928a42ac..5548d05fe 100644 --- a/packages/core/test/utils/connection.fake.ts +++ b/packages/core/test/utils/connection.fake.ts @@ -174,7 +174,7 @@ function mockResultStreamObserver(query: string, parameters: any | undefined): R onNext: (result: any) => { }, cancel: () => { }, prepareToHandleSingleResponse: () => { }, - pull: () => { }, + pull: () => { return true }, markCompleted: () => { }, setExplicityPull: (_: boolean) => { }, subscribe: (observer: ResultObserver) => { From 1078485586ff92a5caaa28df5001e088ca4e5ead Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 24 Jan 2022 12:22:02 +0100 Subject: [PATCH 25/26] Changing `pull` for `pause` and `resume` --- .../src/bolt/stream-observers.js | 25 +++-- .../test/bolt/stream-observer.test.js | 85 ++++++++++++---- packages/core/src/internal/observers.ts | 38 +++++--- packages/core/src/result.ts | 25 ++--- packages/core/test/result.test.ts | 96 +++++++++++++------ packages/core/test/utils/connection.fake.ts | 4 +- 6 files changed, 188 insertions(+), 85 deletions(-) diff --git a/packages/bolt-connection/src/bolt/stream-observers.js b/packages/bolt-connection/src/bolt/stream-observers.js index bfd086134..b71411ea6 100644 --- a/packages/bolt-connection/src/bolt/stream-observers.js +++ b/packages/bolt-connection/src/bolt/stream-observers.js @@ -100,15 +100,28 @@ class ResultStreamObserver extends StreamObserver { this._highRecordWatermark = highRecordWatermark this._setState(reactive ? _states.READY : _states.READY_STREAMING) this._setupAutoPull() - this._explicityPull = false; + this._paused = false; } - setExplicityPull(explicityPull) { - this._explicityPull = explicityPull; + /** + * Pause the record consuming + * + * This function will supend the record consuming. It will not cancel the stream and the already + * requested records will be sent to the subscriber. + */ + pause () { + this._paused = true } - pull() { - return this._state.pull(this) + /** + * Resume the record consuming + * + * This function will resume the record consuming fetching more records from the server. + */ + resume () { + this._paused = false + this._setupAutoPull(true) + this._state.pull(this) } /** @@ -355,7 +368,7 @@ class ResultStreamObserver extends StreamObserver { _handleStreaming () { if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) { - if (!this._explicityPull && (this._discard || this._autoPull)) { + if (!this._paused && (this._discard || this._autoPull)) { this._more() } } diff --git a/packages/bolt-connection/test/bolt/stream-observer.test.js b/packages/bolt-connection/test/bolt/stream-observer.test.js index 6b7d63410..ee018b5fe 100644 --- a/packages/bolt-connection/test/bolt/stream-observer.test.js +++ b/packages/bolt-connection/test/bolt/stream-observer.test.js @@ -199,7 +199,7 @@ describe('#unit ResultStreamObserver', () => { }) }) - describe('when is not explicity pull (default)', () => { + describe('when is not paused (default)', () => { it('should ask for more records when the stream is completed and has more', () => { // Setup const queryId = 123 @@ -229,7 +229,7 @@ describe('#unit ResultStreamObserver', () => { }) }) - describe('when is explicity pull enabled', () => { + describe('when is paused', () => { it('should not ask for more records when the stream is completed and has more', () => { // Setup const queryId = 123 @@ -239,7 +239,8 @@ describe('#unit ResultStreamObserver', () => { moreFunction: more, fetchSize: 2000 }) - streamObserver.setExplicityPull(true) + + streamObserver.pause() // action streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) @@ -254,7 +255,7 @@ describe('#unit ResultStreamObserver', () => { expect(more).toBeCalledTimes(0) }) - describe('pull()', () => { + describe('resume()', () => { it('should ask for more records when the stream is completed and has more', () => { // Setup const queryId = 123 @@ -265,7 +266,8 @@ describe('#unit ResultStreamObserver', () => { moreFunction: more, fetchSize: fetchSize }) - streamObserver.setExplicityPull(true) + + streamObserver.pause() // Scenario streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) @@ -277,7 +279,7 @@ describe('#unit ResultStreamObserver', () => { streamObserver.onCompleted({ has_more: true }) // Action - streamObserver.pull() + streamObserver.resume() // verification expect(more).toBeCalledTimes(1) @@ -295,13 +297,13 @@ describe('#unit ResultStreamObserver', () => { fetchSize: fetchSize, reactive: true }) - streamObserver.setExplicityPull(true) + streamObserver.pause() // Scenario streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) // Action - streamObserver.pull() + streamObserver.resume() // verification expect(more).toBeCalledTimes(1) @@ -320,10 +322,10 @@ describe('#unit ResultStreamObserver', () => { fetchSize: fetchSize, reactive: true }) - streamObserver.setExplicityPull(true) + streamObserver.pause() // Action - streamObserver.pull() + streamObserver.resume() // verification expect(more).toBeCalledTimes(1) @@ -341,13 +343,13 @@ describe('#unit ResultStreamObserver', () => { fetchSize: fetchSize, reactive: false }) - streamObserver.setExplicityPull(true) + streamObserver.pause() // Scenario streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) // Action - streamObserver.pull() + streamObserver.resume() // verification expect(more).toBeCalledTimes(0) @@ -365,13 +367,13 @@ describe('#unit ResultStreamObserver', () => { fetchSize: fetchSize }) - streamObserver.setExplicityPull(true) + streamObserver.pause() // Scenario streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) // Action - streamObserver.pull() + streamObserver.resume() // verification expect(more).toBeCalledTimes(0) @@ -388,7 +390,7 @@ describe('#unit ResultStreamObserver', () => { fetchSize: fetchSize }) - streamObserver.setExplicityPull(true) + streamObserver.pause() // Scenario streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) @@ -399,12 +401,12 @@ describe('#unit ResultStreamObserver', () => { streamObserver.onNext([11, 22, 33]) streamObserver.onCompleted({ has_more: true }) - streamObserver.pull() // should actual call + streamObserver.resume() // should actual call streamObserver.onNext([111, 222, 333]) // Action - streamObserver.pull() + streamObserver.resume() // verification expect(more).toBeCalledTimes(1) @@ -421,7 +423,7 @@ describe('#unit ResultStreamObserver', () => { fetchSize: fetchSize }) - streamObserver.setExplicityPull(true) + streamObserver.pause() // Scenario streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) @@ -433,12 +435,53 @@ describe('#unit ResultStreamObserver', () => { streamObserver.onCompleted({ has_more: false }) // Action - streamObserver.pull() + streamObserver.resume() // verification expect(more).toBeCalledTimes(0) }) + + it('should resume the stream consumption until the end', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize + }) + + streamObserver.pause() + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + + // Action + streamObserver.resume() + + // Streaming until the end + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: false }) + + // verification + expect(more).toBeCalledTimes(3) + }) + it('should not ask for more records when stream failed', () => { // Setup const queryId = 123 @@ -450,7 +493,7 @@ describe('#unit ResultStreamObserver', () => { fetchSize: fetchSize }) - streamObserver.setExplicityPull(true) + streamObserver.pause() // Scenario streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) @@ -461,7 +504,7 @@ describe('#unit ResultStreamObserver', () => { streamObserver.onError(new Error('error')) // Action - streamObserver.pull() + streamObserver.resume() // verification expect(more).toBeCalledTimes(0) diff --git a/packages/core/src/internal/observers.ts b/packages/core/src/internal/observers.ts index 9a40563a2..5ed50a962 100644 --- a/packages/core/src/internal/observers.ts +++ b/packages/core/src/internal/observers.ts @@ -77,6 +77,22 @@ export interface ResultStreamObserver extends StreamObserver { * Cancel pending record stream */ cancel(): void + + /** + * Pause the record consuming + * + * This function will supend the record consuming. It will not cancel the stream and the already + * requested records will be sent to the subscriber. + */ + pause(): void + + /** + * Resume the record consuming + * + * This function will resume the record consuming fetching more records from the server. + */ + resume(): void + /** * Stream observer defaults to handling responses for two messages: RUN + PULL_ALL or RUN + DISCARD_ALL. * Response for RUN initializes query keys. Response for PULL_ALL / DISCARD_ALL exposes the result stream. @@ -89,10 +105,6 @@ export interface ResultStreamObserver extends StreamObserver { */ prepareToHandleSingleResponse(): void - setExplicityPull(explicityPull: boolean): void - - pull(): boolean - /** * Mark this observer as if it has completed with no metadata. */ @@ -119,21 +131,20 @@ export class CompletedObserver implements ResultStreamObserver { // do nothing } - prepareToHandleSingleResponse(): void { + pause(): void { // do nothing } - markCompleted(): void { + resume(): void { // do nothing } - setExplicityPull(_: boolean): void { + prepareToHandleSingleResponse(): void { // do nothing } - pull(): boolean { + markCompleted(): void { // do nothing - return false } onError(error: Error): void { @@ -175,21 +186,20 @@ export class FailedObserver implements ResultStreamObserver { // do nothing } - prepareToHandleSingleResponse(): void { + pause(): void { // do nothing } - markCompleted(): void { + resume(): void { // do nothing } - setExplicityPull(_: boolean): void { + markCompleted(): void { // do nothing } - pull(): boolean { + prepareToHandleSingleResponse(): void { // do nothing - return false } } diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index a910d109c..81083e602 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -244,25 +244,25 @@ class Result implements Promise { async* [Symbol.asyncIterator](): AsyncIterator { const queuedObserver = this._createQueuedResultObserver() - const status = { paused: false } + const status = { paused: true, firstRun: true } const streaming: observer.ResultStreamObserver | null = // the error will be send to the onError callback await this._subscribe(queuedObserver, true).catch(() => null) - const pullIfNeeded = () => { - if (queuedObserver.size >= this._watermarks.high) { + const controlFlow = () => { + if (queuedObserver.size >= this._watermarks.high && !status.paused) { status.paused = true - } else if (queuedObserver.size <= this._watermarks.low) { + streaming?.pause() + } else if (queuedObserver.size <= this._watermarks.low && status.paused || status.firstRun) { + status.firstRun = false status.paused = false - } - if (!status.paused && queuedObserver.size < this._watermarks.high && streaming) { - streaming.pull() + streaming?.resume() } } while(true) { - pullIfNeeded() + controlFlow() const next = await queuedObserver.dequeue() if (next.done) { return next.summary @@ -338,15 +338,17 @@ class Result implements Promise { * * @access private * @param {ResultObserver} observer The observer to send records to. - * @param {boolean} explicityPull The flag to indicate if the pull should be called explicitly. + * @param {boolean} paused The flag to indicate if the stream should be started paused * @returns {Promise} The result stream observer. */ - _subscribe(observer: ResultObserver, explicityPull: boolean = false): Promise { + _subscribe(observer: ResultObserver, paused: boolean = false): Promise { const _observer = this._decorateObserver(observer) return this._streamObserverPromise .then(o => { - o.setExplicityPull(explicityPull) + if (paused) { + o.pause() + } o.subscribe(_observer) return o }) @@ -462,7 +464,6 @@ class Result implements Promise { const observer = { _buffer: [createResolvablePromise()], - _completedCalls: 0, onNext: (record: Record) => { observer._buffer[observer._buffer.length - 1].resolve({ done: false, record }) observer._buffer.push(createResolvablePromise()) diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index 139664508..85de34bb2 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -569,32 +569,36 @@ describe('Result', () => { expect(subscribe).toHaveBeenCalled() }) - it('should set the observer to explicity pull', async () => { - const setExplicityPull = jest.spyOn(streamObserverMock, 'setExplicityPull') + it('should pause the stream and then resume the stream', async () => { + const pause = jest.spyOn(streamObserverMock, 'pause') + const resume = jest.spyOn(streamObserverMock, 'resume') streamObserverMock.onCompleted({}) for await (const _ of result) { // do nothing } - expect(setExplicityPull).toHaveBeenCalledWith(true) + expect(pause).toHaveBeenCalledTimes(1) + expect(resume).toHaveBeenCalledTimes(1) + expect(pause.mock.invocationCallOrder[0]) + .toBeLessThan(resume.mock.invocationCallOrder[0]) }) - it('should set the observer to explicity pull before subscribe', async () => { + it('should pause the stream before subscribe', async () => { const subscribe = jest.spyOn(streamObserverMock, 'subscribe') - const setExplicityPull = jest.spyOn(streamObserverMock, 'setExplicityPull') + const pause = jest.spyOn(streamObserverMock, 'pause') streamObserverMock.onCompleted({}) for await (const _ of result) { // do nothing } - expect(setExplicityPull.mock.invocationCallOrder[0]) + expect(pause.mock.invocationCallOrder[0]) .toBeLessThan(subscribe.mock.invocationCallOrder[0]) }) - it('should not call pull if queue is bigger than high watermark', async () => { - const pull = jest.spyOn(streamObserverMock, 'pull') + it('should pause the stream if queue is bigger than high watermark', async () => { + const pause = jest.spyOn(streamObserverMock, 'pause') streamObserverMock.onKeys(['a']) for (let i = 0; i <= watermarks.high; i++) { @@ -604,11 +608,11 @@ describe('Result', () => { const it = result[Symbol.asyncIterator]() await it.next() - expect(pull).toBeCalledTimes(0) + expect(pause).toBeCalledTimes(1) }) - it('should call pull if queue is smaller than low watermark', async () => { - const pull = jest.spyOn(streamObserverMock, 'pull') + it('should call resume if queue is smaller than low watermark', async () => { + const resume = jest.spyOn(streamObserverMock, 'resume') streamObserverMock.onKeys(['a']) for (let i = 0; i < watermarks.low - 1; i++) { @@ -618,11 +622,12 @@ describe('Result', () => { const it = result[Symbol.asyncIterator]() await it.next() - expect(pull).toBeCalledTimes(1) + expect(resume).toBeCalledTimes(1) }) - it('should call pull if queue is between lower and high if it never highter then high watermark', async () => { - const pull = jest.spyOn(streamObserverMock, 'pull') + it('should not pause after resume if queue is between lower and high if it never highter then high watermark', async () => { + const pause = jest.spyOn(streamObserverMock, 'pause') + const resume = jest.spyOn(streamObserverMock, 'resume') streamObserverMock.onKeys(['a']) for (let i = 0; i < watermarks.high - 1; i++) { @@ -634,11 +639,13 @@ describe('Result', () => { await it.next() } - expect(pull).toBeCalledTimes(watermarks.high - watermarks.low) + expect(pause).toBeCalledTimes(1) + expect(pause.mock.invocationCallOrder[0]) + .toBeLessThan(resume.mock.invocationCallOrder[0]) }) - it('should call pull if queue is between lower and high if it get highter then high watermark', async () => { - const pull = jest.spyOn(streamObserverMock, 'pull') + it('should resume once if queue is between lower and high if it get highter then high watermark', async () => { + const resume = jest.spyOn(streamObserverMock, 'resume') streamObserverMock.onKeys(['a']) for (let i = 0; i < watermarks.high; i++) { @@ -650,11 +657,11 @@ describe('Result', () => { await it.next() } - expect(pull).toBeCalledTimes(0) + expect(resume).toBeCalledTimes(1) }) it('should recover from high watermark limit after went to low watermark', async () => { - const pull = jest.spyOn(streamObserverMock, 'pull') + const resume = jest.spyOn(streamObserverMock, 'resume') streamObserverMock.onKeys(['a']) for (let i = 0; i < watermarks.high; i++) { @@ -670,7 +677,7 @@ describe('Result', () => { await it.next() } - expect(pull).toBeCalledTimes(2) + expect(resume).toBeCalledTimes(1) }) it('should iterate over record', async () => { @@ -720,9 +727,14 @@ describe('Result', () => { rawRecord5, rawRecord6 ] + + const simuatedStream = simulateStream(queue, observer, fetchSize, 2) + + jest.spyOn(observer, 'resume') + .mockImplementation(simuatedStream.resume.bind(simuatedStream)) - jest.spyOn(observer, 'pull') - .mockImplementation(simulatePull(queue, observer, fetchSize, 2)) + jest.spyOn(observer, 'pause') + .mockImplementation(simuatedStream.pause.bind(simuatedStream)) observer.onKeys(keys) observer.onNext(rawRecord1) @@ -1080,33 +1092,37 @@ class ResultStreamObserverMock implements observer.ResultStreamObserver { .forEach(o => o.onCompleted!(meta)) } - setExplicityPull(_: boolean): void { + pause (): void { // do nothing } - pull(): boolean { + resume(): void { // do nothing - return true } } -function simulatePull( +function simulateStream( records: any[][], observer: ResultStreamObserverMock, fetchSize: number, - timeout: number = 1): () => boolean { + timeout: number = 1): { + resume: () => void, + pause: () => void + } { const state = { + paused: false, streaming: false, finished: false, consumed: 0 } - return () => { - + + const streaming = () => { if (state.streaming || state.finished) { - return false + return } state.streaming = true state.consumed = 0 + const interval = setInterval(() => { state.streaming = state.consumed < fetchSize state.finished = records.length === 0 @@ -1119,6 +1135,9 @@ function simulatePull( if (!state.streaming) { clearInterval(interval) + if (!state.paused) { + streaming() + } return } @@ -1129,8 +1148,25 @@ function simulatePull( state.consumed++ }, timeout) + } + + return { + pause: () => { + state.paused = true + }, + resume: () => { + state.paused = false + streaming() + } + } + + /* + return () => { + + return true } + */ } function asConnection(value: any): Connection { diff --git a/packages/core/test/utils/connection.fake.ts b/packages/core/test/utils/connection.fake.ts index 5548d05fe..2417e3483 100644 --- a/packages/core/test/utils/connection.fake.ts +++ b/packages/core/test/utils/connection.fake.ts @@ -174,9 +174,9 @@ function mockResultStreamObserver(query: string, parameters: any | undefined): R onNext: (result: any) => { }, cancel: () => { }, prepareToHandleSingleResponse: () => { }, - pull: () => { return true }, + pause: () => { }, + resume: () => { }, markCompleted: () => { }, - setExplicityPull: (_: boolean) => { }, subscribe: (observer: ResultObserver) => { if (observer && observer.onCompleted) { observer.onCompleted(new ResultSummary(query, parameters, {})) From bc9e62bf0c1b5f7fdbc8c321da35995bd696254e Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 24 Jan 2022 19:23:28 +0100 Subject: [PATCH 26/26] Treat unhandled promise rejections error This error happens in newer Nodejs versions. --- packages/core/src/result.ts | 47 +++++++++++++++---- .../src/skipped-tests/common.js | 4 ++ 2 files changed, 41 insertions(+), 10 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 81083e602..01bdba701 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -462,29 +462,56 @@ class Result implements Promise { return resolvablePromise; } + type QueuedResultElementOrError = QueuedResultElement | Error + + function isError(elementOrError: QueuedResultElementOrError): elementOrError is Error { + return elementOrError instanceof Error + } + + const buffer: QueuedResultElementOrError[] = [] + const promiseHolder: { resolvable: ResolvablePromise | null } = { resolvable: null } + + const observer = { - _buffer: [createResolvablePromise()], onNext: (record: Record) => { - observer._buffer[observer._buffer.length - 1].resolve({ done: false, record }) - observer._buffer.push(createResolvablePromise()) + observer._push({ done: false, record }) }, onCompleted: (summary: ResultSummary) => { - observer._buffer[observer._buffer.length - 1].resolve({ done: true, summary }) + observer._push({ done: true, summary }) }, onError: (error: Error) => { - observer._buffer[observer._buffer.length - 1].reject(error) + observer._push(error) + }, + _push(element: QueuedResultElementOrError) { + if (promiseHolder.resolvable !== null) { + const resolvable = promiseHolder.resolvable + promiseHolder.resolvable = null + if (isError(element)) { + resolvable.reject(element) + } else { + resolvable.resolve(element) + } + } else { + buffer.push(element) + } }, dequeue: async () => { - const value = await observer._buffer[0].promise - observer._buffer.shift(); - return value + if (buffer.length > 0) { + const element = buffer.shift()! + if (isError(element)) { + throw element + } + return element + } + promiseHolder.resolvable = createResolvablePromise() + return await promiseHolder.resolvable.promise }, get size (): number { - return observer._buffer.length - 1 + return buffer.length } } - return observer; + return observer } } diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index 01dcc380f..31a833060 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -7,6 +7,10 @@ const skippedTests = [ ifEquals('neo4j.test_session_run.TestSessionRun.test_session_reuse'), ifEquals('neo4j.test_session_run.TestSessionRun.test_iteration_nested'), ), + skip( + 'Nested calls does not garauntee order in the records pulling', + ifEquals('stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested'), + ), skip( 'The driver has no support domain_name_resolver', ifEndsWith('test_should_successfully_acquire_rt_when_router_ip_changes'),