diff --git a/packages/core/src/result.ts b/packages/core/src/result.ts index 062f2f079..729168be9 100644 --- a/packages/core/src/result.ts +++ b/packages/core/src/result.ts @@ -21,6 +21,7 @@ import ResultSummary from './result-summary' import Record from './record' import { Query, PeekableAsyncIterator } from './types' import { observer, util, connectionHolder } from './internal' +import { newError } from './error' const { EMPTY_CONNECTION_HOLDER } = connectionHolder @@ -238,6 +239,13 @@ class Result implements Promise { * @returns {PeekableAsyncIterator} The async iterator for the Results */ [Symbol.asyncIterator](): PeekableAsyncIterator { + if (!this.isOpen()) { + const error = newError('Result is already consumed') + return { + next: () => Promise.reject(error), + peek: () => Promise.reject(error), + } + } const state: { paused: boolean, firstRun: boolean, @@ -362,6 +370,14 @@ class Result implements Promise { .catch(() => {}) } + /** + * Check if this result is active, i.e., neither a summary nor an error has been received by the result. + * @return {boolean} `true` when neither a summary or nor an error has been received by the result. + */ + isOpen (): boolean { + return this._summary === null && this._error === null + } + /** * Stream records to observer as they come in, this is a more efficient method * of handling the results, and allows you to handle arbitrarily large results. diff --git a/packages/core/src/transaction.ts b/packages/core/src/transaction.ts index 8703e034a..d992d0a33 100644 --- a/packages/core/src/transaction.ts +++ b/packages/core/src/transaction.ts @@ -600,7 +600,7 @@ function finishTransaction( .then(connection => { onConnection() pendingResults.forEach(r => r._cancel()) - return Promise.all(pendingResults).then(results => { + return Promise.all(pendingResults.map(result => result.summary())).then(results => { if (connection) { if (commit) { return connection.protocol().commitTransaction({ diff --git a/packages/core/test/result.test.ts b/packages/core/test/result.test.ts index ed8a8e70a..c9b87e1e3 100644 --- a/packages/core/test/result.test.ts +++ b/packages/core/test/result.test.ts @@ -860,6 +860,26 @@ describe('Result', () => { ]) }) + it.each([ + ['success', async (stream: any) => stream.onCompleted({})], + ['error', async (stream: any) => stream.onError(new Error('error'))], + ])('should thrown on iterating over an consumed result [%s]', async(_, completeStream) => { + completeStream(streamObserverMock) + + await result.summary().catch(() => {}) + + try { + for await (const _ of result) { + expect('not to iterate over consumed result').toBe(true) + } + expect('not to finish iteration over consumed result').toBe(true) + } catch (e) { + expect(e).toEqual(newError('Result is already consumed')) + } + + expect('not to finish iteration over consumed result') + }) + describe('.return()', () => { it('should finished the operator when it get called', async () => { const keys = ['a', 'b'] @@ -1205,6 +1225,30 @@ describe('Result', () => { }) }) }) + + describe('.isOpen()', () => { + it('should return true when the stream is open', async () => { + await result._subscribe({}).catch(() => {}) + + expect(result.isOpen()).toBe(true) + }) + + it('should return false when the stream is closed', async () => { + streamObserverMock.onCompleted({}) + + await result._subscribe({}).catch(() => {}) + + expect(result.isOpen()).toBe(false) + }) + + it('should return false when the stream failed', async () => { + streamObserverMock.onError(new Error('test')) + + await result._subscribe({}).catch(() => {}) + + expect(result.isOpen()).toBe(false) + }) + }) }) describe.each([ @@ -1319,6 +1363,19 @@ describe('Result', () => { }) }) }) + + describe('isOpen()', () => { + it('should be true', () => { + expect(result.isOpen()).toBe(true) + }) + + it('should be false after any interaction with the stream', async () => { + const it = result[Symbol.asyncIterator]() + await it.next() + + expect(result.isOpen()).toBe(false) + }) + }) }) describe.each([ @@ -1381,6 +1438,30 @@ describe('Result', () => { }) }) + describe('.isOpen()', () => { + it('should be true', async () => { + expect(result.isOpen()).toBe(true) + + // consume the stream to avoid unhaddled errors + try { + await result.summary() + } catch (error) { + } + }) + + it('should be false after any interactio with the stream', async () => { + const it = result[Symbol.asyncIterator]() + + try { + await it.next() + } catch (error) { + // this's fine + } + + expect(result.isOpen()).toBe(false) + }) + }) + function shouldReturnRejectedPromiseWithTheExpectedError( supplier: () => Promise ) { diff --git a/packages/neo4j-driver/test/rx/navigation.test.js b/packages/neo4j-driver/test/rx/navigation.test.js index 9c74d4d1a..6c37cc7f7 100644 --- a/packages/neo4j-driver/test/rx/navigation.test.js +++ b/packages/neo4j-driver/test/rx/navigation.test.js @@ -170,14 +170,19 @@ describe('#integration-rx navigation', () => { 60000 ) - it( - 'should fail on result when closed', - () => - shouldFailOnResultWhenClosed(protocolVersion, session, () => - session.close() - ), - 60000 - ) + getObservableSelectors().forEach(([observableName, observableSelector]) => { + it( + `${observableName} should fail on result when closed`, + () => + shouldFailOnResultWhenClosed( + protocolVersion, + session, + observableSelector, + () => session.close() + ), + 60000 + ) + }) }) describe('transaction', () => { @@ -340,21 +345,33 @@ describe('#integration-rx navigation', () => { 60000 ) - it( - 'should fail on result when committed', - () => - shouldFailOnResultWhenClosed(protocolVersion, txc, () => txc.commit()), - 60000 - ) + getObservableSelectors().forEach(([observableName, observableSelector]) => { + it( + `${observableName} should fail on result when committed`, + () => + shouldFailOnResultWhenClosed( + protocolVersion, + txc, + observableSelector, + () => txc.commit() + ), + 60000 + ) + }) - it( - 'should fail on result when rolled back', - () => - shouldFailOnResultWhenClosed(protocolVersion, txc, () => - txc.rollback() - ), - 60000 - ) + getObservableSelectors().forEach(([observableName, observableSelector]) => { + it( + `${observableName}should fail on result when rolled back`, + () => + shouldFailOnResultWhenClosed( + protocolVersion, + txc, + observableSelector, + () => txc.rollback() + ), + 60000 + ) + }) }) /** @@ -762,14 +779,24 @@ describe('#integration-rx navigation', () => { await collectAndAssertError(result.consume(), expectedError) } + function getObservableSelectors () { + return [ + ['consume', r => r.consume()], + ['keys', r => r.keys()], + ['records', r => r.records()] + ] + } + /** * @param {number} protocolVersion * @param {RxSession|RxTransaction} runnable + * @param {function(RxSession|RxTransaction):Observable} selectObservable * @param {function(): Observable} closeFunc */ async function shouldFailOnResultWhenClosed ( protocolVersion, runnable, + selectObservable, closeFunc ) { if (protocolVersion < 4.0) { @@ -782,9 +809,7 @@ describe('#integration-rx navigation', () => { const expectedError = jasmine.objectContaining({ message: jasmine.stringMatching(/Cannot run query/) }) - await collectAndAssertError(result.keys(), expectedError) - await collectAndAssertError(result.records(), expectedError) - await collectAndAssertError(result.consume(), expectedError) + await collectAndAssertError(selectObservable(result), expectedError) } async function collectAndAssertKeys (result) { diff --git a/packages/testkit-backend/src/controller/local.js b/packages/testkit-backend/src/controller/local.js index 84bf4d402..5f4471aae 100644 --- a/packages/testkit-backend/src/controller/local.js +++ b/packages/testkit-backend/src/controller/local.js @@ -63,7 +63,7 @@ export default class LocalController extends Controller { const id = this._contexts.get(contextId).addError(e) this._writeResponse(contextId, newResponse('DriverError', { id, - msg: e.message + ' (' + e.code + ')', + msg: e.message, code: e.code })) } diff --git a/packages/testkit-backend/src/skipped-tests/common.js b/packages/testkit-backend/src/skipped-tests/common.js index 2887fa17d..50fed8599 100644 --- a/packages/testkit-backend/src/skipped-tests/common.js +++ b/packages/testkit-backend/src/skipped-tests/common.js @@ -138,10 +138,6 @@ const skippedTests = [ 'stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once_next_before_list' ) ), - skip( - 'Results are always valid but don\'t return records when out of scope', - ifStartsWith('stub.iteration.test_result_scope.TestResultScope.') - ), skip( 'Driver (still) allows explicit managing of managed transaction', ifEquals('stub.tx_lifetime.test_tx_lifetime.TestTxLifetime.test_managed_tx_raises_tx_managed_exec')