diff --git a/src/v1/session.js b/src/v1/session.js index 4f7e15af2..06f02fec9 100644 --- a/src/v1/session.js +++ b/src/v1/session.js @@ -96,7 +96,8 @@ class Session { ? new TxConfig(transactionConfig) : TxConfig.empty() - return this._run(query, params, (connection, streamObserver) => + return this._run(query, params, (connection, streamObserver) => { + this._assertSessionIsOpen() connection .protocol() .run( @@ -107,7 +108,7 @@ class Session { this._mode, streamObserver ) - ) + }) } _run (statement, parameters, statementRunner) { @@ -185,12 +186,19 @@ class Session { const tx = new Transaction( connectionHolder, this._transactionClosed.bind(this), - this._updateBookmark.bind(this) + this._updateBookmark.bind(this), + this._assertSessionIsOpen.bind(this) ) tx._begin(this._lastBookmark, txConfig) return tx } + _assertSessionIsOpen () { + if (!this._open) { + throw newError('You cannot run more transactions on a closed session.') + } + } + _transactionClosed () { this._hasTx = false } diff --git a/src/v1/transaction.js b/src/v1/transaction.js index 9f649a965..10c012ed5 100644 --- a/src/v1/transaction.js +++ b/src/v1/transaction.js @@ -34,12 +34,15 @@ class Transaction { * @param {ConnectionHolder} connectionHolder - the connection holder to get connection from. * @param {function()} onClose - Function to be called when transaction is committed or rolled back. * @param {function(bookmark: Bookmark)} onBookmark callback invoked when new bookmark is produced. + * @param onConnection callback invoked when a connection is obtained from connection holder. This need to be + * called to see if the connection is in the process of being released. */ - constructor (connectionHolder, onClose, onBookmark) { + constructor (connectionHolder, onClose, onBookmark, onConnection) { this._connectionHolder = connectionHolder this._state = _states.ACTIVE this._onClose = onClose this._onBookmark = onBookmark + this._onConnection = onConnection } _begin (bookmark, txConfig) { @@ -47,7 +50,8 @@ class Transaction { this._connectionHolder .getConnection(streamObserver) - .then(conn => + .then(conn => { + this._onConnection() conn .protocol() .beginTransaction( @@ -56,7 +60,7 @@ class Transaction { this._connectionHolder.mode(), streamObserver ) - ) + }) .catch(error => streamObserver.onError(error)) } @@ -78,7 +82,8 @@ class Transaction { this._connectionHolder, new _TransactionStreamObserver(this), query, - params + params, + this._onConnection ) } @@ -90,9 +95,10 @@ class Transaction { * @returns {Result} New Result */ commit () { - let committed = this._state.commit( + const committed = this._state.commit( this._connectionHolder, - new _TransactionStreamObserver(this) + new _TransactionStreamObserver(this), + this._onConnection ) this._state = committed.state // clean up @@ -108,9 +114,10 @@ class Transaction { * @returns {Result} New Result */ rollback () { - let committed = this._state.rollback( + const committed = this._state.rollback( this._connectionHolder, - new _TransactionStreamObserver(this) + new _TransactionStreamObserver(this), + this._onConnection ) this._state = committed.state // clean up @@ -161,29 +168,40 @@ class _TransactionStreamObserver extends StreamObserver { } /** internal state machine of the transaction */ -let _states = { +const _states = { // The transaction is running with no explicit success or failure marked ACTIVE: { - commit: (connectionHolder, observer) => { + commit: (connectionHolder, observer, onConnection) => { return { - result: finishTransaction(true, connectionHolder, observer), + result: finishTransaction( + true, + connectionHolder, + observer, + onConnection + ), state: _states.SUCCEEDED } }, - rollback: (connectionHolder, observer) => { + rollback: (connectionHolder, observer, onConnection) => { return { - result: finishTransaction(false, connectionHolder, observer), + result: finishTransaction( + false, + connectionHolder, + observer, + onConnection + ), state: _states.ROLLED_BACK } }, - run: (connectionHolder, observer, statement, parameters) => { + run: (connectionHolder, observer, statement, parameters, onConnection) => { // RUN in explicit transaction can't contain bookmarks and transaction configuration const bookmark = Bookmark.empty() const txConfig = TxConfig.empty() connectionHolder .getConnection(observer) - .then(conn => + .then(conn => { + onConnection() conn .protocol() .run( @@ -194,7 +212,7 @@ let _states = { connectionHolder.mode(), observer ) - ) + }) .catch(error => observer.onError(error)) return _newRunResult(observer, statement, parameters, () => @@ -206,7 +224,7 @@ let _states = { // An error has occurred, transaction can no longer be used and no more messages will // be sent for this transaction. FAILED: { - commit: (connectionHolder, observer) => { + commit: (connectionHolder, observer, onConnection) => { observer.onError({ error: 'Cannot commit statements in this transaction, because previous statements in the ' + @@ -218,14 +236,14 @@ let _states = { state: _states.FAILED } }, - rollback: (connectionHolder, observer) => { + rollback: (connectionHolder, observer, onConnection) => { observer.markCompleted() return { result: _newDummyResult(observer, 'ROLLBACK', {}), state: _states.FAILED } }, - run: (connectionHolder, observer, statement, parameters) => { + run: (connectionHolder, observer, statement, parameters, onConnection) => { observer.onError({ error: 'Cannot run statement, because previous statements in the ' + @@ -237,7 +255,7 @@ let _states = { // This transaction has successfully committed SUCCEEDED: { - commit: (connectionHolder, observer) => { + commit: (connectionHolder, observer, onConnection) => { observer.onError({ error: 'Cannot commit statements in this transaction, because commit has already been successfully called on the transaction and transaction has been closed. Please start a new' + @@ -248,7 +266,7 @@ let _states = { state: _states.SUCCEEDED } }, - rollback: (connectionHolder, observer) => { + rollback: (connectionHolder, observer, onConnection) => { observer.onError({ error: 'Cannot rollback transaction, because transaction has already been successfully closed.' @@ -258,7 +276,7 @@ let _states = { state: _states.SUCCEEDED } }, - run: (connectionHolder, observer, statement, parameters) => { + run: (connectionHolder, observer, statement, parameters, onConnection) => { observer.onError({ error: 'Cannot run statement, because transaction has already been successfully closed.' @@ -269,7 +287,7 @@ let _states = { // This transaction has been rolled back ROLLED_BACK: { - commit: (connectionHolder, observer) => { + commit: (connectionHolder, observer, onConnection) => { observer.onError({ error: 'Cannot commit this transaction, because it has already been rolled back.' @@ -279,7 +297,7 @@ let _states = { state: _states.ROLLED_BACK } }, - rollback: (connectionHolder, observer) => { + rollback: (connectionHolder, observer, onConnection) => { observer.onError({ error: 'Cannot rollback transaction, because transaction has already been rolled back.' @@ -289,7 +307,7 @@ let _states = { state: _states.ROLLED_BACK } }, - run: (connectionHolder, observer, statement, parameters) => { + run: (connectionHolder, observer, statement, parameters, onConnection) => { observer.onError({ error: 'Cannot run statement, because transaction has already been rolled back.' @@ -299,10 +317,11 @@ let _states = { } } -function finishTransaction (commit, connectionHolder, observer) { +function finishTransaction (commit, connectionHolder, observer, onConnection) { connectionHolder .getConnection(observer) .then(connection => { + onConnection() if (commit) { return connection.protocol().commitTransaction(observer) } else { diff --git a/test/internal/node/routing.driver.boltkit.test.js b/test/internal/node/routing.driver.boltkit.test.js index 11b25edda..804f7cf9b 100644 --- a/test/internal/node/routing.driver.boltkit.test.js +++ b/test/internal/node/routing.driver.boltkit.test.js @@ -1821,6 +1821,7 @@ describe('routing driver with stub server', () => { ) boltStub.run(() => { + const session = driver.session(READ) session .readTransaction(tx => tx.run('MATCH (n) RETURN n.name AS name') @@ -2328,7 +2329,7 @@ describe('routing driver with stub server', () => { const session = driver.session(WRITE, bookmarks) const tx = session.beginTransaction() - tx.run(`CREATE (n {name:'Bob'})`).then(() => { + tx.run('CREATE (n {name:\'Bob\'})').then(() => { tx.commit().then(() => { expect(session.lastBookmark()).toEqual('neo4j:bookmark:v1:tx95') @@ -2348,11 +2349,11 @@ describe('routing driver with stub server', () => { }) it('should forget writer on database unavailable error', done => { - testAddressPurgeOnDatabaseError(`CREATE (n {name:'Bob'})`, WRITE, done) + testAddressPurgeOnDatabaseError('CREATE (n {name:\'Bob\'})', WRITE, done) }) it('should forget reader on database unavailable error', done => { - testAddressPurgeOnDatabaseError(`RETURN 1`, READ, done) + testAddressPurgeOnDatabaseError('RETURN 1', READ, done) }) it('should use resolver function that returns array during first discovery', done => { diff --git a/test/v1/temporal-types.test.js b/test/v1/temporal-types.test.js index 03f082aaa..e3d780aaf 100644 --- a/test/v1/temporal-types.test.js +++ b/test/v1/temporal-types.test.js @@ -1480,7 +1480,6 @@ describe('temporal-types', () => { const value = records[0].get(0) expect(value).toEqual(expectedValue) - session.close() done() }) .catch(error => { @@ -1498,7 +1497,6 @@ describe('temporal-types', () => { const receivedValue = records[0].get(0) expect(receivedValue).toEqual(value) - session.close() done() }) .catch(error => { diff --git a/test/v1/transaction.test.js b/test/v1/transaction.test.js index ef801dad9..10f988624 100644 --- a/test/v1/transaction.test.js +++ b/test/v1/transaction.test.js @@ -572,6 +572,27 @@ describe('transaction', () => { }) }) + it('should reset transaction', done => { + const RetryTimeoutLimit = 10000 + const TransactionTimeout = 30000 + + const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken, { + maxTransactionRetryTime: RetryTimeoutLimit, + connectionTimeout: TransactionTimeout + }) + const session = driver.session() + const runPromise = session + .readTransaction(transaction => transaction.run('RETURN 1')) + .catch(error => { + expect(error.message).toBe( + 'You cannot run more transactions on a closed session.' + ) + driver.close() + done() + }) + session.close() // This will interrupt runPromise to reset the transaction + }) + function expectSyntaxError (error) { expect(error.code).toBe('Neo.ClientError.Statement.SyntaxError') }