From f93415087bef270f0d4846852568d798ea3f2f4a Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 10 Mar 2022 16:42:59 +0100 Subject: [PATCH 1/5] Prevent resolve or iterate over already consumed Result This `result.then` and `for await (const r of result)` in already consumed results could block for ever since no new event will came from wire. Blocking this kind of access improves not correct access to the Result object. This change also includes the addition of `Result.isOpen()` --- packages/core/src/result.ts | 15 +++ packages/core/src/transaction.ts | 2 +- packages/core/test/result.test.ts | 97 +++++++++++++++++++ .../testkit-backend/src/controller/local.js | 2 +- .../src/skipped-tests/common.js | 8 +- 5 files changed, 118 insertions(+), 6 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 062f2f079..48d0ac37b 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -21,6 +21,7 @@ import ResultSummary from './result-summary' import Record from './record' import { Query, PeekableAsyncIterator } from './types' import { observer, util, connectionHolder } from './internal' +import { newError } from './error' const { EMPTY_CONNECTION_HOLDER } = connectionHolder @@ -208,6 +209,9 @@ class Result implements Promise { * @return {Promise} new Promise. */ private _getOrCreatePromise(): Promise { + if (!this.isOpen()) { + return Promise.reject(newError('Result is already consumed')) + } if (!this._p) { this._p = new Promise((resolve, reject) => { const records: Record[] = [] @@ -238,6 +242,13 @@ class Result implements Promise { * @returns {PeekableAsyncIterator} The async iterator for the Results */ [Symbol.asyncIterator](): PeekableAsyncIterator { + if (!this.isOpen()) { + const error = newError('Result is already consumed') + return { + next: () => Promise.reject(error), + peek: () => Promise.reject(error), + } + } const state: { paused: boolean, firstRun: boolean, @@ -362,6 +373,10 @@ class Result implements Promise { .catch(() => {}) } + isOpen (): boolean { + return this._summary === null && this._error === null + } + /** * Stream records to observer as they come in, this is a more efficient method * of handling the results, and allows you to handle arbitrarily large results. diff --git a/packages/core/src/transaction.ts b/packages/core/src/transaction.ts index 8703e034a..d992d0a33 100644 --- a/packages/core/src/transaction.ts +++ b/packages/core/src/transaction.ts @@ -600,7 +600,7 @@ function finishTransaction( .then(connection => { onConnection() pendingResults.forEach(r => r._cancel()) - return Promise.all(pendingResults).then(results => { + return Promise.all(pendingResults.map(result => result.summary())).then(results => { if (connection) { if (commit) { return connection.protocol().commitTransaction({ diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index ed8a8e70a..ebabd792e 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -451,6 +451,22 @@ describe('Result', () => { result.catch(() => { }).finally(done) }) + it.each([ + ['success', async (stream: any) => stream.onCompleted({})], + //['error', async (stream: any) => stream.onError(new Error('error'))], + ])('should thrown over an consumed result [%s]', async(_, completeStream) => { + completeStream(streamObserverMock) + + await result.summary().catch(() => {}) + + try { + await result + expect('not finish iteration over consumed result').toBe(true) + } catch (e) { + expect(e).toEqual(newError('Result is already consumed')) + } + }) + describe.each([ ['query', {}, { query: 'query', parameters: {} }], ['query', { a: 1 }, { query: 'query', parameters: { a: 1 } }], @@ -860,6 +876,26 @@ describe('Result', () => { ]) }) + it.each([ + ['success', async (stream: any) => stream.onCompleted({})], + ['error', async (stream: any) => stream.onError(new Error('error'))], + ])('should thrown on iterate over an consumed result [%s]', async(_, completeStream) => { + completeStream(streamObserverMock) + + await result.summary().catch(() => {}) + + try { + for await (const _ of result) { + expect('not iterate over consumed result').toBe(true) + } + expect('not finish iteration over consumed result').toBe(true) + } catch (e) { + expect(e).toEqual(newError('Result is already consumed')) + } + + expect('not finish iteration over consumed result') + }) + describe('.return()', () => { it('should finished the operator when it get called', async () => { const keys = ['a', 'b'] @@ -1205,6 +1241,30 @@ describe('Result', () => { }) }) }) + + describe('.isOpen()', () => { + it('should return true when the stream is open', async () => { + await result._subscribe({}).catch(() => {}) + + expect(result.isOpen()).toBe(true) + }) + + it('should return false when the stream is closed', async () => { + streamObserverMock.onCompleted({}) + + await result._subscribe({}).catch(() => {}) + + expect(result.isOpen()).toBe(false) + }) + + it('should return false when the stream is failed', async () => { + streamObserverMock.onError(new Error('test')) + + await result._subscribe({}).catch(() => {}) + + expect(result.isOpen()).toBe(false) + }) + }) }) describe.each([ @@ -1319,6 +1379,19 @@ describe('Result', () => { }) }) }) + + describe('isOpen()', () => { + it('should be true', () => { + expect(result.isOpen()).toBe(true) + }) + + it('should be false after any interactio with the stream', async () => { + const it = result[Symbol.asyncIterator]() + await it.next() + + expect(result.isOpen()).toBe(false) + }) + }) }) describe.each([ @@ -1381,6 +1454,30 @@ describe('Result', () => { }) }) + describe('.isOpen()', () => { + it('should be true', async () => { + expect(result.isOpen()).toBe(true) + + // consume the stream to avoid unhaddled errors + try { + await result.summary() + } catch (error) { + } + }) + + it('should be false after any interactio with the stream', async () => { + const it = result[Symbol.asyncIterator]() + + try { + await it.next() + } catch (error) { + // this's fine + } + + expect(result.isOpen()).toBe(false) + }) + }) + function shouldReturnRejectedPromiseWithTheExpectedError( supplier: () => Promise ) { diff --git a/packages/testkit-backend/src/controller/local.js b/packages/testkit-backend/src/controller/local.js index 84bf4d402..5f4471aae 100644 --- a/packages/testkit-backend/src/controller/local.js +++ b/packages/testkit-backend/src/controller/local.js @@ -63,7 +63,7 @@ export default class LocalController extends Controller { const id = this._contexts.get(contextId).addError(e) this._writeResponse(contextId, newResponse('DriverError', { id, - msg: e.message + ' (' + e.code + ')', + msg: e.message, code: e.code })) } diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index 2887fa17d..a63d02074 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -138,10 +138,10 @@ const skippedTests = [ 'stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once_next_before_list' ) ), - skip( - 'Results are always valid but don\'t return records when out of scope', - ifStartsWith('stub.iteration.test_result_scope.TestResultScope.') - ), + // skip( + // 'Results are always valid but don\'t return records when out of scope', + // ifStartsWith('stub.iteration.test_result_scope.TestResultScope.') + // ), skip( 'Driver (still) allows explicit managing of managed transaction', ifEquals('stub.tx_lifetime.test_tx_lifetime.TestTxLifetime.test_managed_tx_raises_tx_managed_exec') From caa1810be19293bc1a4a79a65bbea8be8d3b42ad Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Thu, 10 Mar 2022 16:50:42 +0100 Subject: [PATCH 2/5] Docs --- packages/core/src/result.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 48d0ac37b..fd7177622 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -373,6 +373,10 @@ class Result implements Promise { .catch(() => {}) } + /** + * Check if this result is active, which means summary or error are not received by the result. + * @return {boolean} `true` when summary or error are not received by the result. + */ isOpen (): boolean { return this._summary === null && this._error === null } From 3e30b3ebac9650ecfd12fd8557961658ed7ce9dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Mon, 14 Mar 2022 15:24:01 +0100 Subject: [PATCH 3/5] Apply suggestions from code review Co-authored-by: Robsdedude --- packages/core/src/result.ts | 4 ++-- packages/core/test/result.test.ts | 16 ++++++++-------- .../testkit-backend/src/skipped-tests/common.js | 4 ---- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index fd7177622..6b3ea2cdd 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -374,8 +374,8 @@ class Result implements Promise { } /** - * Check if this result is active, which means summary or error are not received by the result. - * @return {boolean} `true` when summary or error are not received by the result. + * Check if this result is active, i.e., neither a summary nor an error has been received by the result. + * @return {boolean} `true` when neither a summary or nor an error has been received by the result. */ isOpen (): boolean { return this._summary === null && this._error === null diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index ebabd792e..a73504b67 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -454,14 +454,14 @@ describe('Result', () => { it.each([ ['success', async (stream: any) => stream.onCompleted({})], //['error', async (stream: any) => stream.onError(new Error('error'))], - ])('should thrown over an consumed result [%s]', async(_, completeStream) => { + ])('should throw when iterating over consumed result [%s]', async(_, completeStream) => { completeStream(streamObserverMock) await result.summary().catch(() => {}) try { await result - expect('not finish iteration over consumed result').toBe(true) + expect('not to finish iteration over consumed result').toBe(true) } catch (e) { expect(e).toEqual(newError('Result is already consumed')) } @@ -879,21 +879,21 @@ describe('Result', () => { it.each([ ['success', async (stream: any) => stream.onCompleted({})], ['error', async (stream: any) => stream.onError(new Error('error'))], - ])('should thrown on iterate over an consumed result [%s]', async(_, completeStream) => { + ])('should thrown on iterating over an consumed result [%s]', async(_, completeStream) => { completeStream(streamObserverMock) await result.summary().catch(() => {}) try { for await (const _ of result) { - expect('not iterate over consumed result').toBe(true) + expect('not to iterate over consumed result').toBe(true) } - expect('not finish iteration over consumed result').toBe(true) + expect('not to finish iteration over consumed result').toBe(true) } catch (e) { expect(e).toEqual(newError('Result is already consumed')) } - expect('not finish iteration over consumed result') + expect('not to finish iteration over consumed result') }) describe('.return()', () => { @@ -1257,7 +1257,7 @@ describe('Result', () => { expect(result.isOpen()).toBe(false) }) - it('should return false when the stream is failed', async () => { + it('should return false when the stream failed', async () => { streamObserverMock.onError(new Error('test')) await result._subscribe({}).catch(() => {}) @@ -1385,7 +1385,7 @@ describe('Result', () => { expect(result.isOpen()).toBe(true) }) - it('should be false after any interactio with the stream', async () => { + it('should be false after any interaction with the stream', async () => { const it = result[Symbol.asyncIterator]() await it.next() diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index a63d02074..50fed8599 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -138,10 +138,6 @@ const skippedTests = [ 'stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once_next_before_list' ) ), - // skip( - // 'Results are always valid but don\'t return records when out of scope', - // ifStartsWith('stub.iteration.test_result_scope.TestResultScope.') - // ), skip( 'Driver (still) allows explicit managing of managed transaction', ifEquals('stub.tx_lifetime.test_tx_lifetime.TestTxLifetime.test_managed_tx_raises_tx_managed_exec') From b98a26a32c4d03496e2bc7617ea05f3ed6c9c7b7 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 14 Mar 2022 15:33:46 +0100 Subject: [PATCH 4/5] Remove commented test --- packages/core/test/result.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index a73504b67..1e1f01603 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -453,7 +453,7 @@ describe('Result', () => { it.each([ ['success', async (stream: any) => stream.onCompleted({})], - //['error', async (stream: any) => stream.onError(new Error('error'))], + ['error', async (stream: any) => stream.onError(new Error('error'))], ])('should throw when iterating over consumed result [%s]', async(_, completeStream) => { completeStream(streamObserverMock) From 22a61ec2a30607a837a1e4705fca674918971b12 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 15 Mar 2022 13:41:29 +0100 Subject: [PATCH 5/5] Do not change the behaviour of 'then' --- packages/core/src/result.ts | 3 - packages/core/test/result.test.ts | 16 ---- .../neo4j-driver/test/rx/navigation.test.js | 75 ++++++++++++------- 3 files changed, 50 insertions(+), 44 deletions(-) diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 6b3ea2cdd..729168be9 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -209,9 +209,6 @@ class Result implements Promise { * @return {Promise} new Promise. */ private _getOrCreatePromise(): Promise { - if (!this.isOpen()) { - return Promise.reject(newError('Result is already consumed')) - } if (!this._p) { this._p = new Promise((resolve, reject) => { const records: Record[] = [] diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index 1e1f01603..c9b87e1e3 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -451,22 +451,6 @@ describe('Result', () => { result.catch(() => { }).finally(done) }) - it.each([ - ['success', async (stream: any) => stream.onCompleted({})], - ['error', async (stream: any) => stream.onError(new Error('error'))], - ])('should throw when iterating over consumed result [%s]', async(_, completeStream) => { - completeStream(streamObserverMock) - - await result.summary().catch(() => {}) - - try { - await result - expect('not to finish iteration over consumed result').toBe(true) - } catch (e) { - expect(e).toEqual(newError('Result is already consumed')) - } - }) - describe.each([ ['query', {}, { query: 'query', parameters: {} }], ['query', { a: 1 }, { query: 'query', parameters: { a: 1 } }], diff --git a/packages/neo4j-driver/test/rx/navigation.test.js b/packages/neo4j-driver/test/rx/navigation.test.js index 9c74d4d1a..6c37cc7f7 100644 --- a/packages/neo4j-driver/test/rx/navigation.test.js +++ b/packages/neo4j-driver/test/rx/navigation.test.js @@ -170,14 +170,19 @@ describe('#integration-rx navigation', () => { 60000 ) - it( - 'should fail on result when closed', - () => - shouldFailOnResultWhenClosed(protocolVersion, session, () => - session.close() - ), - 60000 - ) + getObservableSelectors().forEach(([observableName, observableSelector]) => { + it( + `${observableName} should fail on result when closed`, + () => + shouldFailOnResultWhenClosed( + protocolVersion, + session, + observableSelector, + () => session.close() + ), + 60000 + ) + }) }) describe('transaction', () => { @@ -340,21 +345,33 @@ describe('#integration-rx navigation', () => { 60000 ) - it( - 'should fail on result when committed', - () => - shouldFailOnResultWhenClosed(protocolVersion, txc, () => txc.commit()), - 60000 - ) + getObservableSelectors().forEach(([observableName, observableSelector]) => { + it( + `${observableName} should fail on result when committed`, + () => + shouldFailOnResultWhenClosed( + protocolVersion, + txc, + observableSelector, + () => txc.commit() + ), + 60000 + ) + }) - it( - 'should fail on result when rolled back', - () => - shouldFailOnResultWhenClosed(protocolVersion, txc, () => - txc.rollback() - ), - 60000 - ) + getObservableSelectors().forEach(([observableName, observableSelector]) => { + it( + `${observableName}should fail on result when rolled back`, + () => + shouldFailOnResultWhenClosed( + protocolVersion, + txc, + observableSelector, + () => txc.rollback() + ), + 60000 + ) + }) }) /** @@ -762,14 +779,24 @@ describe('#integration-rx navigation', () => { await collectAndAssertError(result.consume(), expectedError) } + function getObservableSelectors () { + return [ + ['consume', r => r.consume()], + ['keys', r => r.keys()], + ['records', r => r.records()] + ] + } + /** * @param {number} protocolVersion * @param {RxSession|RxTransaction} runnable + * @param {function(RxSession|RxTransaction):Observable} selectObservable * @param {function(): Observable} closeFunc */ async function shouldFailOnResultWhenClosed ( protocolVersion, runnable, + selectObservable, closeFunc ) { if (protocolVersion < 4.0) { @@ -782,9 +809,7 @@ describe('#integration-rx navigation', () => { const expectedError = jasmine.objectContaining({ message: jasmine.stringMatching(/Cannot run query/) }) - await collectAndAssertError(result.keys(), expectedError) - await collectAndAssertError(result.records(), expectedError) - await collectAndAssertError(result.consume(), expectedError) + await collectAndAssertError(selectObservable(result), expectedError) } async function collectAndAssertKeys (result) {