diff --git a/src/result-rx.js b/src/result-rx.js index 918af16d3..df5989b85 100644 --- a/src/result-rx.js +++ b/src/result-rx.js @@ -96,7 +96,7 @@ export default class RxResult { * @public * @returns {Observable} - An observable stream (with exactly one element) of result summary. */ - summary () { + consume () { return this._result.pipe( flatMap( result => @@ -114,10 +114,6 @@ export default class RxResult { } = {}) { const subscriptions = [] - if (recordsObserver) { - subscriptions.push(this._records.subscribe(recordsObserver)) - } - if (summaryObserver) { subscriptions.push(this._summary.subscribe(summaryObserver)) } @@ -125,6 +121,10 @@ export default class RxResult { if (this._state < States.STREAMING) { this._state = States.STREAMING + if (recordsObserver) { + subscriptions.push(this._records.subscribe(recordsObserver)) + } + subscriptions.push({ unsubscribe: () => { if (result._cancel) { @@ -156,10 +156,10 @@ export default class RxResult { this._state = States.COMPLETED } }) - } else if (this._state === States.STREAMING && recordsObserver) { + } else if (recordsObserver) { recordsObserver.error( newError( - 'Streaming has already started with a previous records or summary subscription.' + 'Streaming has already started/consumed with a previous records or summary subscription.' ) ) } diff --git a/src/session.js b/src/session.js index a2bf76484..22ec8ef51 100644 --- a/src/session.js +++ b/src/session.js @@ -120,7 +120,13 @@ class Session { const connectionHolder = this._connectionHolderWithMode(this._mode) let observerPromise - if (!this._hasTx && connectionHolder.initializeConnection()) { + if (!this._open) { + observerPromise = Promise.resolve( + new FailedObserver({ + error: newError('Cannot run statement in a closed session.') + }) + ) + } else if (!this._hasTx && connectionHolder.initializeConnection()) { observerPromise = connectionHolder .getConnection() .then(connection => customRunner(connection)) @@ -163,6 +169,9 @@ class Session { } _beginTransaction (accessMode, txConfig) { + if (!this._open) { + throw newError('Cannot begin a transaction on a closed session.') + } if (this._hasTx) { throw newError( 'You cannot begin a transaction on a session with an open transaction; ' + @@ -193,7 +202,7 @@ class Session { /** * Return the bookmark received following the last completed {@link Transaction}. * - * @return {string|null} a reference to a previous transaction + * @return {string[]} a reference to a previous transaction */ lastBookmark () { return this._lastBookmark.values() diff --git a/test/internal/node/routing.driver.boltkit.test.js b/test/internal/node/routing.driver.boltkit.test.js index 6c77e9c09..45a49a94b 100644 --- a/test/internal/node/routing.driver.boltkit.test.js +++ b/test/internal/node/routing.driver.boltkit.test.js @@ -1477,7 +1477,7 @@ describe('#stub-routing routing driver with stub server', () => { const driver = boltStub.newDriver('neo4j://127.0.0.1:9010') // run a dummy query to force routing table initialization - const session = driver.session({ defaultAccessMode: READ }) + var session = driver.session({ defaultAccessMode: READ }) const result1 = await session.run('MATCH (n) RETURN n.name') expect(result1.records.length).toEqual(3) await session.close() @@ -1491,6 +1491,7 @@ describe('#stub-routing routing driver with stub server', () => { 9010 ) + session = driver.session({ defaultAccessMode: READ }) const result2 = await session.readTransaction(tx => tx.run('MATCH (n) RETURN n.name AS name') ) diff --git a/test/rx/navigation.test.js b/test/rx/navigation.test.js index 824960cad..a3503971d 100644 --- a/test/rx/navigation.test.js +++ b/test/rx/navigation.test.js @@ -100,11 +100,19 @@ describe('#integration-rx navigation', () => { it('should fail on records when run fails', () => shouldFailOnRecordsWhenRunFails(serverVersion, session)) + it('should fail on subsequent records differently when run fails', () => + shouldFailOnSubsequentRecordsWhenRunFails(serverVersion, session)) + it('should fail on summary when run fails', () => shouldFailOnSummaryWhenRunFails(serverVersion, session)) it('should fail on subsequent summary when run fails', () => - shouldFailOnSubsequentKeysWhenRunFails(serverVersion, session)) + shouldFailOnSubsequentSummaryWhenRunFails(serverVersion, session)) + + it('should fail on result when closed', () => + shouldFailOnResultWhenClosed(serverVersion, session, () => + session.close() + )) }) describe('transaction', () => { @@ -197,11 +205,20 @@ describe('#integration-rx navigation', () => { it('should fail on records when run fails', () => shouldFailOnRecordsWhenRunFails(serverVersion, txc)) + it('should fail on subsequent records differently when run fails', () => + shouldFailOnSubsequentRecordsWhenRunFails(serverVersion, txc)) + it('should fail on summary when run fails', () => shouldFailOnSummaryWhenRunFails(serverVersion, txc)) it('should fail on subsequent summary when run fails', () => - shouldFailOnSubsequentKeysWhenRunFails(serverVersion, txc)) + shouldFailOnSubsequentSummaryWhenRunFails(serverVersion, txc)) + + it('should fail on result when committed', () => + shouldFailOnResultWhenClosed(serverVersion, txc, () => txc.commit())) + + it('should fail on result when rolled back', () => + shouldFailOnResultWhenClosed(serverVersion, txc, () => txc.rollback())) }) /** @@ -310,7 +327,10 @@ describe('#integration-rx navigation', () => { await collectAndAssertKeys(result) await collectAndAssertSummary(result) - await collectAndAssertEmpty(result.records()) + const expectedError = jasmine.objectContaining({ + message: jasmine.stringMatching(/Streaming has already started/) + }) + await collectAndAssertError(result.records(), expectedError) } /** @@ -402,9 +422,13 @@ describe('#integration-rx navigation', () => { ) await collectAndAssertRecords(result) - await collectAndAssertEmpty(result.records()) - await collectAndAssertEmpty(result.records()) - await collectAndAssertEmpty(result.records()) + + const expectedError = jasmine.objectContaining({ + message: jasmine.stringMatching(/Streaming has already started/) + }) + await collectAndAssertError(result.records(), expectedError) + await collectAndAssertError(result.records(), expectedError) + await collectAndAssertError(result.records(), expectedError) } /** @@ -527,6 +551,32 @@ describe('#integration-rx navigation', () => { ) } + /** + * @param {ServerVersion} version + * @param {RxSession|RxTransaction} runnable + */ + async function shouldFailOnSubsequentRecordsWhenRunFails (version, runnable) { + if (version.compareTo(VERSION_4_0_0) < 0) { + return + } + + const result = runnable.run('THIS IS NOT A CYPHER') + + await collectAndAssertError( + result.records(), + jasmine.objectContaining({ + code: 'Neo.ClientError.Statement.SyntaxError', + message: jasmine.stringMatching(/Invalid input/) + }) + ) + + const expectedError = jasmine.objectContaining({ + message: jasmine.stringMatching(/Streaming has already started/) + }) + await collectAndAssertError(result.records(), expectedError) + await collectAndAssertError(result.records(), expectedError) + } + /** * @param {ServerVersion} version * @param {RxSession|RxTransaction} runnable @@ -539,7 +589,7 @@ describe('#integration-rx navigation', () => { const result = runnable.run('THIS IS NOT A CYPHER') await collectAndAssertError( - result.summary(), + result.consume(), jasmine.objectContaining({ code: 'Neo.ClientError.Statement.SyntaxError', message: jasmine.stringMatching(/Invalid input/) @@ -562,9 +612,30 @@ describe('#integration-rx navigation', () => { message: jasmine.stringMatching(/Invalid input/) }) - await collectAndAssertError(result.summary(), expectedError) - await collectAndAssertError(result.summary(), expectedError) - await collectAndAssertError(result.summary(), expectedError) + await collectAndAssertError(result.consume(), expectedError) + await collectAndAssertError(result.consume(), expectedError) + await collectAndAssertError(result.consume(), expectedError) + } + + /** + * @param {ServerVersion} version + * @param {RxSession|RxTransaction} runnable + * @param {function(): Observable} closeFunc + */ + async function shouldFailOnResultWhenClosed (version, runnable, closeFunc) { + if (version.compareTo(VERSION_4_0_0) < 0) { + return + } + + const result = runnable.run('RETURN 1') + await collectAndAssertEmpty(closeFunc()) + + const expectedError = jasmine.objectContaining({ + message: jasmine.stringMatching(/Cannot run statement/) + }) + await collectAndAssertError(result.keys(), expectedError) + await collectAndAssertError(result.records(), expectedError) + await collectAndAssertError(result.consume(), expectedError) } async function collectAndAssertKeys (result) { @@ -602,7 +673,7 @@ describe('#integration-rx navigation', () => { async function collectAndAssertSummary (result, expectedStatementType = 'r') { const summary = await result - .summary() + .consume() .pipe( map(s => s.statementType), materialize(), diff --git a/test/rx/summary.test.js b/test/rx/summary.test.js index 782801996..4a2f91b85 100644 --- a/test/rx/summary.test.js +++ b/test/rx/summary.test.js @@ -249,7 +249,7 @@ describe('#integration-rx summary', () => { const summary = await runnable .run('UNWIND RANGE(1,10) AS n RETURN n') - .summary() + .consume() .toPromise() expect(summary).toBeDefined() @@ -553,7 +553,7 @@ describe('#integration-rx summary', () => { const summary = await runnable .run('CREATE (n) RETURN n') - .summary() + .consume() .toPromise() expect(summary).toBeDefined() expect(summary.hasPlan()).toBeFalsy() @@ -573,7 +573,7 @@ describe('#integration-rx summary', () => { const summary = await runnable .run('EXPLAIN CREATE (n) RETURN n') - .summary() + .consume() .toPromise() expect(summary).toBeDefined() expect(summary.hasPlan()).toBeTruthy() @@ -594,7 +594,7 @@ describe('#integration-rx summary', () => { const summary = await runnable .run('PROFILE CREATE (n) RETURN n') - .summary() + .consume() .toPromise() expect(summary).toBeDefined() expect(summary.hasPlan()).toBeTruthy() @@ -616,7 +616,7 @@ describe('#integration-rx summary', () => { const summary = await runnable .run('CREATE (n) RETURN n') - .summary() + .consume() .toPromise() expect(summary).toBeDefined() expect(summary.notifications).toBeTruthy() @@ -634,7 +634,7 @@ describe('#integration-rx summary', () => { const summary = await runnable .run('EXPLAIN MATCH (n:ThisLabelDoesNotExist) RETURN n') - .summary() + .consume() .toPromise() expect(summary).toBeDefined() expect(summary.notifications).toBeTruthy() @@ -664,7 +664,7 @@ describe('#integration-rx summary', () => { ) { const summary = await runnable .run(statement, parameters) - .summary() + .consume() .toPromise() expect(summary).toBeDefined() expect(summary.statement).toBeDefined() @@ -685,7 +685,7 @@ describe('#integration-rx summary', () => { ) { const summary = await runnable .run(statement) - .summary() + .consume() .toPromise() expect(summary).toBeDefined() expect(summary.statementType).toBe(expectedStatementType) @@ -701,7 +701,7 @@ describe('#integration-rx summary', () => { async function verifyUpdates (runnable, statement, parameters, stats) { const summary = await runnable .run(statement, parameters) - .summary() + .consume() .toPromise() expect(summary).toBeDefined() expect(summary.counters.containsUpdates()).toBeTruthy() @@ -725,7 +725,7 @@ describe('#integration-rx summary', () => { ) { const summary = await runnable .run(statement, parameters) - .summary() + .consume() .toPromise() expect(summary).toBeDefined() diff --git a/test/rx/transaction.test.js b/test/rx/transaction.test.js index 37d542bdc..83e093e7e 100644 --- a/test/rx/transaction.test.js +++ b/test/rx/transaction.test.js @@ -535,7 +535,7 @@ describe('#integration-rx transaction', () => { ) ]) - const summary = await result.summary().toPromise() + const summary = await result.consume().toPromise() expect(summary).toBeTruthy() }) @@ -588,15 +588,15 @@ describe('#integration-rx transaction', () => { await txc .run('CREATE (n:Node {id: 1})') - .summary() + .consume() .toPromise() await txc .run('CREATE (n:Node {id: 2})') - .summary() + .consume() .toPromise() await txc .run('CREATE (n:Node {id: 1})') - .summary() + .consume() .toPromise() await verifyCanCommitOrRollback(txc, commit) diff --git a/test/temporal-types.test.js b/test/temporal-types.test.js index aaa66696f..d0e25d150 100644 --- a/test/temporal-types.test.js +++ b/test/temporal-types.test.js @@ -1460,8 +1460,6 @@ describe('#integration temporal-types', () => { const receivedValue = records[0].get(0) expect(receivedValue).toEqual(value) - - await session.close() } async function testDurationToString (values) {