Skip to content

Stop accepting more messages after a connection is reset #512

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/v1/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ class Session {
? new TxConfig(transactionConfig)
: TxConfig.empty()

return this._run(query, params, (connection, streamObserver) =>
return this._run(query, params, (connection, streamObserver) => {
this._assertSessionIsOpen()
connection
.protocol()
.run(
Expand All @@ -107,7 +108,7 @@ class Session {
this._mode,
streamObserver
)
)
})
}

_run (statement, parameters, statementRunner) {
Expand Down Expand Up @@ -185,12 +186,19 @@ class Session {
const tx = new Transaction(
connectionHolder,
this._transactionClosed.bind(this),
this._updateBookmark.bind(this)
this._updateBookmark.bind(this),
this._assertSessionIsOpen.bind(this)
)
tx._begin(this._lastBookmark, txConfig)
return tx
}

_assertSessionIsOpen () {
if (!this._open) {
throw newError('You cannot run more transactions on a closed session.')
}
}

_transactionClosed () {
this._hasTx = false
}
Expand Down
71 changes: 45 additions & 26 deletions src/v1/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,24 @@ class Transaction {
* @param {ConnectionHolder} connectionHolder - the connection holder to get connection from.
* @param {function()} onClose - Function to be called when transaction is committed or rolled back.
* @param {function(bookmark: Bookmark)} onBookmark callback invoked when new bookmark is produced.
* @param onConnection callback invoked when a connection is obtained from connection holder. This need to be
* called to see if the connection is in the process of being released.
*/
constructor (connectionHolder, onClose, onBookmark) {
constructor (connectionHolder, onClose, onBookmark, onConnection) {
this._connectionHolder = connectionHolder
this._state = _states.ACTIVE
this._onClose = onClose
this._onBookmark = onBookmark
this._onConnection = onConnection
}

_begin (bookmark, txConfig) {
const streamObserver = new _TransactionStreamObserver(this)

this._connectionHolder
.getConnection(streamObserver)
.then(conn =>
.then(conn => {
this._onConnection()
conn
.protocol()
.beginTransaction(
Expand All @@ -56,7 +60,7 @@ class Transaction {
this._connectionHolder.mode(),
streamObserver
)
)
})
.catch(error => streamObserver.onError(error))
}

Expand All @@ -78,7 +82,8 @@ class Transaction {
this._connectionHolder,
new _TransactionStreamObserver(this),
query,
params
params,
this._onConnection
)
}

Expand All @@ -90,9 +95,10 @@ class Transaction {
* @returns {Result} New Result
*/
commit () {
let committed = this._state.commit(
const committed = this._state.commit(
this._connectionHolder,
new _TransactionStreamObserver(this)
new _TransactionStreamObserver(this),
this._onConnection
)
this._state = committed.state
// clean up
Expand All @@ -108,9 +114,10 @@ class Transaction {
* @returns {Result} New Result
*/
rollback () {
let committed = this._state.rollback(
const committed = this._state.rollback(
this._connectionHolder,
new _TransactionStreamObserver(this)
new _TransactionStreamObserver(this),
this._onConnection
)
this._state = committed.state
// clean up
Expand Down Expand Up @@ -161,29 +168,40 @@ class _TransactionStreamObserver extends StreamObserver {
}

/** internal state machine of the transaction */
let _states = {
const _states = {
// The transaction is running with no explicit success or failure marked
ACTIVE: {
commit: (connectionHolder, observer) => {
commit: (connectionHolder, observer, onConnection) => {
return {
result: finishTransaction(true, connectionHolder, observer),
result: finishTransaction(
true,
connectionHolder,
observer,
onConnection
),
state: _states.SUCCEEDED
}
},
rollback: (connectionHolder, observer) => {
rollback: (connectionHolder, observer, onConnection) => {
return {
result: finishTransaction(false, connectionHolder, observer),
result: finishTransaction(
false,
connectionHolder,
observer,
onConnection
),
state: _states.ROLLED_BACK
}
},
run: (connectionHolder, observer, statement, parameters) => {
run: (connectionHolder, observer, statement, parameters, onConnection) => {
// RUN in explicit transaction can't contain bookmarks and transaction configuration
const bookmark = Bookmark.empty()
const txConfig = TxConfig.empty()

connectionHolder
.getConnection(observer)
.then(conn =>
.then(conn => {
onConnection()
conn
.protocol()
.run(
Expand All @@ -194,7 +212,7 @@ let _states = {
connectionHolder.mode(),
observer
)
)
})
.catch(error => observer.onError(error))

return _newRunResult(observer, statement, parameters, () =>
Expand All @@ -206,7 +224,7 @@ let _states = {
// An error has occurred, transaction can no longer be used and no more messages will
// be sent for this transaction.
FAILED: {
commit: (connectionHolder, observer) => {
commit: (connectionHolder, observer, onConnection) => {
observer.onError({
error:
'Cannot commit statements in this transaction, because previous statements in the ' +
Expand All @@ -218,14 +236,14 @@ let _states = {
state: _states.FAILED
}
},
rollback: (connectionHolder, observer) => {
rollback: (connectionHolder, observer, onConnection) => {
observer.markCompleted()
return {
result: _newDummyResult(observer, 'ROLLBACK', {}),
state: _states.FAILED
}
},
run: (connectionHolder, observer, statement, parameters) => {
run: (connectionHolder, observer, statement, parameters, onConnection) => {
observer.onError({
error:
'Cannot run statement, because previous statements in the ' +
Expand All @@ -237,7 +255,7 @@ let _states = {

// This transaction has successfully committed
SUCCEEDED: {
commit: (connectionHolder, observer) => {
commit: (connectionHolder, observer, onConnection) => {
observer.onError({
error:
'Cannot commit statements in this transaction, because commit has already been successfully called on the transaction and transaction has been closed. Please start a new' +
Expand All @@ -248,7 +266,7 @@ let _states = {
state: _states.SUCCEEDED
}
},
rollback: (connectionHolder, observer) => {
rollback: (connectionHolder, observer, onConnection) => {
observer.onError({
error:
'Cannot rollback transaction, because transaction has already been successfully closed.'
Expand All @@ -258,7 +276,7 @@ let _states = {
state: _states.SUCCEEDED
}
},
run: (connectionHolder, observer, statement, parameters) => {
run: (connectionHolder, observer, statement, parameters, onConnection) => {
observer.onError({
error:
'Cannot run statement, because transaction has already been successfully closed.'
Expand All @@ -269,7 +287,7 @@ let _states = {

// This transaction has been rolled back
ROLLED_BACK: {
commit: (connectionHolder, observer) => {
commit: (connectionHolder, observer, onConnection) => {
observer.onError({
error:
'Cannot commit this transaction, because it has already been rolled back.'
Expand All @@ -279,7 +297,7 @@ let _states = {
state: _states.ROLLED_BACK
}
},
rollback: (connectionHolder, observer) => {
rollback: (connectionHolder, observer, onConnection) => {
observer.onError({
error:
'Cannot rollback transaction, because transaction has already been rolled back.'
Expand All @@ -289,7 +307,7 @@ let _states = {
state: _states.ROLLED_BACK
}
},
run: (connectionHolder, observer, statement, parameters) => {
run: (connectionHolder, observer, statement, parameters, onConnection) => {
observer.onError({
error:
'Cannot run statement, because transaction has already been rolled back.'
Expand All @@ -299,10 +317,11 @@ let _states = {
}
}

function finishTransaction (commit, connectionHolder, observer) {
function finishTransaction (commit, connectionHolder, observer, onConnection) {
connectionHolder
.getConnection(observer)
.then(connection => {
onConnection()
if (commit) {
return connection.protocol().commitTransaction(observer)
} else {
Expand Down
7 changes: 4 additions & 3 deletions test/internal/node/routing.driver.boltkit.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1821,6 +1821,7 @@ describe('routing driver with stub server', () => {
)

boltStub.run(() => {
const session = driver.session(READ)
session
.readTransaction(tx =>
tx.run('MATCH (n) RETURN n.name AS name')
Expand Down Expand Up @@ -2328,7 +2329,7 @@ describe('routing driver with stub server', () => {
const session = driver.session(WRITE, bookmarks)
const tx = session.beginTransaction()

tx.run(`CREATE (n {name:'Bob'})`).then(() => {
tx.run('CREATE (n {name:\'Bob\'})').then(() => {
tx.commit().then(() => {
expect(session.lastBookmark()).toEqual('neo4j:bookmark:v1:tx95')

Expand All @@ -2348,11 +2349,11 @@ describe('routing driver with stub server', () => {
})

it('should forget writer on database unavailable error', done => {
testAddressPurgeOnDatabaseError(`CREATE (n {name:'Bob'})`, WRITE, done)
testAddressPurgeOnDatabaseError('CREATE (n {name:\'Bob\'})', WRITE, done)
})

it('should forget reader on database unavailable error', done => {
testAddressPurgeOnDatabaseError(`RETURN 1`, READ, done)
testAddressPurgeOnDatabaseError('RETURN 1', READ, done)
})

it('should use resolver function that returns array during first discovery', done => {
Expand Down
2 changes: 0 additions & 2 deletions test/v1/temporal-types.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,6 @@ describe('temporal-types', () => {
const value = records[0].get(0)
expect(value).toEqual(expectedValue)

session.close()
done()
})
.catch(error => {
Expand All @@ -1498,7 +1497,6 @@ describe('temporal-types', () => {
const receivedValue = records[0].get(0)
expect(receivedValue).toEqual(value)

session.close()
done()
})
.catch(error => {
Expand Down
21 changes: 21 additions & 0 deletions test/v1/transaction.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,27 @@ describe('transaction', () => {
})
})

it('should reset transaction', done => {
const RetryTimeoutLimit = 10000
const TransactionTimeout = 30000

const driver = neo4j.driver('bolt://localhost', sharedNeo4j.authToken, {
maxTransactionRetryTime: RetryTimeoutLimit,
connectionTimeout: TransactionTimeout
})
const session = driver.session()
const runPromise = session
.readTransaction(transaction => transaction.run('RETURN 1'))
.catch(error => {
expect(error.message).toBe(
'You cannot run more transactions on a closed session.'
)
driver.close()
done()
})
session.close() // This will interrupt runPromise to reset the transaction
})

function expectSyntaxError (error) {
expect(error.code).toBe('Neo.ClientError.Statement.SyntaxError')
}
Expand Down