diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v1.js b/packages/bolt-connection/src/bolt/bolt-protocol-v1.js index d89a98ebb..dec32a488 100644 --- a/packages/bolt-connection/src/bolt/bolt-protocol-v1.js +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v1.js @@ -278,7 +278,9 @@ export default class BoltProtocol { afterError, beforeComplete, afterComplete, - flush = true + flush = true, + highRecordWatermark = Number.MAX_VALUE, + lowRecordWatermark = Number.MAX_VALUE } = {} ) { const observer = new ResultStreamObserver({ @@ -288,7 +290,9 @@ export default class BoltProtocol { beforeError, afterError, beforeComplete, - afterComplete + afterComplete, + highRecordWatermark, + lowRecordWatermark }) // bookmark and mode are ignored in this version of the protocol diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v3.js b/packages/bolt-connection/src/bolt/bolt-protocol-v3.js index 680ec84b9..2ea0a246e 100644 --- a/packages/bolt-connection/src/bolt/bolt-protocol-v3.js +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v3.js @@ -163,7 +163,9 @@ export default class BoltProtocol extends BoltProtocolV2 { afterError, beforeComplete, afterComplete, - flush = true + flush = true, + highRecordWatermark = Number.MAX_VALUE, + lowRecordWatermark = Number.MAX_VALUE } = {} ) { const observer = new ResultStreamObserver({ @@ -173,7 +175,9 @@ export default class BoltProtocol extends BoltProtocolV2 { beforeError, afterError, beforeComplete, - afterComplete + afterComplete, + highRecordWatermark, + lowRecordWatermark }) // passing in a database name on this protocol version throws an error diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v4x0.js b/packages/bolt-connection/src/bolt/bolt-protocol-v4x0.js index dda8a3b42..21802762a 100644 --- a/packages/bolt-connection/src/bolt/bolt-protocol-v4x0.js +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v4x0.js @@ -90,7 +90,9 @@ export default class BoltProtocol extends BoltProtocolV3 { afterComplete, flush = true, reactive = false, - fetchSize = FETCH_ALL + fetchSize = FETCH_ALL, + highRecordWatermark = Number.MAX_VALUE, + lowRecordWatermark = Number.MAX_VALUE } = {} ) { const observer = new ResultStreamObserver({ @@ -104,7 +106,9 @@ export default class BoltProtocol extends BoltProtocolV3 { beforeError, afterError, beforeComplete, - afterComplete + afterComplete, + highRecordWatermark, + lowRecordWatermark }) // passing impersonated user on this protocol version throws an error diff --git a/packages/bolt-connection/src/bolt/bolt-protocol-v4x4.js b/packages/bolt-connection/src/bolt/bolt-protocol-v4x4.js index 24eee8ca4..261a879fb 100644 --- a/packages/bolt-connection/src/bolt/bolt-protocol-v4x4.js +++ b/packages/bolt-connection/src/bolt/bolt-protocol-v4x4.js @@ -84,7 +84,9 @@ export default class BoltProtocol extends BoltProtocolV43 { afterComplete, flush = true, reactive = false, - fetchSize = FETCH_ALL + fetchSize = FETCH_ALL, + highRecordWatermark = Number.MAX_VALUE, + lowRecordWatermark = Number.MAX_VALUE } = {} ) { const observer = new ResultStreamObserver({ @@ -98,7 +100,9 @@ export default class BoltProtocol extends BoltProtocolV43 { beforeError, afterError, beforeComplete, - afterComplete + afterComplete, + highRecordWatermark, + lowRecordWatermark }) const flushRun = reactive diff --git a/packages/bolt-connection/src/bolt/stream-observers.js b/packages/bolt-connection/src/bolt/stream-observers.js index 8a2230b59..b71411ea6 100644 --- a/packages/bolt-connection/src/bolt/stream-observers.js +++ b/packages/bolt-connection/src/bolt/stream-observers.js @@ -68,7 +68,9 @@ class ResultStreamObserver extends StreamObserver { afterKeys, beforeComplete, afterComplete, - server + server, + highRecordWatermark = Number.MAX_VALUE, + lowRecordWatermark = Number.MAX_VALUE } = {}) { super() @@ -94,8 +96,32 @@ class ResultStreamObserver extends StreamObserver { this._discardFunction = discardFunction this._discard = false this._fetchSize = fetchSize + this._lowRecordWatermark = lowRecordWatermark + this._highRecordWatermark = highRecordWatermark this._setState(reactive ? _states.READY : _states.READY_STREAMING) - this._setupAuoPull(fetchSize) + this._setupAutoPull() + this._paused = false; + } + + /** + * Pause the record consuming + * + * This function will supend the record consuming. It will not cancel the stream and the already + * requested records will be sent to the subscriber. + */ + pause () { + this._paused = true + } + + /** + * Resume the record consuming + * + * This function will resume the record consuming fetching more records from the server. + */ + resume () { + this._paused = false + this._setupAutoPull(true) + this._state.pull(this) } /** @@ -342,16 +368,21 @@ class ResultStreamObserver extends StreamObserver { _handleStreaming () { if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) { - if (this._discard) { - this._discardFunction(this._queryId, this) - this._setState(_states.STREAMING) - } else if (this._autoPull) { - this._moreFunction(this._queryId, this._fetchSize, this) - this._setState(_states.STREAMING) + if (!this._paused && (this._discard || this._autoPull)) { + this._more() } } } + _more () { + if (this._discard) { + this._discardFunction(this._queryId, this) + } else { + this._moreFunction(this._queryId, this._fetchSize, this) + } + this._setState(_states.STREAMING) + } + _storeMetadataForCompletion (meta) { const keys = Object.keys(meta) let index = keys.length @@ -367,15 +398,8 @@ class ResultStreamObserver extends StreamObserver { this._state = state } - _setupAuoPull (fetchSize) { + _setupAutoPull () { this._autoPull = true - if (fetchSize === FETCH_ALL) { - this._lowRecordWatermark = Number.MAX_VALUE // we shall always lower than this number to enable auto pull - this._highRecordWatermark = Number.MAX_VALUE // we shall never reach this number to disable auto pull - } else { - this._lowRecordWatermark = 0.3 * fetchSize - this._highRecordWatermark = 0.7 * fetchSize - } } } @@ -575,7 +599,8 @@ const _states = { }, name: () => { return 'READY_STREAMING' - } + }, + pull: () => {} }, READY: { // reactive start state @@ -590,7 +615,8 @@ const _states = { }, name: () => { return 'READY' - } + }, + pull: streamObserver => streamObserver._more() }, STREAMING: { onSuccess: (streamObserver, meta) => { @@ -605,7 +631,8 @@ const _states = { }, name: () => { return 'STREAMING' - } + }, + pull: () => {} }, FAILED: { onError: error => { @@ -613,12 +640,14 @@ const _states = { }, name: () => { return 'FAILED' - } + }, + pull: () => {} }, SUCCEEDED: { name: () => { return 'SUCCEEDED' - } + }, + pull: () => {} } } diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v1.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v1.test.js index 70702c9c9..fd48dce46 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v1.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v1.test.js @@ -338,4 +338,26 @@ describe('#unit BoltProtocolV1', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV1(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + mode: WRITE, + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v2.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v2.test.js index c8d5596a2..d62878445 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v2.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v2.test.js @@ -92,4 +92,23 @@ describe('#unit BoltProtocolV2', () => { }) }) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV2(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v3.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v3.test.js index 513886451..78842e96b 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v3.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v3.test.js @@ -295,6 +295,27 @@ describe('#unit BoltProtocolV3', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV3(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) class SpiedBoltProtocolV3 extends BoltProtocolV3 { diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v4x0.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v4x0.test.js index 662f220f4..75bcde5d5 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v4x0.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v4x0.test.js @@ -214,6 +214,27 @@ describe('#unit BoltProtocolV4x0', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV4x0(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) class SpiedBoltProtocolV4x0 extends BoltProtocolV4x0 { diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v4x1.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v4x1.test.js index c70ed33c4..50164756a 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v4x1.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v4x1.test.js @@ -19,6 +19,12 @@ import BoltProtocolV4x1 from '../../src/bolt/bolt-protocol-v4x1' import utils from '../test-utils' +import { internal } from 'neo4j-driver-core' + +const { + txConfig: { TxConfig }, + bookmark: { Bookmark } +} = internal describe('#unit BoltProtocolV4x1', () => { describe('Bolt v4.4', () => { @@ -82,4 +88,25 @@ describe('#unit BoltProtocolV4x1', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV4x1(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v4x2.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v4x2.test.js index b0cbcdde8..982911ccf 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v4x2.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v4x2.test.js @@ -19,6 +19,12 @@ import BoltProtocolV4x2 from '../../src/bolt/bolt-protocol-v4x2' import utils from '../test-utils' +import { internal } from 'neo4j-driver-core' + +const { + txConfig: { TxConfig }, + bookmark: { Bookmark } +} = internal describe('#unit BoltProtocolV4x2', () => { describe('Bolt v4.4', () => { @@ -81,4 +87,25 @@ describe('#unit BoltProtocolV4x2', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV4x2(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v4x3.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v4x3.test.js index 41c5f4de1..0845ca616 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v4x3.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v4x3.test.js @@ -299,4 +299,25 @@ describe('#unit BoltProtocolV4x3', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV4x3(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) diff --git a/packages/bolt-connection/test/bolt/bolt-protocol-v4x4.test.js b/packages/bolt-connection/test/bolt/bolt-protocol-v4x4.test.js index 2314922ed..788b88d45 100644 --- a/packages/bolt-connection/test/bolt/bolt-protocol-v4x4.test.js +++ b/packages/bolt-connection/test/bolt/bolt-protocol-v4x4.test.js @@ -332,4 +332,25 @@ describe('#unit BoltProtocolV4x4', () => { } ) }) + + describe('watermarks', () => { + it('.run() should configure watermarks', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = utils.spyProtocolWrite( + new BoltProtocolV4x4(recorder, null, false) + ) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + const observer = protocol.run(query, parameters, { + bookmark: Bookmark.empty(), + txConfig: TxConfig.empty(), + lowRecordWatermark: 100, + highRecordWatermark: 200, + }) + + expect(observer._lowRecordWatermark).toEqual(100) + expect(observer._highRecordWatermark).toEqual(200) + }) + }) }) diff --git a/packages/bolt-connection/test/bolt/stream-observer.test.js b/packages/bolt-connection/test/bolt/stream-observer.test.js index dc15ce435..ee018b5fe 100644 --- a/packages/bolt-connection/test/bolt/stream-observer.test.js +++ b/packages/bolt-connection/test/bolt/stream-observer.test.js @@ -198,6 +198,319 @@ describe('#unit ResultStreamObserver', () => { } }) }) + + describe('when is not paused (default)', () => { + it('should ask for more records when the stream is completed and has more', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: 2000 + }) + + // action + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + + streamObserver.onNext([111, 222, 333]) + streamObserver.onCompleted({ has_more: false }) + + // verification + expect(more).toBeCalledTimes(1) + expect(more).toBeCalledWith(queryId, fetchSize, streamObserver) + }) + }) + + describe('when is paused', () => { + it('should not ask for more records when the stream is completed and has more', () => { + // Setup + const queryId = 123 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: 2000 + }) + + streamObserver.pause() + + // action + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + + // verification + expect(more).toBeCalledTimes(0) + }) + + describe('resume()', () => { + it('should ask for more records when the stream is completed and has more', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize + }) + + streamObserver.pause() + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + + // Action + streamObserver.resume() + + // verification + expect(more).toBeCalledTimes(1) + expect(more).toBeCalledWith(queryId, fetchSize, streamObserver) + }) + + it('should ask for more records when the stream is a new reactive stream', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize, + reactive: true + }) + streamObserver.pause() + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + // Action + streamObserver.resume() + + // verification + expect(more).toBeCalledTimes(1) + expect(more).toBeCalledWith(queryId, fetchSize, streamObserver) + }) + + + it('should ask for more records when the stream is a new reactive stream and not run success come yet', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize, + reactive: true + }) + streamObserver.pause() + + // Action + streamObserver.resume() + + // verification + expect(more).toBeCalledTimes(1) + expect(more).toBeCalledWith(null, fetchSize, streamObserver) + }) + + it('should not ask for more records when the stream is a new stream', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize, + reactive: false + }) + streamObserver.pause() + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + // Action + streamObserver.resume() + + // verification + expect(more).toBeCalledTimes(0) + }) + + + it('should not ask for more records when the stream is a new stream', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize + }) + + streamObserver.pause() + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + // Action + streamObserver.resume() + + // verification + expect(more).toBeCalledTimes(0) + }) + + it('should not ask for more records when it is streaming', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize + }) + + streamObserver.pause() + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + + streamObserver.resume() // should actual call + + streamObserver.onNext([111, 222, 333]) + + // Action + streamObserver.resume() + + // verification + expect(more).toBeCalledTimes(1) + }) + + it('should not ask for more records when result is completed', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize + }) + + streamObserver.pause() + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: false }) + + // Action + streamObserver.resume() + + // verification + expect(more).toBeCalledTimes(0) + }) + + + it('should resume the stream consumption until the end', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize + }) + + streamObserver.pause() + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + + // Action + streamObserver.resume() + + // Streaming until the end + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: true }) + streamObserver.onNext([1, 2, 3]) + streamObserver.onNext([11, 22, 33]) + streamObserver.onCompleted({ has_more: false }) + + // verification + expect(more).toBeCalledTimes(3) + }) + + it('should not ask for more records when stream failed', () => { + // Setup + const queryId = 123 + const fetchSize = 2000 + + const more = jest.fn() + const streamObserver = new ResultStreamObserver({ + moreFunction: more, + fetchSize: fetchSize + }) + + streamObserver.pause() + + // Scenario + streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId }) + + streamObserver.subscribe(newObserver()) + + streamObserver.onNext([1, 2, 3]) + streamObserver.onError(new Error('error')) + + // Action + streamObserver.resume() + + // verification + expect(more).toBeCalledTimes(0) + }) + }) + }) }) describe('#unit RouteObserver', () => { diff --git a/packages/core/src/connection.ts b/packages/core/src/connection.ts index 3ce6e1f06..4fb8eb9b4 100644 --- a/packages/core/src/connection.ts +++ b/packages/core/src/connection.ts @@ -17,24 +17,38 @@ * limitations under the License. */ +import { ServerAddress } from './internal/server-address' + /** * Interface which defines the raw connection with the database * @private */ class Connection { - id: string = "" - databaseId: string = "" - server: any + get id (): string { + return "" + } + + get databaseId(): string { + return "" + } + + get server(): any { + return {} + } /** * @property {ServerAddress} the server address this connection is opened against */ - address: any + get address(): ServerAddress | undefined { + return undefined + } /** * @property {ServerVersion} the version of the server this connection is connected to */ - version: any + get version(): any { + return undefined + } /** * @returns {boolean} whether this connection is in a working condition diff --git a/packages/core/src/internal/observers.ts b/packages/core/src/internal/observers.ts index 73ffd0f9a..5ed50a962 100644 --- a/packages/core/src/internal/observers.ts +++ b/packages/core/src/internal/observers.ts @@ -77,6 +77,22 @@ export interface ResultStreamObserver extends StreamObserver { * Cancel pending record stream */ cancel(): void + + /** + * Pause the record consuming + * + * This function will supend the record consuming. It will not cancel the stream and the already + * requested records will be sent to the subscriber. + */ + pause(): void + + /** + * Resume the record consuming + * + * This function will resume the record consuming fetching more records from the server. + */ + resume(): void + /** * Stream observer defaults to handling responses for two messages: RUN + PULL_ALL or RUN + DISCARD_ALL. * Response for RUN initializes query keys. Response for PULL_ALL / DISCARD_ALL exposes the result stream. @@ -115,6 +131,14 @@ export class CompletedObserver implements ResultStreamObserver { // do nothing } + pause(): void { + // do nothing + } + + resume(): void { + // do nothing + } + prepareToHandleSingleResponse(): void { // do nothing } @@ -162,13 +186,22 @@ export class FailedObserver implements ResultStreamObserver { // do nothing } - prepareToHandleSingleResponse(): void { + pause(): void { + // do nothing + } + + resume(): void { // do nothing } markCompleted(): void { // do nothing } + + prepareToHandleSingleResponse(): void { + // do nothing + } + } function apply(thisArg: any, func?: (param: T) => void, param?: T): void { diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 2153937c6..01bdba701 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -40,6 +40,13 @@ const DEFAULT_ON_ERROR = (error: Error) => { */ const DEFAULT_ON_COMPLETED = (summary: ResultSummary) => {} +/** + * @private + * @param {string[]} keys List of keys of the record in the result + * @return {void} + */ +const DEFAULT_ON_KEYS = (_keys: string[]) => {} + /** * The query result is the combination of the {@link ResultSummary} and * the array {@link Record[]} produced by the query @@ -80,6 +87,28 @@ interface ResultObserver { onError?: (error: Error) => void } +/** + * Defines the elements in the queue result observer + * @access private + */ + type QueuedResultElement = { + done: false + record: Record +} | { + done: true + summary: ResultSummary +} + +/** + * Defines a ResultObserver interface which can be used to enqueue records and dequeue + * them until the result is fully received. + * @access private + */ + interface QueuedResultObserver extends ResultObserver { + dequeue (): Promise + get size (): number +} + /** * A stream of {@link Record} representing the result of a query. * Can be consumed eagerly as {@link Promise} resolved with array of records and {@link ResultSummary} @@ -94,6 +123,9 @@ class Result implements Promise { private _query: Query private _parameters: any private _connectionHolder: connectionHolder.ConnectionHolder + private _keys: string[] | null + private _summary: ResultSummary | null + private _watermarks: { high: number; low: number } /** * Inject the observer to be used. @@ -108,7 +140,8 @@ class Result implements Promise { streamObserverPromise: Promise, query: Query, parameters?: any, - connectionHolder?: connectionHolder.ConnectionHolder + connectionHolder?: connectionHolder.ConnectionHolder, + watermarks: { high: number; low: number } = { high: Number.MAX_VALUE, low: Number.MAX_VALUE } ) { this._stack = captureStacktrace() this._streamObserverPromise = streamObserverPromise @@ -116,6 +149,9 @@ class Result implements Promise { this._query = query this._parameters = parameters || {} this._connectionHolder = connectionHolder || EMPTY_CONNECTION_HOLDER + this._keys = null + this._summary = null + this._watermarks = watermarks } /** @@ -128,13 +164,16 @@ class Result implements Promise { } */ keys(): Promise { + if (this._keys != null) { + return Promise.resolve(this._keys) + } return new Promise((resolve, reject) => { this._streamObserverPromise .then(observer => - observer.subscribe({ + observer.subscribe(this._decorateObserver({ onKeys: keys => resolve(keys), onError: err => reject(err) - }) + })) ) .catch(reject) }) @@ -150,15 +189,17 @@ class Result implements Promise { * */ summary(): Promise { + if (this._summary != null) { + return Promise.resolve(this._summary) + } return new Promise((resolve, reject) => { this._streamObserverPromise .then(o => { o.cancel() - o.subscribe({ - onCompleted: metadata => - this._createSummary(metadata).then(resolve, reject), + o.subscribe(this._decorateObserver({ + onCompleted: summary => resolve(summary), onError: err => reject(err) - }) + })) }) .catch(reject) }) @@ -192,6 +233,44 @@ class Result implements Promise { return this._p } + /** + * Provides a async iterator over the records in the result. + * + * *Should not be combined with {@link Result#subscribe} or ${@link Result#then} functions.* + * + * @public + * @returns {AsyncIterator} The async iterator for the Results + */ + async* [Symbol.asyncIterator](): AsyncIterator { + const queuedObserver = this._createQueuedResultObserver() + + const status = { paused: true, firstRun: true } + + const streaming: observer.ResultStreamObserver | null = + // the error will be send to the onError callback + await this._subscribe(queuedObserver, true).catch(() => null) + + const controlFlow = () => { + if (queuedObserver.size >= this._watermarks.high && !status.paused) { + status.paused = true + streaming?.pause() + } else if (queuedObserver.size <= this._watermarks.low && status.paused || status.firstRun) { + status.firstRun = false + status.paused = false + streaming?.resume() + } + } + + while(true) { + controlFlow() + const next = await queuedObserver.dequeue() + if (next.done) { + return next.summary + } + yield next.record + } + } + /** * Waits for all results and calls the passed in function with the results. * @@ -249,14 +328,52 @@ class Result implements Promise { * @return {void} */ subscribe(observer: ResultObserver): void { + this._subscribe(observer) + .catch(() => {}) + } + + /** + * Stream records to observer as they come in, this is a more efficient method + * of handling the results, and allows you to handle arbitrarily large results. + * + * @access private + * @param {ResultObserver} observer The observer to send records to. + * @param {boolean} paused The flag to indicate if the stream should be started paused + * @returns {Promise} The result stream observer. + */ + _subscribe(observer: ResultObserver, paused: boolean = false): Promise { + const _observer = this._decorateObserver(observer) + + return this._streamObserverPromise + .then(o => { + if (paused) { + o.pause() + } + o.subscribe(_observer) + return o + }) + .catch(error => { + _observer.onError!(error) + return Promise.reject(error) + }) + } + + /** + * Decorates the ResultObserver with the necessary methods. + * + * @access private + * @param {ResultObserver} observer The ResultObserver to decorate. + * @returns The decorated result observer + */ + _decorateObserver(observer: ResultObserver): ResultObserver { const onCompletedOriginal = observer.onCompleted || DEFAULT_ON_COMPLETED const onCompletedWrapper = (metadata: any) => { - this._createSummary(metadata).then(summary => - onCompletedOriginal.call(observer, summary) - ) - } - observer.onCompleted = onCompletedWrapper + this._createSummary(metadata).then(summary => { + this._summary = summary + return onCompletedOriginal.call(observer, summary) + }) + } const onErrorOriginal = observer.onError || DEFAULT_ON_ERROR const onErrorWrapper = (error: Error) => { @@ -267,13 +384,19 @@ class Result implements Promise { onErrorOriginal.call(observer, error) }) } - observer.onError = onErrorWrapper - this._streamObserverPromise - .then(o => { - return o.subscribe(observer) - }) - .catch(error => observer.onError!(error)) + const onKeysOriginal = observer.onKeys || DEFAULT_ON_KEYS + const onKeysWrapper = (keys: string[]) => { + this._keys = keys + return onKeysOriginal.call(observer, keys) + } + + return { + onNext: observer.onNext? observer.onNext.bind(observer) : undefined, + onKeys: onKeysWrapper, + onCompleted: onCompletedWrapper, + onError: onErrorWrapper + } } /** @@ -287,6 +410,11 @@ class Result implements Promise { this._streamObserverPromise.then(o => o.cancel()) } + /** + * @access private + * @param metadata + * @returns + */ private _createSummary(metadata: any): Promise { const { validatedQuery: query, @@ -314,6 +442,77 @@ class Result implements Promise { new ResultSummary(query, parameters, metadata, protocolVersion) ) } + + /** + * @access private + */ + private _createQueuedResultObserver (): QueuedResultObserver { + interface ResolvablePromise { + promise: Promise + resolve: (arg: T) => any | undefined + reject: (arg: Error) => any | undefined + } + + function createResolvablePromise (): ResolvablePromise { + const resolvablePromise: any = {} + resolvablePromise.promise = new Promise((resolve, reject) => { + resolvablePromise.resolve = resolve + resolvablePromise.reject = reject + }); + return resolvablePromise; + } + + type QueuedResultElementOrError = QueuedResultElement | Error + + function isError(elementOrError: QueuedResultElementOrError): elementOrError is Error { + return elementOrError instanceof Error + } + + const buffer: QueuedResultElementOrError[] = [] + const promiseHolder: { resolvable: ResolvablePromise | null } = { resolvable: null } + + + const observer = { + onNext: (record: Record) => { + observer._push({ done: false, record }) + }, + onCompleted: (summary: ResultSummary) => { + observer._push({ done: true, summary }) + }, + onError: (error: Error) => { + observer._push(error) + }, + _push(element: QueuedResultElementOrError) { + if (promiseHolder.resolvable !== null) { + const resolvable = promiseHolder.resolvable + promiseHolder.resolvable = null + if (isError(element)) { + resolvable.reject(element) + } else { + resolvable.resolve(element) + } + } else { + buffer.push(element) + } + }, + dequeue: async () => { + if (buffer.length > 0) { + const element = buffer.shift()! + if (isError(element)) { + throw element + } + return element + } + promiseHolder.resolvable = createResolvablePromise() + return await promiseHolder.resolvable.promise + }, + get size (): number { + return buffer.length + } + } + + return observer + } } function captureStacktrace(): string | null { diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 2597a140c..5253abb1b 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -18,6 +18,7 @@ */ import { ResultStreamObserver, FailedObserver } from './internal/observers' import { validateQueryAndParameters } from './internal/util' +import { FETCH_ALL } from './internal/constants' import { newError } from './error' import Result from './result' import Transaction from './transaction' @@ -60,6 +61,8 @@ class Session { private _impersonatedUser?: string private _onComplete: (meta: any) => void private _databaseNameResolved: boolean + private _lowRecordWatermark: number + private _highRecordWatermark: number /** * @constructor @@ -121,6 +124,9 @@ class Session { this._transactionExecutor = _createTransactionExecutor(config) this._onComplete = this._onCompleteCallback.bind(this) this._databaseNameResolved = this._database !== '' + const calculatedWatermaks = this._calculateWatermaks() + this._lowRecordWatermark = calculatedWatermaks.low + this._highRecordWatermark = calculatedWatermaks.high } /** @@ -157,7 +163,9 @@ class Session { impersonatedUser: this._impersonatedUser, afterComplete: this._onComplete, reactive: this._reactive, - fetchSize: this._fetchSize + fetchSize: this._fetchSize, + lowRecordWatermark: this._lowRecordWatermark, + highRecordWatermark: this._highRecordWatermark }) }) } @@ -192,7 +200,8 @@ class Session { }) ) } - return new Result(observerPromise, query, parameters, connectionHolder) + const watermarks = { high: this._highRecordWatermark, low: this._lowRecordWatermark } + return new Result(observerPromise, query, parameters, connectionHolder, watermarks) } async _acquireConnection(connectionConsumer: ConnectionConsumer) { @@ -269,7 +278,9 @@ class Session { onBookmark: this._updateBookmark.bind(this), onConnection: this._assertSessionIsOpen.bind(this), reactive: this._reactive, - fetchSize: this._fetchSize + fetchSize: this._fetchSize, + lowRecordWatermark: this._lowRecordWatermark, + highRecordWatermark: this._highRecordWatermark }) tx._begin(this._lastBookmark, txConfig) return tx @@ -418,6 +429,23 @@ class Session { this._updateBookmark(new Bookmark(meta.bookmark)) } + /** + * @private + * @returns {void} + */ + private _calculateWatermaks(): { low: number; high: number } { + if (this._fetchSize === FETCH_ALL) { + return { + low: Number.MAX_VALUE, // we shall always lower than this number to enable auto pull + high: Number.MAX_VALUE // we shall never reach this number to disable auto pull + } + } + return { + low: 0.3 * this._fetchSize, + high: 0.7 * this._fetchSize + } + } + /** * @protected */ diff --git a/packages/core/src/transaction.ts b/packages/core/src/transaction.ts index f48a63a11..5c11262a1 100644 --- a/packages/core/src/transaction.ts +++ b/packages/core/src/transaction.ts @@ -53,6 +53,8 @@ class Transaction { private _fetchSize: number private _results: any[] private _impersonatedUser?: string + private _lowRecordWatermak: number + private _highRecordWatermark: number /** * @constructor @@ -72,7 +74,9 @@ class Transaction { onConnection, reactive, fetchSize, - impersonatedUser + impersonatedUser, + highRecordWatermark, + lowRecordWatermark }: { connectionHolder: ConnectionHolder onClose: () => void @@ -80,7 +84,9 @@ class Transaction { onConnection: () => void reactive: boolean fetchSize: number - impersonatedUser?: string + impersonatedUser?: string, + highRecordWatermark: number, + lowRecordWatermark: number }) { this._connectionHolder = connectionHolder this._reactive = reactive @@ -93,6 +99,8 @@ class Transaction { this._fetchSize = fetchSize this._results = [] this._impersonatedUser = impersonatedUser + this._lowRecordWatermak = lowRecordWatermark + this._highRecordWatermark = highRecordWatermark } /** @@ -143,7 +151,9 @@ class Transaction { onComplete: this._onComplete, onConnection: this._onConnection, reactive: this._reactive, - fetchSize: this._fetchSize + fetchSize: this._fetchSize, + highRecordWatermark: this._highRecordWatermark, + lowRecordWatermark: this._lowRecordWatermak }) this._results.push(result) return result @@ -243,6 +253,8 @@ interface StateTransitionParams { pendingResults: any[] reactive: boolean fetchSize: number + highRecordWatermark: number + lowRecordWatermark: number } const _states = { @@ -296,6 +308,8 @@ const _states = { onConnection, reactive, fetchSize, + highRecordWatermark, + lowRecordWatermark }: StateTransitionParams ): any => { // RUN in explicit transaction can't contain bookmarks and transaction configuration @@ -312,6 +326,8 @@ const _states = { afterComplete: onComplete, reactive: reactive, fetchSize: fetchSize, + highRecordWatermark: highRecordWatermark, + lowRecordWatermark: lowRecordWatermark }) } else { throw newError('No connection available') @@ -323,7 +339,9 @@ const _states = { observerPromise, query, parameters, - connectionHolder + connectionHolder, + highRecordWatermark, + lowRecordWatermark ) } }, @@ -346,7 +364,9 @@ const _states = { }), 'COMMIT', {}, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ), state: _states.FAILED } @@ -361,7 +381,9 @@ const _states = { new CompletedObserver(), 'ROLLBACK', {}, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ), state: _states.FAILED } @@ -380,7 +402,9 @@ const _states = { }), query, parameters, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ) } }, @@ -401,7 +425,10 @@ const _states = { onError }), 'COMMIT', - {} + {}, + EMPTY_CONNECTION_HOLDER, + 0, // high watermark + 0 // low watermark ), state: _states.SUCCEEDED, connectionHolder @@ -421,7 +448,10 @@ const _states = { onError }), 'ROLLBACK', - {} + {}, + EMPTY_CONNECTION_HOLDER, + 0, // high watermark + 0 // low watermark ), state: _states.SUCCEEDED, connectionHolder @@ -441,7 +471,9 @@ const _states = { }), query, parameters, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ) } }, @@ -463,7 +495,9 @@ const _states = { }), 'COMMIT', {}, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ), state: _states.ROLLED_BACK } @@ -482,7 +516,9 @@ const _states = { }), 'ROLLBACK', {}, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ), state: _states.ROLLED_BACK } @@ -501,7 +537,9 @@ const _states = { }), query, parameters, - connectionHolder + connectionHolder, + 0, // high watermark + 0 // low watermark ) } } @@ -555,7 +593,11 @@ function finishTransaction( observerPromise, commit ? 'COMMIT' : 'ROLLBACK', {}, - connectionHolder + connectionHolder, + { + high: Number.MAX_VALUE, + low: Number.MAX_VALUE + }, ) } @@ -574,13 +616,19 @@ function newCompletedResult( observerPromise: ResultStreamObserver | Promise, query: Query, parameters: any, - connectionHolder: ConnectionHolder = EMPTY_CONNECTION_HOLDER + connectionHolder: ConnectionHolder = EMPTY_CONNECTION_HOLDER, + highRecordWatermark: number, + lowRecordWatermark: number ): Result { return new Result( Promise.resolve(observerPromise), query, parameters, - new ReadOnlyConnectionHolder(connectionHolder || EMPTY_CONNECTION_HOLDER) + new ReadOnlyConnectionHolder(connectionHolder || EMPTY_CONNECTION_HOLDER), + { + low: lowRecordWatermark, + high: highRecordWatermark + } ) } diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index c485eaf10..85de34bb2 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -19,6 +19,7 @@ import { observer, connectionHolder } from '../src/internal' import { Connection, + internal, newError, Record, ResultObserver, @@ -33,10 +34,11 @@ describe('Result', () => { describe('new Result(Promise.resolve(new ResultStreamObserverMock()), query, parameters, connectionHolder)', () => { let streamObserverMock: ResultStreamObserverMock let result: Result + const watermarks = { high: 10, low: 3 } beforeEach(() => { streamObserverMock = new ResultStreamObserverMock() - result = new Result(Promise.resolve(streamObserverMock), 'query') + result = new Result(Promise.resolve(streamObserverMock), 'query', undefined, undefined, watermarks) }) describe('.keys()', () => { @@ -554,6 +556,302 @@ describe('Result', () => { ) }) }) + + describe('asyncIterator', () => { + it('should subscribe to the observer', async () => { + const subscribe = jest.spyOn(streamObserverMock, 'subscribe') + streamObserverMock.onCompleted({}) + + for await (const _ of result) { + // do nothing + } + + expect(subscribe).toHaveBeenCalled() + }) + + it('should pause the stream and then resume the stream', async () => { + const pause = jest.spyOn(streamObserverMock, 'pause') + const resume = jest.spyOn(streamObserverMock, 'resume') + streamObserverMock.onCompleted({}) + + for await (const _ of result) { + // do nothing + } + + expect(pause).toHaveBeenCalledTimes(1) + expect(resume).toHaveBeenCalledTimes(1) + expect(pause.mock.invocationCallOrder[0]) + .toBeLessThan(resume.mock.invocationCallOrder[0]) + }) + + it('should pause the stream before subscribe', async () => { + const subscribe = jest.spyOn(streamObserverMock, 'subscribe') + const pause = jest.spyOn(streamObserverMock, 'pause') + streamObserverMock.onCompleted({}) + + for await (const _ of result) { + // do nothing + } + + expect(pause.mock.invocationCallOrder[0]) + .toBeLessThan(subscribe.mock.invocationCallOrder[0]) + }) + + it('should pause the stream if queue is bigger than high watermark', async () => { + const pause = jest.spyOn(streamObserverMock, 'pause') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i <= watermarks.high; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + await it.next() + + expect(pause).toBeCalledTimes(1) + }) + + it('should call resume if queue is smaller than low watermark', async () => { + const resume = jest.spyOn(streamObserverMock, 'resume') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i < watermarks.low - 1; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + await it.next() + + expect(resume).toBeCalledTimes(1) + }) + + it('should not pause after resume if queue is between lower and high if it never highter then high watermark', async () => { + const pause = jest.spyOn(streamObserverMock, 'pause') + const resume = jest.spyOn(streamObserverMock, 'resume') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i < watermarks.high - 1; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + for (let i = 0; i < watermarks.high - watermarks.low; i++) { + await it.next() + } + + expect(pause).toBeCalledTimes(1) + expect(pause.mock.invocationCallOrder[0]) + .toBeLessThan(resume.mock.invocationCallOrder[0]) + }) + + it('should resume once if queue is between lower and high if it get highter then high watermark', async () => { + const resume = jest.spyOn(streamObserverMock, 'resume') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i < watermarks.high; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + for (let i = 0; i < watermarks.high - watermarks.low; i++) { + await it.next() + } + + expect(resume).toBeCalledTimes(1) + }) + + it('should recover from high watermark limit after went to low watermark', async () => { + const resume = jest.spyOn(streamObserverMock, 'resume') + streamObserverMock.onKeys(['a']) + + for (let i = 0; i < watermarks.high; i++) { + streamObserverMock.onNext([i]) + } + + const it = result[Symbol.asyncIterator]() + for (let i = 0; i < watermarks.high - watermarks.low; i++) { + await it.next() + } + + for (let i = 0; i < 2; i++) { + await it.next() + } + + expect(resume).toBeCalledTimes(1) + }) + + it('should iterate over record', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + streamObserverMock.onCompleted({}) + + const records = [] + for await (const record of result) { + records.push(record) + } + + expect(records).toEqual([ + new Record(keys, rawRecord1), + new Record(keys, rawRecord2) + ]) + }) + + it('should end full batch', async () => { + const fetchSize = 3 + const observer = new ResultStreamObserverMock() + const res = new Result( + Promise.resolve(observer), + 'query', undefined, undefined, + { + low: fetchSize * 0.3, // Same as calculate in the session.ts + high: fetchSize * 0.7 + } + ) + + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const rawRecord3 = [5, 6] + const rawRecord4 = [7, 8] + const rawRecord5 = [9, 10] + const rawRecord6 = [11, 12] + const queue: any[][] = [ + rawRecord3, + rawRecord4, + rawRecord5, + rawRecord6 + ] + + const simuatedStream = simulateStream(queue, observer, fetchSize, 2) + + jest.spyOn(observer, 'resume') + .mockImplementation(simuatedStream.resume.bind(simuatedStream)) + + jest.spyOn(observer, 'pause') + .mockImplementation(simuatedStream.pause.bind(simuatedStream)) + + observer.onKeys(keys) + observer.onNext(rawRecord1) + observer.onNext(rawRecord2) + + const records = [] + + for await (const record of res) { + records.push(record) + await new Promise(r => setTimeout(r, 0.1)) + } + + expect(records).toEqual([ + new Record(keys, rawRecord1), + new Record(keys, rawRecord2), + new Record(keys, rawRecord3), + new Record(keys, rawRecord4), + new Record(keys, rawRecord5), + new Record(keys, rawRecord6) + ]) + }) + + describe('onError', () => { + it('should throws an exception while iterate over records', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const expectedError = new Error('test') + let observedError: Error | undefined + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + const records = [] + + try { + for await (const record of result) { + records.push(record) + streamObserverMock.onError(expectedError) + } + } catch (err) { + observedError = err + } + + expect(observedError).toEqual(expectedError) + }) + + it('should resolve the already received records', async () => { + const keys = ['a', 'b'] + const rawRecord1 = [1, 2] + const rawRecord2 = [3, 4] + const expectedError = new Error('test') + + streamObserverMock.onKeys(keys) + streamObserverMock.onNext(rawRecord1) + streamObserverMock.onNext(rawRecord2) + + const records = [] + + try { + for await (const record of result) { + records.push(record) + streamObserverMock.onError(expectedError) + } + } catch (err) { + // do nothing + } + + expect(records).toEqual([ + new Record(keys, rawRecord1), + new Record(keys, rawRecord2) + ]) + + }) + + it('should throws it when it is the event after onKeys', async () => { + const keys = ['a', 'b'] + const expectedError = new Error('test') + let observedError: Error | undefined + + streamObserverMock.onKeys(keys) + streamObserverMock.onError(expectedError) + + const records = [] + + try { + for await (const record of result) { + records.push(record) + } + } catch (err) { + observedError = err + } + + expect(observedError).toEqual(expectedError) + }) + + it('should throws it when it is the first and unique event', async () => { + const expectedError = new Error('test') + let observedError: Error | undefined + + streamObserverMock.onError(expectedError) + + const records = [] + + try { + for await (const record of result) { + records.push(record) + } + } catch (err) { + observedError = err + } + + expect(observedError).toEqual(expectedError) + }) + }) + }) }) describe.each([ @@ -638,19 +936,39 @@ describe('Result', () => { expect(Object.keys(queryResult).sort()).toEqual(['records', 'summary']) }) }) + + describe('asyncIterator', () => { + it('should be resolved with an empty array of records', async () => { + const records = [] + + for await (const record of result) { + records.push(record) + } + + expect(records).toStrictEqual([]) + }) + + it('should be resolved with expected result summary', async () => { + const it = result[Symbol.asyncIterator]() + const next = await it.next() + + expect(next.done).toBe(true) + expect(next.value).toStrictEqual(expectedResultSummary) + }) + }) }) describe.each([ [ 'Promise.resolve(new observer.FailedObserver({ error: expectedError }))', - Promise.resolve(new observer.FailedObserver({ error: expectedError })) + () => Promise.resolve(new observer.FailedObserver({ error: expectedError })) ], - ['Promise.reject(expectedError)', Promise.reject(expectedError)] - ])('new Result(%s, "query") ', (_, promise) => { + ['Promise.reject(expectedError)', () => Promise.reject(expectedError)] + ])('new Result(%s, "query") ', (_, getPromise) => { let result: Result beforeEach(() => { - result = new Result(promise, 'query') + result = new Result(getPromise(), 'query') }) describe('.keys()', () => { @@ -685,6 +1003,14 @@ describe('Result', () => { }) }) + describe('asyncIterator', () => { + shouldReturnRejectedPromiseWithTheExpectedError(async () => { + for await (const _ of result) { + // do nothing + } + }) + }) + function shouldReturnRejectedPromiseWithTheExpectedError( supplier: () => Promise ) { @@ -765,6 +1091,82 @@ class ResultStreamObserverMock implements observer.ResultStreamObserver { .filter(o => o.onCompleted) .forEach(o => o.onCompleted!(meta)) } + + pause (): void { + // do nothing + } + + resume(): void { + // do nothing + } +} + +function simulateStream( + records: any[][], + observer: ResultStreamObserverMock, + fetchSize: number, + timeout: number = 1): { + resume: () => void, + pause: () => void + } { + const state = { + paused: false, + streaming: false, + finished: false, + consumed: 0 + } + + const streaming = () => { + if (state.streaming || state.finished) { + return + } + state.streaming = true + state.consumed = 0 + + const interval = setInterval(() => { + state.streaming = state.consumed < fetchSize + state.finished = records.length === 0 + + if (state.finished) { + observer.onCompleted({}) + clearInterval(interval) + return + } + + if (!state.streaming) { + clearInterval(interval) + if (!state.paused) { + streaming() + } + return + } + + const record = records.shift() + if (record !== undefined) { + observer.onNext(record) + } + state.consumed++ + + }, timeout) + } + + return { + pause: () => { + state.paused = true + }, + resume: () => { + state.paused = false + streaming() + } + } + + /* + return () => { + + + return true + } + */ } function asConnection(value: any): Connection { diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts new file mode 100644 index 000000000..8bdd36921 --- /dev/null +++ b/packages/core/test/session.test.ts @@ -0,0 +1,229 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { ConnectionProvider, Session, Connection } from '../src' +import { ACCESS_MODE_READ, FETCH_ALL } from '../src/internal/constants' +import FakeConnection from './utils/connection.fake' + +describe('session', () => { + it('close should return promise', done => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection) + + session.close().then(() => done()) + }, 70000) + + it('close should return promise even when already closed ', done => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection) + + session.close().then(() => { + session.close().then(() => { + session.close().then(() => { + done() + }) + }) + }) + }, 70000) + + it('run should send watermarks to Result when fetchsize if defined', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, 1000) + + const result = session.run('RETURN 1') + await result; + + expect(connection.seenProtocolOptions[0]).toMatchObject({ + fetchSize: 1000, + lowRecordWatermark: 300, + highRecordWatermark: 700 + }) + // @ts-ignore + expect(result._watermarks).toEqual({ high: 700, low: 300 }) + }) + + it('run should send watermarks to Result when fetchsize is fetch all', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, FETCH_ALL) + + const result = session.run('RETURN 1') + await result; + + expect(connection.seenProtocolOptions[0]).toMatchObject({ + fetchSize: FETCH_ALL, + lowRecordWatermark: Number.MAX_VALUE, + highRecordWatermark: Number.MAX_VALUE + }) + + // @ts-ignore + expect(result._watermarks).toEqual({ high: Number.MAX_VALUE, low: Number.MAX_VALUE }) + }) + + it('run should send watermarks to Transaction when fetchsize if defined (begin)', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, 1000) + + const tx = session.beginTransaction() + + // @ts-ignore + expect(tx._lowRecordWatermak).toEqual(300) + // @ts-ignore + expect(tx._highRecordWatermark).toEqual(700) + }) + + it('run should send watermarks to Transaction when fetchsize is fetch all (begin)', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, FETCH_ALL) + + + const tx = session.beginTransaction() + + // @ts-ignore + expect(tx._lowRecordWatermak).toEqual(Number.MAX_VALUE) + // @ts-ignore + expect(tx._highRecordWatermark).toEqual(Number.MAX_VALUE) + }) + + it('run should send watermarks to Transaction when fetchsize if defined (writeTransaction)', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, 1000) + const status = { functionCalled: false } + + await session.writeTransaction(tx => { + // @ts-ignore + expect(tx._lowRecordWatermak).toEqual(300) + // @ts-ignore + expect(tx._highRecordWatermark).toEqual(700) + + status.functionCalled = true + }) + + expect(status.functionCalled).toEqual(true) + }) + + it('run should send watermarks to Transaction when fetchsize is fetch all (writeTransaction)', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, FETCH_ALL) + const status = { functionCalled: false } + + await session.writeTransaction(tx => { + // @ts-ignore + expect(tx._lowRecordWatermak).toEqual(Number.MAX_VALUE) + // @ts-ignore + expect(tx._highRecordWatermark).toEqual(Number.MAX_VALUE) + + status.functionCalled = true + }) + + expect(status.functionCalled).toEqual(true) + }) + + it('run should send watermarks to Transaction when fetchsize if defined (readTransaction)', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, 1000) + const status = { functionCalled: false } + + await session.readTransaction(tx => { + // @ts-ignore + expect(tx._lowRecordWatermak).toEqual(300) + // @ts-ignore + expect(tx._highRecordWatermark).toEqual(700) + + status.functionCalled = true + }) + + expect(status.functionCalled).toEqual(true) + }) + + it('run should send watermarks to Transaction when fetchsize is fetch all (readTransaction)', async () => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection, false, FETCH_ALL) + const status = { functionCalled: false } + + await session.readTransaction(tx => { + // @ts-ignore + expect(tx._lowRecordWatermak).toEqual(Number.MAX_VALUE) + // @ts-ignore + expect(tx._highRecordWatermark).toEqual(Number.MAX_VALUE) + + status.functionCalled = true + }) + + expect(status.functionCalled).toEqual(true) + }) + + it('close should be idempotent ', done => { + const connection = newFakeConnection() + const session = newSessionWithConnection(connection) + + session.close().then(() => { + expect(connection.isReleasedOnce()).toBeTruthy() + + session.close().then(() => { + expect(connection.isReleasedOnce()).toBeTruthy() + + session.close().then(() => { + expect(connection.isReleasedOnce()).toBeTruthy() + done() + }) + }) + }) + }, 70000) + + it('should close transaction executor', done => { + const session = newSessionWithConnection(newFakeConnection()) + + let closeCalledTimes = 0 + // @ts-ignore + const transactionExecutor = session._transactionExecutor + const originalClose = transactionExecutor.close + transactionExecutor.close = () => { + closeCalledTimes++ + originalClose.call(transactionExecutor) + } + + session.close().then(() => { + expect(closeCalledTimes).toEqual(1) + done() + }) + }, 70000) +}) + +function newSessionWithConnection(connection: Connection, beginTx: boolean = true, fetchSize: number = 1000): Session { + const connectionProvider = new ConnectionProvider() + connectionProvider.acquireConnection = () => Promise.resolve(connection) + connectionProvider.close = () => Promise.resolve() + + const session = new Session({ + mode: ACCESS_MODE_READ, + connectionProvider, + database: "", + fetchSize, + config: {}, + reactive: false + }) + + if (beginTx) { + session.beginTransaction() // force session to acquire new connection + } + return session +} + +function newFakeConnection(): FakeConnection { + return new FakeConnection() +} diff --git a/packages/core/test/transaction.test.ts b/packages/core/test/transaction.test.ts new file mode 100644 index 000000000..d4a8546dc --- /dev/null +++ b/packages/core/test/transaction.test.ts @@ -0,0 +1,100 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { ConnectionProvider, Transaction } from "../src"; +import { Bookmark } from "../src/internal/bookmark"; +import { ConnectionHolder } from "../src/internal/connection-holder"; +import FakeConnection from "./utils/connection.fake"; + +describe('Transaction', () => { + + describe('.run()', () => { + it('should call run with watermarks', async () => { + const connection = newFakeConnection() + const tx = newTransaction({ + connection, + fetchSize: 1000, + highRecordWatermark: 700, + lowRecordWatermark: 300 + }) + + await tx.run('RETURN 1') + + expect(connection.seenProtocolOptions[0]).toMatchObject({ + fetchSize: 1000, + lowRecordWatermark: 300, + highRecordWatermark: 700 + }) + }) + + it('should configure result with watermarks', async () => { + const connection = newFakeConnection() + const tx = newTransaction({ + connection, + fetchSize: 1000, + highRecordWatermark: 700, + lowRecordWatermark: 300 + }) + + var result = tx.run('RETURN 1') + + // @ts-ignore + expect(result._watermarks).toEqual({ high: 700, low: 300 }) + }) + + }) + +}) + +function newTransaction({ + connection, + fetchSize = 1000, + highRecordWatermark = 700, + lowRecordWatermark = 300 +}: { + connection: FakeConnection + fetchSize: number + highRecordWatermark: number, + lowRecordWatermark: number +}): Transaction { + const connectionProvider = new ConnectionProvider() + connectionProvider.acquireConnection = () => Promise.resolve(connection) + connectionProvider.close = () => Promise.resolve() + + const connectionHolder = new ConnectionHolder({ connectionProvider }) + connectionHolder.initializeConnection() + + const transaction = new Transaction({ + connectionHolder, + onClose: () => { }, + onBookmark: (_: Bookmark) => { }, + onConnection: () => { }, + reactive: false, + fetchSize, + impersonatedUser: "", + highRecordWatermark, + lowRecordWatermark + }) + + return transaction +} + +function newFakeConnection(): FakeConnection { + return new FakeConnection() +} diff --git a/packages/core/test/utils/connection.fake.ts b/packages/core/test/utils/connection.fake.ts new file mode 100644 index 000000000..2417e3483 --- /dev/null +++ b/packages/core/test/utils/connection.fake.ts @@ -0,0 +1,187 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Connection, ResultObserver, Record, ResultSummary } from '../../src' +import { ResultStreamObserver } from '../../src/internal/observers' + + +/** + * This class is like a mock of {@link Connection} that tracks invocations count. + * It tries to maintain same "interface" as {@link Connection}. + * It could be replaced with a proper mock by a library like testdouble. + * At the time of writing such libraries require {@link Proxy} support but browser tests execute in + * PhantomJS which does not support proxies. + */ +export default class FakeConnection extends Connection { + private _open: boolean + private _id: number + private _databaseId: string | null + private _requestRoutingInformationMock: ((params: any) => void) | null + public creationTimestamp: number + public resetInvoked: number + public releaseInvoked: number + public seenQueries: string[] + public seenParameters: any[] + public seenProtocolOptions: any[] + private _server: any + public protocolVersion: number | undefined + public protocolErrorsHandled: number + public seenProtocolErrors: string[] + public seenRequestRoutingInformation: any[] + + constructor() { + super() + + this._open = true + this._id = 0 + this._databaseId = null + this._requestRoutingInformationMock = null + this.creationTimestamp = Date.now() + + this.resetInvoked = 0 + this.releaseInvoked = 0 + this.seenQueries = [] + this.seenParameters = [] + this.seenProtocolOptions = [] + this._server = {} + this.protocolVersion = undefined + this.protocolErrorsHandled = 0 + this.seenProtocolErrors = [] + this.seenRequestRoutingInformation = [] + } + + get id(): string { + return this._id.toString() + } + + get databaseId(): string { + return this._databaseId!! + } + + set databaseId(value) { + this._databaseId = value + } + + get server() { + return this._server + } + + get version() { + return this._server.version + } + + set version(value) { + this._server.version = value + } + + protocol() { + // return fake protocol object that simply records seen queries and parameters + return { + run: (query: string, parameters: any | undefined, protocolOptions: any | undefined): ResultStreamObserver => { + this.seenQueries.push(query) + this.seenParameters.push(parameters) + this.seenProtocolOptions.push(protocolOptions) + return mockResultStreamObserver(query, parameters) + }, + commitTransaction: () => { + return mockResultStreamObserver('COMMIT', {}) + }, + beginTransaction: () => { + return Promise.resolve() + }, + requestRoutingInformation: (params: any | undefined) => { + this.seenRequestRoutingInformation.push(params) + if (this._requestRoutingInformationMock) { + this._requestRoutingInformationMock(params) + } + }, + version: this.protocolVersion + } + } + + resetAndFlush() { + this.resetInvoked++ + return Promise.resolve() + } + + _release() { + this.releaseInvoked++ + return Promise.resolve() + } + + isOpen() { + return this._open + } + + isNeverReleased() { + return this.isReleasedTimes(0) + } + + isReleasedOnce() { + return this.isReleasedTimes(1) + } + + isReleasedTimes(times: number) { + return this.resetInvoked === times && this.releaseInvoked === times + } + + _handleProtocolError(message: string) { + this.protocolErrorsHandled++ + this.seenProtocolErrors.push(message) + } + + withProtocolVersion(version: number) { + this.protocolVersion = version + return this + } + + withCreationTimestamp(value: number) { + this.creationTimestamp = value + return this + } + + withRequestRoutingInformationMock(requestRoutingInformationMock: (params: any) => void) { + this._requestRoutingInformationMock = requestRoutingInformationMock + return this + } + + closed() { + this._open = false + return this + } +} + +function mockResultStreamObserver(query: string, parameters: any | undefined): ResultStreamObserver { + return { + onError: (error: any) => { }, + onCompleted: () => { }, + onNext: (result: any) => { }, + cancel: () => { }, + prepareToHandleSingleResponse: () => { }, + pause: () => { }, + resume: () => { }, + markCompleted: () => { }, + subscribe: (observer: ResultObserver) => { + if (observer && observer.onCompleted) { + observer.onCompleted(new ResultSummary(query, parameters, {})) + } + + } + } +} diff --git a/packages/neo4j-driver/gulpfile.babel.js b/packages/neo4j-driver/gulpfile.babel.js index b531c609b..f368a2f9e 100644 --- a/packages/neo4j-driver/gulpfile.babel.js +++ b/packages/neo4j-driver/gulpfile.babel.js @@ -203,6 +203,7 @@ gulp.task('run-ts-declaration-tests', function (done) { .src(['test/types/**/*', 'types/**/*'], { base: '.' }) .pipe( ts({ + lib: ['es6', 'dom', 'esnext.asynciterable'], module: 'es6', target: 'es6', noImplicitAny: true, diff --git a/packages/neo4j-driver/test/examples.test.js b/packages/neo4j-driver/test/examples.test.js index 5ed785bcb..f0c42dd94 100644 --- a/packages/neo4j-driver/test/examples.test.js +++ b/packages/neo4j-driver/test/examples.test.js @@ -1511,6 +1511,24 @@ describe('#integration examples', () => { }) } }) + + describe('async interators', () => { + it('should iterate over the values', async () => { + const driver = driverGlobal + + const session = driver.session() + try { + const result = session.run('UNWIND RANGE(0, 10) AS x RETURN x') + const xs = [] + for await (const record of result) { + xs.push(record.get('x').toInt()) + } + expect(xs).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) + } finally { + await session.close() + } + }) + }) }) function removeLineBreaks (string) { diff --git a/packages/neo4j-driver/test/session.test.js b/packages/neo4j-driver/test/session.test.js index 0cf5047a4..21e97eaf4 100644 --- a/packages/neo4j-driver/test/session.test.js +++ b/packages/neo4j-driver/test/session.test.js @@ -20,7 +20,6 @@ import neo4j from '../src' import { READ } from '../src/driver' import SingleConnectionProvider from '../../bolt-connection/lib/connection-provider/connection-provider-single' -import FakeConnection from './internal/fake-connection' import sharedNeo4j from './internal/shared-neo4j' import _ from 'lodash' import testUtils from './internal/test-utils' @@ -40,63 +39,6 @@ const { const { PROTOCOL_ERROR, SESSION_EXPIRED } = error -describe('#unit session', () => { - it('close should return promise', done => { - const connection = new FakeConnection() - const session = newSessionWithConnection(connection) - - session.close().then(() => done()) - }, 70000) - - it('close should return promise even when already closed ', done => { - const connection = new FakeConnection() - const session = newSessionWithConnection(connection) - - session.close().then(() => { - session.close().then(() => { - session.close().then(() => { - done() - }) - }) - }) - }, 70000) - - it('close should be idempotent ', done => { - const connection = new FakeConnection() - const session = newSessionWithConnection(connection) - - session.close().then(() => { - expect(connection.isReleasedOnce()).toBeTruthy() - - session.close().then(() => { - expect(connection.isReleasedOnce()).toBeTruthy() - - session.close().then(() => { - expect(connection.isReleasedOnce()).toBeTruthy() - done() - }) - }) - }) - }, 70000) - - it('should close transaction executor', done => { - const session = newSessionWithConnection(new FakeConnection()) - - let closeCalledTimes = 0 - const transactionExecutor = session._transactionExecutor - const originalClose = transactionExecutor.close - transactionExecutor.close = () => { - closeCalledTimes++ - originalClose.call(transactionExecutor) - } - - session.close().then(() => { - expect(closeCalledTimes).toEqual(1) - done() - }) - }, 70000) -}) - describe('#integration session', () => { let driver let session @@ -1304,12 +1246,3 @@ describe('#integration session', () => { }) } }) - -function newSessionWithConnection (connection) { - const connectionProvider = new SingleConnectionProvider( - Promise.resolve(connection) - ) - const session = new Session({ mode: READ, connectionProvider }) - session.beginTransaction() // force session to acquire new connection - return session -} diff --git a/packages/testkit-backend/package.json b/packages/testkit-backend/package.json index 92761103a..c00e95f9d 100644 --- a/packages/testkit-backend/package.json +++ b/packages/testkit-backend/package.json @@ -13,7 +13,8 @@ "build": "rollup src/index.js --config rollup.config.js", "start": "node --version | grep -q v10. && node -r esm src/index.js || node --experimental-specifier-resolution=node src/index.js", "clean": "rm -fr node_modules public/index.js", - "prepare": "npm run build" + "prepare": "npm run build", + "node": "node" }, "repository": { "type": "git", diff --git a/packages/testkit-backend/src/backend.js b/packages/testkit-backend/src/backend.js index 80fc63265..f84e432ee 100644 --- a/packages/testkit-backend/src/backend.js +++ b/packages/testkit-backend/src/backend.js @@ -21,9 +21,9 @@ export default class Backend { this._channel.on('contextOpen', ({ contextId }) => this._controller.openContext(contextId)) this._channel.on('contextClose', ({ contextId }) => this._controller.closeContext(contextId)) - this._channel.on('request', ({ contextId, request }) => { + this._channel.on('request', async ({ contextId, request }) => { try { - this._controller.handle(contextId, request) + await this._controller.handle(contextId, request) } catch (e) { this._channel.writeBackendError(contextId, e.message) } diff --git a/packages/testkit-backend/src/context.js b/packages/testkit-backend/src/context.js index c13bff92e..df1c80b1e 100644 --- a/packages/testkit-backend/src/context.js +++ b/packages/testkit-backend/src/context.js @@ -5,9 +5,9 @@ export default class Context { this._sessions = {} this._txs = {} this._resolverRequests = {} - this._resultObservers = {} this._errors = {} this._shouldRunTest = shouldRunTest + this._results = {} } addDriver (driver) { @@ -33,10 +33,6 @@ export default class Context { return this._add(this._errors, error) } - addResultObserver (observer) { - return this._add(this._resultObservers, observer) - } - addResolverRequest (resolve, reject) { const id = this._add(this._resolverRequests, { resolve, @@ -45,6 +41,18 @@ export default class Context { return id } + addResult (result) { + return this._add(this._results, result) + } + + removeResult (id) { + delete this._results[id] + } + + getResult (id) { + return this._results[id] + } + getDriver (id) { return this._drivers[id] } @@ -81,20 +89,10 @@ export default class Context { delete this._txs[id] } - removeResultObserver (id) { - delete this._resultObservers[id] - } - removeResolverRequest (id) { delete this._resolverRequests[id] } - getResultObserversBySessionId (sessionId) { - return Object.values(this._resultObservers).filter( - obs => obs.sessionId === sessionId - ) - } - getTxsBySessionId (sessionId) { return Object.values(this._txs).filter(tx => tx.sessionId === sessionId) } diff --git a/packages/testkit-backend/src/controller/interface.js b/packages/testkit-backend/src/controller/interface.js index 3de0cb961..94e4fd9f6 100644 --- a/packages/testkit-backend/src/controller/interface.js +++ b/packages/testkit-backend/src/controller/interface.js @@ -24,7 +24,7 @@ export default class Controller extends EventEmitter { throw new Error('not implemented') } - handle(contextId, request) { + async handle(contextId, request) { throw new Error('not implemented') } } diff --git a/packages/testkit-backend/src/controller/local.js b/packages/testkit-backend/src/controller/local.js index ae2638e51..a4faddaee 100644 --- a/packages/testkit-backend/src/controller/local.js +++ b/packages/testkit-backend/src/controller/local.js @@ -25,7 +25,7 @@ export default class LocalController extends Controller { this._contexts.delete(contextId) } - handle (contextId, { name, data }) { + async handle (contextId, { name, data }) { if (!this._contexts.has(contextId)) { throw new Error(`Context ${contextId} does not exist`) } else if (!(name in this._requestHandlers)) { @@ -34,7 +34,7 @@ export default class LocalController extends Controller { throw new Error(`Unknown request: ${name}`) } - this._requestHandlers[name](this._contexts.get(contextId), data, { + return await this._requestHandlers[name](this._contexts.get(contextId), data, { writeResponse: (name, data) => this._writeResponse(contextId, name, data), writeError: (e) => this._writeError(contextId, e), writeBackendError: (msg) => this._writeBackendError(contextId, msg) diff --git a/packages/testkit-backend/src/controller/remote.js b/packages/testkit-backend/src/controller/remote.js index fc13fedf1..535801b4f 100644 --- a/packages/testkit-backend/src/controller/remote.js +++ b/packages/testkit-backend/src/controller/remote.js @@ -66,7 +66,7 @@ export default class RemoteController extends Controller { this._forwardToConnectedClient('contextClose', contextId, { contextId }) } - handle (contextId, request) { + async handle (contextId, request) { this._forwardToConnectedClient('request', contextId, request) } diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 3e0036291..4686d67c5 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -1,5 +1,4 @@ import neo4j from './neo4j' -import ResultObserver from './result-observer.js' import { cypherToNative, nativeToCypher } from './cypher-native-binders.js' import tls from 'tls' @@ -94,13 +93,13 @@ export function NewDriver (context, data, wire) { export function DriverClose (context, data, wire) { const { driverId } = data const driver = context.getDriver(driverId) - driver + return driver .close() .then(() => { wire.writeResponse('Driver', { id: driverId }) }) - .catch(err => wire.writeError(err)) - context.removeDriver(driverId) + .catch(err => wire.writeError(err)) + .finally(() => context.removeDriver(driverId)) } export function NewSession (context, data, wire) { @@ -131,7 +130,7 @@ export function NewSession (context, data, wire) { export function SessionClose (context, data, wire) { const { sessionId } = data const session = context.getSession(sessionId) - session + return session .close() .then(() => { wire.writeResponse('Session', { id: sessionId }) @@ -148,70 +147,61 @@ export function SessionRun (context, data, wire) { } } - const observers = context.getResultObserversBySessionId(sessionId) - - Promise.all(observers.map(obs => obs.completitionPromise())) - .catch(_ => null) - .then(_ => { - let result - try { - result = session.run(cypher, params, { metadata, timeout }) - } catch (e) { - console.log('got some err: ' + JSON.stringify(e)) - wire.writeError(e) - return - } - const resultObserver = new ResultObserver({ sessionId, result }) - const id = context.addResultObserver(resultObserver) - wire.writeResponse('Result', { id }) - }) + let result + try { + result = session.run(cypher, params, { metadata, timeout }) + } catch (e) { + console.log('got some err: ' + JSON.stringify(e)) + wire.writeError(e) + return + } + + let id = context.addResult(result) + + wire.writeResponse('Result', { id }) } export function ResultNext (context, data, wire) { const { resultId } = data - const resultObserver = context.getResultObserver(resultId) - const nextPromise = resultObserver.next() - nextPromise - .then(rec => { - if (rec) { - const values = Array.from(rec.values()).map(nativeToCypher) - wire.writeResponse('Record', { - values: values - }) - } else { - wire.writeResponse('NullRecord', null) - } - }) - .catch(e => { - console.log('got some err: ' + JSON.stringify(e)) - wire.writeError(e) - }) + const result = context.getResult(resultId) + if (!("recordIt" in result)) { + result.recordIt = result[Symbol.asyncIterator]() + } + return result.recordIt.next().then(({ value, done }) => { + if (done) { + wire.writeResponse('NullRecord', null) + } else { + const values = Array.from(value.values()).map(nativeToCypher) + wire.writeResponse('Record', { + values: values + }) + } + }).catch(e => { + console.log('got some err: ' + JSON.stringify(e)) + wire.writeError(e) + }); } export function ResultConsume (context, data, wire) { const { resultId } = data - const resultObserver = context.getResultObserver(resultId) - resultObserver - .completitionPromise() - .then(summary => { - wire.writeResponse('Summary', { - ...summary, - serverInfo: { - agent: summary.server.agent, - protocolVersion: summary.server.protocolVersion.toFixed(1) - } - }) + const result = context.getResult(resultId) + return result.summary().then(summary => { + wire.writeResponse('Summary', { + ...summary, + serverInfo: { + agent: summary.server.agent, + protocolVersion: summary.server.protocolVersion.toFixed(1) + } }) - .catch(e => wire.writeError(e)) + }).catch(e => wire.writeError(e)) } export function ResultList (context, data, wire) { const { resultId } = data - const resultObserver = context.getResultObserver(resultId) - const result = resultObserver.result + const result = context.getResult(resultId) - result + return result .then(({ records }) => { const cypherRecords = records.map(rec => { return { values: Array.from(rec.values()).map(nativeToCypher) } @@ -224,7 +214,7 @@ export function ResultList (context, data, wire) { export function SessionReadTransaction (context, data, wire) { const { sessionId, txMeta: metadata } = data const session = context.getSession(sessionId) - session + return session .readTransaction( tx => new Promise((resolve, reject) => { @@ -245,8 +235,8 @@ export function TransactionRun (context, data, wire) { } } const result = tx.tx.run(cypher, params) - const resultObserver = new ResultObserver({ result }) - const id = context.addResultObserver(resultObserver) + const id = context.addResult(result) + wire.writeResponse('Result', { id }) } @@ -283,7 +273,7 @@ export function SessionBeginTransaction (context, data, wire) { export function TransactionCommit (context, data, wire) { const { txId: id } = data const { tx } = context.getTx(id) - tx.commit() + return tx.commit() .then(() => wire.writeResponse('Transaction', { id })) .catch(e => { console.log('got some err: ' + JSON.stringify(e)) @@ -294,7 +284,7 @@ export function TransactionCommit (context, data, wire) { export function TransactionRollback (context, data, wire) { const { txId: id } = data const { tx } = context.getTx(id) - tx.rollback() + return tx.rollback() .then(() => wire.writeResponse('Transaction', { id })) .catch(e => wire.writeError(e)) } @@ -309,7 +299,7 @@ export function SessionLastBookmarks (context, data, wire) { export function SessionWriteTransaction (context, data, wire) { const { sessionId, txMeta: metadata } = data const session = context.getSession(sessionId) - session + return session .writeTransaction( tx => new Promise((resolve, reject) => { @@ -355,7 +345,7 @@ export function GetFeatures (_context, _params, wire) { export function VerifyConnectivity (context, { driverId }, wire) { const driver = context.getDriver(driverId) - driver + return driver .verifyConnectivity() .then(() => wire.writeResponse('Driver', { id: driverId })) .catch(error => wire.writeError(error)) @@ -363,7 +353,7 @@ export function VerifyConnectivity (context, { driverId }, wire) { export function CheckMultiDBSupport (context, { driverId }, wire) { const driver = context.getDriver(driverId) - driver + return driver .supportsMultiDb() .then(available => wire.writeResponse('MultiDBSupport', { id: driverId, available }) @@ -417,7 +407,7 @@ export function ForcedRoutingTableUpdate (context, { driverId, database, bookmar if (provider._freshRoutingTable) { // Removing database from the routing table registry provider._routingTableRegistry._remove(database) - provider._freshRoutingTable ({ + return provider._freshRoutingTable ({ accessMode: 'READ', database, bookmark: bookmarks, diff --git a/packages/testkit-backend/src/result-observer.js b/packages/testkit-backend/src/result-observer.js deleted file mode 100644 index 5d5b6d4b2..000000000 --- a/packages/testkit-backend/src/result-observer.js +++ /dev/null @@ -1,127 +0,0 @@ -import neo4j from './neo4j' - -export default class ResultObserver { - constructor ({ sessionId, result }) { - this.sessionId = sessionId - this._result = result - this.keys = null - this._stream = [] - this.summary = null - this._err = null - this._promise = null - this.onKeys = this.onKeys.bind(this) - this.onNext = this.onNext.bind(this) - this.onCompleted = this.onCompleted.bind(this) - this.onError = this.onError.bind(this) - this._completitionPromise = null - this._subscribed = false - } - - onKeys (keys) { - this.keys = keys - } - - onNext (record) { - this._stream.push(record) - this._fulfill() - } - - onCompleted (summary) { - this._summary = summary - this._fulfill() - this._resolve(this._completitionPromise, summary) - this._completitionPromise = null - } - - onError (e) { - this._stream.push(e) - this._fulfill() - this._reject(this._completitionPromise, e) - this._completitionPromise = null - } - - get result () { - return this._result - } - - // Returns a promise, only one outstanding next! - next () { - this._subscribe() - return new Promise((resolve, reject) => { - this._promise = { - resolve, - reject - } - this._fulfill() - }) - } - - completitionPromise () { - this._subscribe() - return new Promise((resolve, reject) => { - if (this._summary) { - resolve(this._summary) - } else if (this._err) { - reject(this._err) - } else { - this._completitionPromise = { - resolve, - reject - } - } - }) - } - - _fulfill () { - if (!this._promise) { - return - } - - // The stream contains something - if (this._stream.length) { - const x = this._stream.shift() - if (!(x instanceof neo4j.types.Record)) { - // For further calls, use this (stream should be empty after this) - this._err = x - this._promise.reject(x) - this._promise = null - return - } - this._promise.resolve(x) - this._promise = null - return - } - - // There has been an error, continue to return that error - if (this._err) { - this._promise.reject(this._err) - this._promise = null - return - } - - // All records have been received - if (this._summary) { - this._promise.resolve(null) - this._promise = null - } - } - - _resolve (promise, data) { - if (promise) { - promise.resolve(data) - } - } - - _reject (promise, err) { - if (promise) { - promise.reject(err) - } - } - - _subscribe () { - if (!this._subscribed) { - this._result.subscribe(this) - this._subscribed = true - } - } -} diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index 2e888e054..31a833060 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -1,9 +1,15 @@ -import skip, { ifEquals, ifEndsWith } from './skip' +import skip, { ifEquals, ifEndsWith, ifStartsWith } from './skip' const skippedTests = [ skip( - 'Not support by the JS driver', - ifEquals('neo4j.sessionrun.TestSessionRun.test_partial_iteration') + 'Partial session iteration is not supported by the js driver', + ifEquals('neo4j.sessionrun.TestSessionRun.test_partial_iteration'), + ifEquals('neo4j.test_session_run.TestSessionRun.test_session_reuse'), + ifEquals('neo4j.test_session_run.TestSessionRun.test_iteration_nested'), + ), + skip( + 'Nested calls does not garauntee order in the records pulling', + ifEquals('stub.iteration.test_iteration_tx_run.TestIterationTxRun.test_nested'), ), skip( 'The driver has no support domain_name_resolver', @@ -34,10 +40,6 @@ const skippedTests = [ '.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors' ) ), - skip( - 'It could not guarantee the order of records requests between in the nested transactions', - ifEquals('stub.iteration.TxRun.test_nested') - ), skip( 'Keeps retrying on commit despite connection being dropped', ifEquals('stub.retry.TestRetry.test_disconnect_on_commit')