From 60c7dfe731a75f35c9c9ede30b93f6a0c9cd17fc Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 20 Jul 2018 16:03:50 +0200 Subject: [PATCH 1/2] Rollback transaction after failure in transaction functions Previously, when transaction function failed with a retryable error transaction was not explicitly rolled back. This could cause driver not return connections back to the pool. This commit fixes the problem by adding an explicit rollback after a failed transaction function. --- src/v1/internal/transaction-executor.js | 69 ++++++++++----- test/internal/transaction-executor.test.js | 98 ++++++++++++++++------ test/v1/session.test.js | 70 ++++++++++++++++ 3 files changed, 191 insertions(+), 46 deletions(-) diff --git a/src/v1/internal/transaction-executor.js b/src/v1/internal/transaction-executor.js index 87887bf1e..5f48034af 100644 --- a/src/v1/internal/transaction-executor.js +++ b/src/v1/internal/transaction-executor.js @@ -76,37 +76,62 @@ export default class TransactionExecutor { } _executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject) { + let tx; try { - const tx = transactionCreator(); - const transactionWorkResult = transactionWork(tx); + tx = transactionCreator(); + } catch (error) { + // failed to create a transaction + reject(error); + return; + } + + const resultPromise = this._safeExecuteTransactionWork(tx, transactionWork); + resultPromise + .then(result => this._handleTransactionWorkSuccess(result, tx, resolve, reject)) + .catch(error => this._handleTransactionWorkFailure(error, tx, reject)); + } + + _safeExecuteTransactionWork(tx, transactionWork) { + try { + const result = transactionWork(tx); // user defined callback is supposed to return a promise, but it might not; so to protect against an // incorrect API usage we wrap the returned value with a resolved promise; this is effectively a // validation step without type checks - const resultPromise = Promise.resolve(transactionWorkResult); - - resultPromise.then(result => { - if (tx.isOpen()) { - // transaction work returned resolved promise and transaction has not been committed/rolled back - // try to commit the transaction - tx.commit().then(() => { - // transaction was committed, return result to the user - resolve(result); - }).catch(error => { - // transaction failed to commit, propagate the failure - reject(error); - }); - } else { - // transaction work returned resolved promise and transaction is already committed/rolled back - // return the result returned by given transaction work - resolve(result); - } + return Promise.resolve(result); + } catch (error) { + return Promise.reject(error); + } + } + + _handleTransactionWorkSuccess(result, tx, resolve, reject) { + if (tx.isOpen()) { + // transaction work returned resolved promise and transaction has not been committed/rolled back + // try to commit the transaction + tx.commit().then(() => { + // transaction was committed, return result to the user + resolve(result); }).catch(error => { - // transaction work returned rejected promise, propagate the failure + // transaction failed to commit, propagate the failure reject(error); }); + } else { + // transaction work returned resolved promise and transaction is already committed/rolled back + // return the result returned by given transaction work + resolve(result); + } + } - } catch (error) { + _handleTransactionWorkFailure(error, tx, reject) { + if (tx.isOpen()) { + // transaction work failed and the transaction is still open, roll it back and propagate the failure + tx.rollback() + .catch(ignore => { + // ignore the rollback error + }) + .then(() => reject(error)); // propagate the original error we got from the transaction work + } else { + // transaction is already rolled back, propagate the error reject(error); } } diff --git a/test/internal/transaction-executor.test.js b/test/internal/transaction-executor.test.js index 6032611c3..49a69464e 100644 --- a/test/internal/transaction-executor.test.js +++ b/test/internal/transaction-executor.test.js @@ -80,13 +80,13 @@ describe('TransactionExecutor', () => { it('should stop retrying when time expires', done => { const executor = new TransactionExecutor(); - let workInvocationCounter = 0; + const usedTransactions = []; const realWork = transactionWork([SERVICE_UNAVAILABLE, SESSION_EXPIRED, TRANSIENT_ERROR_1, TRANSIENT_ERROR_2], 42); const result = executor.execute(transactionCreator(), tx => { expect(tx).toBeDefined(); - workInvocationCounter++; - if (workInvocationCounter === 3) { + usedTransactions.push(tx); + if (usedTransactions.length === 3) { const currentTime = Date.now(); clock = lolex.install(); clock.setSystemTime(currentTime + 30001); // move `Date.now()` call forward by 30 seconds @@ -95,7 +95,8 @@ describe('TransactionExecutor', () => { }); result.catch(error => { - expect(workInvocationCounter).toEqual(3); + expect(usedTransactions.length).toEqual(3); + expectAllTransactionsToBeClosed(usedTransactions); expect(error.code).toEqual(TRANSIENT_ERROR_1); done(); }); @@ -152,6 +153,14 @@ describe('TransactionExecutor', () => { ); }); + it('should retry when transaction work throws and rollback fails', done => { + testRetryWhenTransactionWorkThrowsAndRollbackFails( + [SERVICE_UNAVAILABLE, TRANSIENT_ERROR_2, SESSION_EXPIRED, SESSION_EXPIRED], + [SESSION_EXPIRED, TRANSIENT_ERROR_1], + done + ); + }); + it('should cancel in-flight timeouts when closed', done => { const executor = new TransactionExecutor(); // do not execute setTimeout callbacks @@ -190,16 +199,16 @@ describe('TransactionExecutor', () => { function testRetryWhenTransactionCreatorFails(errorCodes, done) { const executor = new TransactionExecutor(); const transactionCreator = throwingTransactionCreator(errorCodes, new FakeTransaction()); - let workInvocationCounter = 0; + const usedTransactions = []; const result = executor.execute(transactionCreator, tx => { expect(tx).toBeDefined(); - workInvocationCounter++; + usedTransactions.push(tx); return Promise.resolve(42); }); result.then(value => { - expect(workInvocationCounter).toEqual(1); + expect(usedTransactions.length).toEqual(1); expect(value).toEqual(42); verifyRetryDelays(fakeSetTimeout, errorCodes.length); done(); @@ -208,18 +217,19 @@ describe('TransactionExecutor', () => { function testRetryWhenTransactionWorkReturnsRejectedPromise(errorCodes, done) { const executor = new TransactionExecutor(); - let workInvocationCounter = 0; + const usedTransactions = []; const realWork = transactionWork(errorCodes, 42); const result = executor.execute(transactionCreator(), tx => { expect(tx).toBeDefined(); - workInvocationCounter++; + usedTransactions.push(tx); return realWork(); }); result.then(value => { // work should have failed 'failures.length' times and succeeded 1 time - expect(workInvocationCounter).toEqual(errorCodes.length + 1); + expect(usedTransactions.length).toEqual(errorCodes.length + 1); + expectAllTransactionsToBeClosed(usedTransactions); expect(value).toEqual(42); verifyRetryDelays(fakeSetTimeout, errorCodes.length); done(); @@ -228,18 +238,19 @@ describe('TransactionExecutor', () => { function testRetryWhenTransactionCommitReturnsRejectedPromise(errorCodes, done) { const executor = new TransactionExecutor(); - let workInvocationCounter = 0; + const usedTransactions = []; const realWork = () => Promise.resolve(4242); const result = executor.execute(transactionCreator(errorCodes), tx => { expect(tx).toBeDefined(); - workInvocationCounter++; + usedTransactions.push(tx); return realWork(); }); result.then(value => { // work should have failed 'failures.length' times and succeeded 1 time - expect(workInvocationCounter).toEqual(errorCodes.length + 1); + expect(usedTransactions.length).toEqual(errorCodes.length + 1); + expectAllTransactionsToBeClosed(usedTransactions); expect(value).toEqual(4242); verifyRetryDelays(fakeSetTimeout, errorCodes.length); done(); @@ -248,37 +259,60 @@ describe('TransactionExecutor', () => { function testRetryWhenTransactionWorkThrows(errorCodes, done) { const executor = new TransactionExecutor(); - let workInvocationCounter = 0; + const usedTransactions = []; const realWork = throwingTransactionWork(errorCodes, 42); const result = executor.execute(transactionCreator(), tx => { expect(tx).toBeDefined(); - workInvocationCounter++; + usedTransactions.push(tx); return realWork(); }); result.then(value => { // work should have failed 'failures.length' times and succeeded 1 time - expect(workInvocationCounter).toEqual(errorCodes.length + 1); + expect(usedTransactions.length).toEqual(errorCodes.length + 1); + expectAllTransactionsToBeClosed(usedTransactions); expect(value).toEqual(42); verifyRetryDelays(fakeSetTimeout, errorCodes.length); done(); }); } + function testRetryWhenTransactionWorkThrowsAndRollbackFails(txWorkErrorCodes, rollbackErrorCodes, done) { + const executor = new TransactionExecutor(); + const usedTransactions = []; + const realWork = throwingTransactionWork(txWorkErrorCodes, 424242); + + const result = executor.execute(transactionCreator([], rollbackErrorCodes), tx => { + expect(tx).toBeDefined(); + usedTransactions.push(tx); + return realWork(); + }); + + result.then(value => { + // work should have failed 'failures.length' times and succeeded 1 time + expect(usedTransactions.length).toEqual(txWorkErrorCodes.length + 1); + expectAllTransactionsToBeClosed(usedTransactions); + expect(value).toEqual(424242); + verifyRetryDelays(fakeSetTimeout, txWorkErrorCodes.length); + done(); + }); + } + function testNoRetryOnUnknownError(errorCodes, expectedWorkInvocationCount, done) { const executor = new TransactionExecutor(); - let workInvocationCounter = 0; + const usedTransactions = []; const realWork = transactionWork(errorCodes, 42); const result = executor.execute(transactionCreator(), tx => { expect(tx).toBeDefined(); - workInvocationCounter++; + usedTransactions.push(tx); return realWork(); }); result.catch(error => { - expect(workInvocationCounter).toEqual(expectedWorkInvocationCount); + expect(usedTransactions.length).toEqual(expectedWorkInvocationCount); + expectAllTransactionsToBeClosed(usedTransactions); if (errorCodes.length === 1) { expect(error.code).toEqual(errorCodes[0]); } else { @@ -290,9 +324,10 @@ describe('TransactionExecutor', () => { }); -function transactionCreator(commitErrorCodes) { - const remainingErrorCodes = (commitErrorCodes || []).slice().reverse(); - return () => new FakeTransaction(remainingErrorCodes.pop()); +function transactionCreator(commitErrorCodes, rollbackErrorCodes) { + const remainingCommitErrorCodes = (commitErrorCodes || []).slice().reverse(); + const remainingRollbackErrorCodes = (rollbackErrorCodes || []).slice().reverse(); + return () => new FakeTransaction(remainingCommitErrorCodes.pop(), remainingRollbackErrorCodes.pop()); } function throwingTransactionCreator(errorCodes, result) { @@ -348,20 +383,35 @@ function verifyRetryDelays(fakeSetTimeout, expectedInvocationCount) { }); } +function expectAllTransactionsToBeClosed(transactions) { + transactions.forEach(tx => expect(tx.isOpen()).toBeFalsy()); +} + class FakeTransaction { - constructor(commitErrorCode) { + constructor(commitErrorCode, rollbackErrorCode) { this._commitErrorCode = commitErrorCode; + this._rollbackErrorCode = rollbackErrorCode; + this._open = true; } isOpen() { - return true; + return this._open; } commit() { + this._open = false; if (this._commitErrorCode) { return Promise.reject(error(this._commitErrorCode)); } return Promise.resolve(); } + + rollback() { + this._open = false; + if (this._rollbackErrorCode) { + return Promise.reject(error(this._rollbackErrorCode)); + } + return Promise.resolve(); + } } diff --git a/test/v1/session.test.js b/test/v1/session.test.js index 77c6913b0..59d02f23f 100644 --- a/test/v1/session.test.js +++ b/test/v1/session.test.js @@ -27,6 +27,7 @@ import sharedNeo4j from '../internal/shared-neo4j'; import _ from 'lodash'; import {ServerVersion, VERSION_3_1_0} from '../../src/v1/internal/server-version'; import {isString} from '../../src/v1/internal/util'; +import {newError, PROTOCOL_ERROR, SESSION_EXPIRED} from '../../src/v1/error'; describe('session', () => { @@ -1092,6 +1093,75 @@ describe('session', () => { testUnsupportedQueryParameter(new neo4j.types.Path(node1, node2, []), done); }); + it('should retry transaction until success when function throws', done => { + testTransactionRetryUntilSuccess(() => { + throw newError('Error that can be retried', SESSION_EXPIRED); + }, done); + }); + + it('should retry transaction until success when function returns rejected promise', done => { + testTransactionRetryUntilSuccess(() => Promise.reject(newError('Error that can be retried', SESSION_EXPIRED)), done); + }); + + it('should not retry transaction when function throws fatal error', done => { + testTransactionRetryOnFatalError(() => { + throw newError('Error that is fatal', PROTOCOL_ERROR); + }, done); + }); + + it('should not retry transaction when function returns promise rejected with fatal error', done => { + testTransactionRetryOnFatalError(() => Promise.reject(newError('Error that is fatal', 'ReallyFatalErrorCode')), done); + }); + + function testTransactionRetryUntilSuccess(failureResponseFunction, done) { + const session = driver.session(); + + const failures = 3; + const usedTransactions = []; + + const resultPromise = session.writeTransaction(tx => { + usedTransactions.push(tx); + if (usedTransactions.length < failures) { + return failureResponseFunction(); + } else { + return tx.run('RETURN "424242"'); + } + }); + + resultPromise.then(result => { + expect(result.records[0].get(0)).toEqual('424242'); + expect(usedTransactions.length).toEqual(3); + usedTransactions.forEach(tx => expect(tx.isOpen()).toBeFalsy()); + session.close(); + done(); + }).catch(error => { + done.fail(error); + }); + } + + function testTransactionRetryOnFatalError(failureResponseFunction, done) { + const session = driver.session(); + + const usedTransactions = []; + + const resultPromise = session.writeTransaction(tx => { + usedTransactions.push(tx); + return failureResponseFunction(); + }); + + resultPromise.then(result => { + session.close(); + done.fail('Retries should not succeed: ' + JSON.stringify(result)); + }).catch(error => { + session.close(); + expect(error).toBeDefined(); + expect(error).not.toBeNull(); + expect(usedTransactions.length).toEqual(1); + expect(usedTransactions[0].isOpen()).toBeFalsy(); + done(); + }); + } + function serverIs31OrLater(done) { if (serverVersion.compareTo(VERSION_3_1_0) < 0) { done(); From 1dd24f929caec61003a32ba9905d9166de37263c Mon Sep 17 00:00:00 2001 From: lutovich Date: Fri, 20 Jul 2018 16:21:26 +0200 Subject: [PATCH 2/2] Add transaction functions to stress test --- test/v1/stress.test.js | 61 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/test/v1/stress.test.js b/test/v1/stress.test.js index 0723c9b17..07c81ec27 100644 --- a/test/v1/stress.test.js +++ b/test/v1/stress.test.js @@ -46,16 +46,21 @@ describe('stress tests', () => { const DATABASE_URI = fromEnvOrDefault('STRESS_TEST_DATABASE_URI', 'bolt://localhost'); const LOGGING_ENABLED = fromEnvOrDefault('STRESS_TEST_LOGGING_ENABLED', false); - let originalJasmineTimeout; + let originalTimeout; let driver; beforeEach(done => { + originalTimeout = jasmine.DEFAULT_TIMEOUT_INTERVAL; + jasmine.DEFAULT_TIMEOUT_INTERVAL = TEST_MODE.maxRunTimeMs; + driver = neo4j.driver(DATABASE_URI, sharedNeo4j.authToken); cleanupDb(driver).then(() => done()); }); afterEach(done => { + jasmine.DEFAULT_TIMEOUT_INTERVAL = originalTimeout; + cleanupDb(driver).then(() => { driver.close(); done(); @@ -79,7 +84,7 @@ describe('stress tests', () => { .then(() => done()) .catch(error => done.fail(error)); }); - }, TEST_MODE.maxRunTimeMs); + }); function createCommands(context) { const uniqueCommands = createUniqueCommands(context); @@ -100,11 +105,15 @@ describe('stress tests', () => { readQueryCommand(context), readQueryWithBookmarkCommand(context), readQueryInTxCommand(context), + readQueryInTxFunctionCommand(context), readQueryInTxWithBookmarkCommand(context), + readQueryInTxFunctionWithBookmarkCommand(context), writeQueryCommand(context), writeQueryWithBookmarkCommand(context), writeQueryInTxCommand(context), - writeQueryInTxWithBookmarkCommand(context) + writeQueryInTxFunctionCommand(context), + writeQueryInTxWithBookmarkCommand(context), + writeQueryInTxFunctionWithBookmarkCommand(context) ]; } @@ -120,10 +129,18 @@ describe('stress tests', () => { return queryInTxCommand(context, READ_QUERY, () => noParams(), READ, false); } + function readQueryInTxFunctionCommand(context) { + return queryInTxFunctionCommand(context, READ_QUERY, () => noParams(), READ, false); + } + function readQueryInTxWithBookmarkCommand(context) { return queryInTxCommand(context, READ_QUERY, () => noParams(), READ, true); } + function readQueryInTxFunctionWithBookmarkCommand(context) { + return queryInTxFunctionCommand(context, READ_QUERY, () => noParams(), READ, true); + } + function writeQueryCommand(context) { return queryCommand(context, WRITE_QUERY, () => randomParams(), WRITE, false); } @@ -136,10 +153,18 @@ describe('stress tests', () => { return queryInTxCommand(context, WRITE_QUERY, () => randomParams(), WRITE, false); } + function writeQueryInTxFunctionCommand(context) { + return queryInTxFunctionCommand(context, WRITE_QUERY, () => randomParams(), WRITE, false); + } + function writeQueryInTxWithBookmarkCommand(context) { return queryInTxCommand(context, WRITE_QUERY, () => randomParams(), WRITE, true); } + function writeQueryInTxFunctionWithBookmarkCommand(context) { + return queryInTxFunctionCommand(context, WRITE_QUERY, () => randomParams(), WRITE, true); + } + function queryCommand(context, query, paramsSupplier, accessMode, useBookmark) { return callback => { const commandId = context.nextCommandId(); @@ -163,6 +188,36 @@ describe('stress tests', () => { }; } + function queryInTxFunctionCommand(context, query, paramsSupplier, accessMode, useBookmark) { + return callback => { + const commandId = context.nextCommandId(); + const params = paramsSupplier(); + const session = newSession(context, accessMode, useBookmark); + + context.log(commandId, `About to run ${accessMode} query in TX function`); + + let resultPromise; + if (accessMode === READ) { + resultPromise = session.readTransaction(tx => tx.run(query, params)); + } else { + resultPromise = session.writeTransaction(tx => tx.run(query, params)); + } + + resultPromise.then(result => { + context.queryCompleted(result, accessMode, session.lastBookmark()); + context.log(commandId, `Transaction function executed successfully`); + + session.close(() => { + const possibleError = verifyQueryResult(result); + callback(possibleError); + }); + }).catch(error => { + context.log(commandId, `Transaction function failed with error ${JSON.stringify(error)}`); + callback(error); + }); + }; + } + function queryInTxCommand(context, query, paramsSupplier, accessMode, useBookmark) { return callback => { const commandId = context.nextCommandId();