From 3d278b6a3379e573ec9f5df3bd913a832c876eac Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 15 Aug 2023 15:09:50 +0200 Subject: [PATCH] Driver.executeQuery optimization Pipeline begin and run when using `executeQuery` to reduce the number of round trips. --- packages/core/src/driver.ts | 2 +- packages/core/src/internal/query-executor.ts | 4 + .../core/src/internal/transaction-executor.ts | 9 +- packages/core/src/session.ts | 9 + .../core/test/internal/query-executor.test.ts | 38 +- packages/core/test/session.test.ts | 20 + packages/neo4j-driver-deno/lib/core/driver.ts | 2 +- .../lib/core/internal/query-executor.ts | 4 + .../lib/core/internal/transaction-executor.ts | 9 +- .../neo4j-driver-deno/lib/core/session.ts | 9 + .../internal/transaction-executor.test.js | 698 ++++++++++-------- .../testkit-backend/src/feature/common.js | 1 + 12 files changed, 476 insertions(+), 329 deletions(-) diff --git a/packages/core/src/driver.ts b/packages/core/src/driver.ts index b827d53e7..8beddeb1e 100644 --- a/packages/core/src/driver.ts +++ b/packages/core/src/driver.ts @@ -444,7 +444,7 @@ class Driver { config: DriverConfig = {}, createConnectionProvider: CreateConnectionProvider, createSession: CreateSession = args => new Session(args), - createQueryExecutor: CreateQueryExecutor = createQuery => new QueryExecutor(createQuery) + createQueryExecutor: CreateQueryExecutor = createSession => new QueryExecutor(createSession) ) { sanitizeConfig(config) diff --git a/packages/core/src/internal/query-executor.ts b/packages/core/src/internal/query-executor.ts index 05a1ead56..95ea3f3dd 100644 --- a/packages/core/src/internal/query-executor.ts +++ b/packages/core/src/internal/query-executor.ts @@ -46,6 +46,10 @@ export default class QueryExecutor { bookmarkManager: config.bookmarkManager, impersonatedUser: config.impersonatedUser }) + + // @ts-expect-error The method is private for external users + session._setTxExecutorToPipelineBegin(true) + try { const executeInTransaction: TransactionFunction = config.routing === 'READ' ? session.executeRead.bind(session) diff --git a/packages/core/src/internal/transaction-executor.ts b/packages/core/src/internal/transaction-executor.ts index 1d7677e87..ae9eac969 100644 --- a/packages/core/src/internal/transaction-executor.ts +++ b/packages/core/src/internal/transaction-executor.ts @@ -39,6 +39,7 @@ export class TransactionExecutor { private readonly _multiplier: number private readonly _jitterFactor: number private _inFlightTimeoutIds: Timeout[] + public pipelineBegin: boolean constructor ( maxRetryTimeMs?: number | null, @@ -64,6 +65,7 @@ export class TransactionExecutor { ) this._inFlightTimeoutIds = [] + this.pipelineBegin = false this._verifyAfterConstruction() } @@ -154,7 +156,8 @@ export class TransactionExecutor { ): Promise { let tx: Transaction try { - tx = await transactionCreator() + const txPromise = transactionCreator() + tx = this.pipelineBegin ? txPromise : await txPromise } catch (error) { // failed to create a transaction reject(error) @@ -192,7 +195,7 @@ export class TransactionExecutor { _handleTransactionWorkSuccess( result: T, - tx: Transaction, + tx: Transaction | TransactionPromise, resolve: Resolve, reject: Reject ): void { @@ -215,7 +218,7 @@ export class TransactionExecutor { } } - _handleTransactionWorkFailure (error: any, tx: Transaction, reject: Reject): void { + _handleTransactionWorkFailure (error: any, tx: Transaction | TransactionPromise, reject: Reject): void { if (tx.isOpen()) { // transaction work failed and the transaction is still open, roll it back and propagate the failure tx.rollback() diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 973edc73f..4d56e7196 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -587,6 +587,15 @@ class Session { } } + /** + * Configure the transaction executor to pipeline transaction begin. + * + * @private + */ + private _setTxExecutorToPipelineBegin (pipelined: boolean): void { + this._transactionExecutor.pipelineBegin = pipelined + } + /** * @protected */ diff --git a/packages/core/test/internal/query-executor.test.ts b/packages/core/test/internal/query-executor.test.ts index 5ec170579..0aa4e2333 100644 --- a/packages/core/test/internal/query-executor.test.ts +++ b/packages/core/test/internal/query-executor.test.ts @@ -86,6 +86,21 @@ describe('QueryExecutor', () => { expect(spyOnExecuteRead).toHaveBeenCalled() }) + it('should configure the session with pipeline begin', async () => { + const { queryExecutor, sessionsCreated } = createExecutor() + + await queryExecutor.execute(baseConfig, 'query') + + expect(sessionsCreated.length).toBe(1) + const [{ spyOnSetTxExecutorToPipelineBegin, spyOnExecuteRead }] = sessionsCreated + + expect(spyOnSetTxExecutorToPipelineBegin).toHaveBeenCalledTimes(1) + expect(spyOnSetTxExecutorToPipelineBegin).toHaveBeenCalledWith(true) + expect(spyOnExecuteRead.mock.invocationCallOrder[0]).toBeGreaterThan( + spyOnSetTxExecutorToPipelineBegin.mock.invocationCallOrder[0] + ) + }) + it('should call not call executeWrite', async () => { const { queryExecutor, sessionsCreated } = createExecutor() @@ -213,6 +228,21 @@ describe('QueryExecutor', () => { expect(spyOnExecuteWrite).toHaveBeenCalled() }) + it('should configure the session with pipeline begin', async () => { + const { queryExecutor, sessionsCreated } = createExecutor() + + await queryExecutor.execute(baseConfig, 'query') + + expect(sessionsCreated.length).toBe(1) + const [{ spyOnSetTxExecutorToPipelineBegin, spyOnExecuteWrite }] = sessionsCreated + + expect(spyOnSetTxExecutorToPipelineBegin).toHaveBeenCalledTimes(1) + expect(spyOnSetTxExecutorToPipelineBegin).toHaveBeenCalledWith(true) + expect(spyOnExecuteWrite.mock.invocationCallOrder[0]).toBeGreaterThan( + spyOnSetTxExecutorToPipelineBegin.mock.invocationCallOrder[0] + ) + }) + it('should call not call executeRead', async () => { const { queryExecutor, sessionsCreated } = createExecutor() @@ -316,7 +346,7 @@ describe('QueryExecutor', () => { spyOnExecuteRead: jest.SpyInstance spyOnExecuteWrite: jest.SpyInstance spyOnClose: jest.SpyInstance> - + spyOnSetTxExecutorToPipelineBegin: jest.SpyInstance }> createSession: jest.Mock } { @@ -329,7 +359,7 @@ describe('QueryExecutor', () => { spyOnExecuteRead: jest.SpyInstance spyOnExecuteWrite: jest.SpyInstance spyOnClose: jest.SpyInstance> - + spyOnSetTxExecutorToPipelineBegin: jest.SpyInstance }> = [] const createSession = jest.fn((args) => { const session = new Session(args) @@ -337,7 +367,9 @@ describe('QueryExecutor', () => { session, spyOnExecuteRead: jest.spyOn(session, 'executeRead'), spyOnExecuteWrite: jest.spyOn(session, 'executeWrite'), - spyOnClose: jest.spyOn(session, 'close') + spyOnClose: jest.spyOn(session, 'close'), + // @ts-expect-error + spyOnSetTxExecutorToPipelineBegin: jest.spyOn(session, '_setTxExecutorToPipelineBegin') } sessionsCreated.push(sessionCreated) _mockSessionExecuteRead(sessionCreated.spyOnExecuteRead) diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts index b5b88ddc0..a23863b59 100644 --- a/packages/core/test/session.test.ts +++ b/packages/core/test/session.test.ts @@ -20,6 +20,7 @@ import { ConnectionProvider, Session, Connection, TransactionPromise, Transactio import { bookmarks } from '../src/internal' import { ACCESS_MODE_READ, FETCH_ALL } from '../src/internal/constants' import { Logger } from '../src/internal/logger' +import { TransactionExecutor } from '../src/internal/transaction-executor' import ManagedTransaction from '../src/transaction-managed' import { AuthToken, LoggerFunction } from '../src/types' import FakeConnection from './utils/connection.fake' @@ -1150,6 +1151,25 @@ describe('session', () => { ) }) }) + + describe('Pipeline Begin on TxFunc', () => { + it('session should not change the default on session creation', () => { + const session = newSessionWithConnection(new FakeConnection()) + + // @ts-expect-error + expect(session._transactionExecutor.pipelineBegin).toEqual(new TransactionExecutor().pipelineBegin) + }) + + it.each([true, false])('_setTxExecutorToPipelineBegin(%s) => configure executor', (pipelined) => { + const session = newSessionWithConnection(new FakeConnection()) + + // @ts-expect-error + session._setTxExecutorToPipelineBegin(pipelined) + + // @ts-expect-error + expect(session._transactionExecutor.pipelineBegin).toEqual(pipelined) + }) + }) }) function mockBeginWithSuccess (connection: FakeConnection): FakeConnection { diff --git a/packages/neo4j-driver-deno/lib/core/driver.ts b/packages/neo4j-driver-deno/lib/core/driver.ts index bb7546a51..b5d0b02b2 100644 --- a/packages/neo4j-driver-deno/lib/core/driver.ts +++ b/packages/neo4j-driver-deno/lib/core/driver.ts @@ -444,7 +444,7 @@ class Driver { config: DriverConfig = {}, createConnectionProvider: CreateConnectionProvider, createSession: CreateSession = args => new Session(args), - createQueryExecutor: CreateQueryExecutor = createQuery => new QueryExecutor(createQuery) + createQueryExecutor: CreateQueryExecutor = createSession => new QueryExecutor(createSession) ) { sanitizeConfig(config) diff --git a/packages/neo4j-driver-deno/lib/core/internal/query-executor.ts b/packages/neo4j-driver-deno/lib/core/internal/query-executor.ts index cd5dca137..bbf9b2ba1 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/query-executor.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/query-executor.ts @@ -46,6 +46,10 @@ export default class QueryExecutor { bookmarkManager: config.bookmarkManager, impersonatedUser: config.impersonatedUser }) + + // @ts-expect-error The method is private for external users + session._setTxExecutorToPipelineBegin(true) + try { const executeInTransaction: TransactionFunction = config.routing === 'READ' ? session.executeRead.bind(session) diff --git a/packages/neo4j-driver-deno/lib/core/internal/transaction-executor.ts b/packages/neo4j-driver-deno/lib/core/internal/transaction-executor.ts index df0f131b3..e95ce3a9a 100644 --- a/packages/neo4j-driver-deno/lib/core/internal/transaction-executor.ts +++ b/packages/neo4j-driver-deno/lib/core/internal/transaction-executor.ts @@ -39,6 +39,7 @@ export class TransactionExecutor { private readonly _multiplier: number private readonly _jitterFactor: number private _inFlightTimeoutIds: Timeout[] + public pipelineBegin: boolean constructor ( maxRetryTimeMs?: number | null, @@ -64,6 +65,7 @@ export class TransactionExecutor { ) this._inFlightTimeoutIds = [] + this.pipelineBegin = false this._verifyAfterConstruction() } @@ -154,7 +156,8 @@ export class TransactionExecutor { ): Promise { let tx: Transaction try { - tx = await transactionCreator() + const txPromise = transactionCreator() + tx = this.pipelineBegin ? txPromise : await txPromise } catch (error) { // failed to create a transaction reject(error) @@ -192,7 +195,7 @@ export class TransactionExecutor { _handleTransactionWorkSuccess( result: T, - tx: Transaction, + tx: Transaction | TransactionPromise, resolve: Resolve, reject: Reject ): void { @@ -215,7 +218,7 @@ export class TransactionExecutor { } } - _handleTransactionWorkFailure (error: any, tx: Transaction, reject: Reject): void { + _handleTransactionWorkFailure (error: any, tx: Transaction | TransactionPromise, reject: Reject): void { if (tx.isOpen()) { // transaction work failed and the transaction is still open, roll it back and propagate the failure tx.rollback() diff --git a/packages/neo4j-driver-deno/lib/core/session.ts b/packages/neo4j-driver-deno/lib/core/session.ts index 386991b17..5b8425a81 100644 --- a/packages/neo4j-driver-deno/lib/core/session.ts +++ b/packages/neo4j-driver-deno/lib/core/session.ts @@ -587,6 +587,15 @@ class Session { } } + /** + * Configure the transaction executor to pipeline transaction begin. + * + * @private + */ + private _setTxExecutorToPipelineBegin (pipelined: boolean): void { + this._transactionExecutor.pipelineBegin = pipelined + } + /** * @protected */ diff --git a/packages/neo4j-driver/test/internal/transaction-executor.test.js b/packages/neo4j-driver/test/internal/transaction-executor.test.js index 13af353a1..e0c10ba48 100644 --- a/packages/neo4j-driver/test/internal/transaction-executor.test.js +++ b/packages/neo4j-driver/test/internal/transaction-executor.test.js @@ -122,354 +122,374 @@ describe('#unit TransactionExecutor', () => { }, 60000) }) -describe('#unit TransactionExecutor', () => { - it('should retry when transaction work returns promise rejected with SERVICE_UNAVAILABLE', async () => { - await testRetryWhenTransactionWorkReturnsRejectedPromise([ - SERVICE_UNAVAILABLE - ]) - }, 30000) - - it('should retry when transaction work returns promise rejected with SESSION_EXPIRED', async () => { - await testRetryWhenTransactionWorkReturnsRejectedPromise([SESSION_EXPIRED]) - }, 30000) - - it('should retry when transaction work returns promise rejected with deadlock error', async () => { - await testRetryWhenTransactionWorkReturnsRejectedPromise([ - TRANSIENT_ERROR_1 - ]) - }, 30000) - - it('should retry when transaction work returns promise rejected with communication error', async () => { - await testRetryWhenTransactionWorkReturnsRejectedPromise([ - TRANSIENT_ERROR_2 - ]) - }, 30000) - - it('should not retry when transaction work returns promise rejected with OOM error', async () => { - await testNoRetryOnUnknownError([OOM_ERROR], 1) - }, 30000) - - it('should not retry when transaction work returns promise rejected with unknown error', async () => { - await testNoRetryOnUnknownError([UNKNOWN_ERROR], 1) - }, 30000) - - it('should not retry when transaction work returns promise rejected with transaction termination error', async () => { - await testNoRetryOnUnknownError([TX_TERMINATED_ERROR], 1) - }, 30000) - - it('should not retry when transaction work returns promise rejected with locks termination error', async () => { - await testNoRetryOnUnknownError([LOCKS_TERMINATED_ERROR], 1) - }, 30000) - - it('should not retry when transaction work returns promise rejected with unknown error type', async () => { - class MyTestError extends Error { - constructor (message, code) { - super(message) - this.code = code +;[true, false].forEach(pipelineBegin => { + describe(`#unit TransactionExecutor (pipelineBegin=${pipelineBegin})`, () => { + it('should retry when transaction work returns promise rejected with SERVICE_UNAVAILABLE', async () => { + await testRetryWhenTransactionWorkReturnsRejectedPromise([ + SERVICE_UNAVAILABLE + ]) + }, 30000) + + it('should retry when transaction work returns promise rejected with SESSION_EXPIRED', async () => { + await testRetryWhenTransactionWorkReturnsRejectedPromise([SESSION_EXPIRED]) + }, 30000) + + it('should retry when transaction work returns promise rejected with deadlock error', async () => { + await testRetryWhenTransactionWorkReturnsRejectedPromise([ + TRANSIENT_ERROR_1 + ]) + }, 30000) + + it('should retry when transaction work returns promise rejected with communication error', async () => { + await testRetryWhenTransactionWorkReturnsRejectedPromise([ + TRANSIENT_ERROR_2 + ]) + }, 30000) + + it('should not retry when transaction work returns promise rejected with OOM error', async () => { + await testNoRetryOnUnknownError([OOM_ERROR], 1) + }, 30000) + + it('should not retry when transaction work returns promise rejected with unknown error', async () => { + await testNoRetryOnUnknownError([UNKNOWN_ERROR], 1) + }, 30000) + + it('should not retry when transaction work returns promise rejected with transaction termination error', async () => { + await testNoRetryOnUnknownError([TX_TERMINATED_ERROR], 1) + }, 30000) + + it('should not retry when transaction work returns promise rejected with locks termination error', async () => { + await testNoRetryOnUnknownError([LOCKS_TERMINATED_ERROR], 1) + }, 30000) + + it('should not retry when transaction work returns promise rejected with unknown error type', async () => { + class MyTestError extends Error { + constructor (message, code) { + super(message) + this.code = code + } } - } - const error = new MyTestError('an unexpected error', 504) - const executor = new TransactionExecutor() - const realWork = () => Promise.reject(error) - - await expectAsync( - executor.execute(transactionCreator(), tx => realWork()) - ).toBeRejectedWith(error) - }, 30000) - - it('should retry when given transaction creator throws once', async () => { - await testRetryWhenTransactionCreatorFails([SERVICE_UNAVAILABLE]) - }, 30000) - - it('should retry when given transaction creator throws many times', async () => { - await testRetryWhenTransactionCreatorFails([ - SERVICE_UNAVAILABLE, - SESSION_EXPIRED, - TRANSIENT_ERROR_2, - SESSION_EXPIRED, - SERVICE_UNAVAILABLE, - TRANSIENT_ERROR_1, - 'Neo.ClientError.Security.AuthorizationExpired' - ]) - }, 30000) - - it('should retry when given transaction creator fail on begin once', async () => { - await testRetryWhenTransactionBeginFails([SERVICE_UNAVAILABLE]) - }, 30000) - - it('should retry when given transaction creator throws on begin many times', async () => { - await testRetryWhenTransactionBeginFails([ - SERVICE_UNAVAILABLE, - SESSION_EXPIRED, - TRANSIENT_ERROR_2, - SESSION_EXPIRED, - SERVICE_UNAVAILABLE, - TRANSIENT_ERROR_1, - 'Neo.ClientError.Security.AuthorizationExpired' - ]) - }, 30000) - - it('should retry when given transaction work throws once', async () => { - await testRetryWhenTransactionWorkThrows([SERVICE_UNAVAILABLE]) - }, 30000) - - it('should retry when given transaction work throws many times', async () => { - await testRetryWhenTransactionWorkThrows([ - SERVICE_UNAVAILABLE, - TRANSIENT_ERROR_2, - TRANSIENT_ERROR_2, - SESSION_EXPIRED, - 'Neo.ClientError.Security.AuthorizationExpired' - ]) - }, 30000) - - it('should retry when given transaction work returns rejected promise many times', async () => { - await testRetryWhenTransactionWorkReturnsRejectedPromise([ - SERVICE_UNAVAILABLE, - SERVICE_UNAVAILABLE, - TRANSIENT_ERROR_2, - SESSION_EXPIRED, - TRANSIENT_ERROR_1, - SESSION_EXPIRED, - 'Neo.ClientError.Security.AuthorizationExpired' - ]) - }, 30000) - - it('should retry when transaction commit returns rejected promise once', async () => { - await testRetryWhenTransactionCommitReturnsRejectedPromise([ - TRANSIENT_ERROR_1 - ]) - }, 30000) - - it('should retry when transaction commit returns rejected promise multiple times', async () => { - await testRetryWhenTransactionCommitReturnsRejectedPromise([ - TRANSIENT_ERROR_1, - TRANSIENT_ERROR_1, - SESSION_EXPIRED, - SERVICE_UNAVAILABLE, - TRANSIENT_ERROR_2, - 'Neo.ClientError.Security.AuthorizationExpired' - ]) - }, 30000) - - it('should retry when transaction work throws and rollback fails', async () => { - await testRetryWhenTransactionWorkThrowsAndRollbackFails( - [ + const error = new MyTestError('an unexpected error', 504) + const executor = new TransactionExecutor() + const realWork = () => Promise.reject(error) + + await expectAsync( + executor.execute(transactionCreator(), tx => realWork()) + ).toBeRejectedWith(error) + }, 30000) + + it('should retry when given transaction creator throws once', async () => { + await testRetryWhenTransactionCreatorFails([SERVICE_UNAVAILABLE]) + }, 30000) + + it('should retry when given transaction creator throws many times', async () => { + await testRetryWhenTransactionCreatorFails([ SERVICE_UNAVAILABLE, + SESSION_EXPIRED, TRANSIENT_ERROR_2, - 'Neo.ClientError.Security.AuthorizationExpired', SESSION_EXPIRED, - SESSION_EXPIRED - ], - [SESSION_EXPIRED, TRANSIENT_ERROR_1] - ) - }, 30000) - - it('should allow zero max retry time', () => { - const executor = new TransactionExecutor(0) - expect(executor._maxRetryTimeMs).toEqual(0) - }, 30000) - - it('should allow zero initial delay', () => { - const executor = new TransactionExecutor(42, 0) - expect(executor._initialRetryDelayMs).toEqual(0) - }, 30000) - - it('should disallow zero multiplier', () => { - expect(() => new TransactionExecutor(42, 42, 0)).toThrow() - }, 30000) - - it('should allow zero jitter factor', () => { - const executor = new TransactionExecutor(42, 42, 42, 0) - expect(executor._jitterFactor).toEqual(0) - }, 30000) - - it('should wrap transaction', async () => { - const executor = new TransactionExecutor() - const expectedTx = new FakeTransaction() - const modifiedTx = {} - await executor.execute(() => expectedTx, tx => { - expect(tx).toEqual(modifiedTx) - return 1 - }, tx => { - expect(tx).toEqual(expectedTx) - return modifiedTx - }) - }) + SERVICE_UNAVAILABLE, + TRANSIENT_ERROR_1, + 'Neo.ClientError.Security.AuthorizationExpired' + ]) + }, 30000) - it('should wrap transaction when re-try', async () => { - const executor = new TransactionExecutor() - const expectedTx = new FakeTransaction() - const modifiedTx = {} - const context = { workCalls: 0 } + it('should retry when given transaction creator fail on begin once', async () => { + await testRetryWhenTransactionBeginFails([SERVICE_UNAVAILABLE]) + }, 30000) - await executor.execute(() => expectedTx, tx => { - expect(tx).toEqual(modifiedTx) - if (context.workCalls++ < 1) { - throw newError('something on the way', 'Neo.ClientError.Security.AuthorizationExpired') - } - return 1 - }, tx => { - expect(tx).toEqual(expectedTx) - return modifiedTx - }) + it('should retry when given transaction creator throws on begin many times', async () => { + await testRetryWhenTransactionBeginFails([ + SERVICE_UNAVAILABLE, + SESSION_EXPIRED, + TRANSIENT_ERROR_2, + SESSION_EXPIRED, + SERVICE_UNAVAILABLE, + TRANSIENT_ERROR_1, + 'Neo.ClientError.Security.AuthorizationExpired' + ]) + }, 30000) - expect(context.workCalls).toEqual(2) - }) + it('should retry when given transaction work throws once', async () => { + await testRetryWhenTransactionWorkThrows([SERVICE_UNAVAILABLE]) + }, 30000) - async function testRetryWhenTransactionCreatorFails (errorCodes) { - const fakeSetTimeout = setTimeoutMock.install() - try { - const executor = new TransactionExecutor() - const transactionCreator = throwingTransactionCreator( - errorCodes, - new FakeTransaction() + it('should retry when given transaction work throws many times', async () => { + await testRetryWhenTransactionWorkThrows([ + SERVICE_UNAVAILABLE, + TRANSIENT_ERROR_2, + TRANSIENT_ERROR_2, + SESSION_EXPIRED, + 'Neo.ClientError.Security.AuthorizationExpired' + ]) + }, 30000) + + it('should retry when given transaction work returns rejected promise many times', async () => { + await testRetryWhenTransactionWorkReturnsRejectedPromise([ + SERVICE_UNAVAILABLE, + SERVICE_UNAVAILABLE, + TRANSIENT_ERROR_2, + SESSION_EXPIRED, + TRANSIENT_ERROR_1, + SESSION_EXPIRED, + 'Neo.ClientError.Security.AuthorizationExpired' + ]) + }, 30000) + + it('should retry when transaction commit returns rejected promise once', async () => { + await testRetryWhenTransactionCommitReturnsRejectedPromise([ + TRANSIENT_ERROR_1 + ]) + }, 30000) + + it('should retry when transaction commit returns rejected promise multiple times', async () => { + await testRetryWhenTransactionCommitReturnsRejectedPromise([ + TRANSIENT_ERROR_1, + TRANSIENT_ERROR_1, + SESSION_EXPIRED, + SERVICE_UNAVAILABLE, + TRANSIENT_ERROR_2, + 'Neo.ClientError.Security.AuthorizationExpired' + ]) + }, 30000) + + it('should retry when transaction work throws and rollback fails', async () => { + await testRetryWhenTransactionWorkThrowsAndRollbackFails( + [ + SERVICE_UNAVAILABLE, + TRANSIENT_ERROR_2, + 'Neo.ClientError.Security.AuthorizationExpired', + SESSION_EXPIRED, + SESSION_EXPIRED + ], + [SESSION_EXPIRED, TRANSIENT_ERROR_1] ) - const usedTransactions = [] + }, 30000) - const result = await executor.execute(transactionCreator, tx => { - expect(tx).toBeDefined() - usedTransactions.push(tx) - return Promise.resolve(42) - }) + it('should allow zero max retry time', () => { + const executor = new TransactionExecutor(0) + expect(executor._maxRetryTimeMs).toEqual(0) + }, 30000) - expect(usedTransactions.length).toEqual(1) - expect(result).toEqual(42) - verifyRetryDelays(fakeSetTimeout, errorCodes.length) - } finally { - fakeSetTimeout.uninstall() - } - } + it('should allow zero initial delay', () => { + const executor = new TransactionExecutor(42, 0) + expect(executor._initialRetryDelayMs).toEqual(0) + }, 30000) - async function testRetryWhenTransactionBeginFails (errorCodes) { - const fakeSetTimeout = setTimeoutMock.install() - try { + it('should disallow zero multiplier', () => { + expect(() => new TransactionExecutor(42, 42, 0)).toThrow() + }, 30000) + + it('should allow zero jitter factor', () => { + const executor = new TransactionExecutor(42, 42, 42, 0) + expect(executor._jitterFactor).toEqual(0) + }, 30000) + + it('should wrap transaction', async () => { const executor = new TransactionExecutor() - const transactionCreator = throwingTransactionCreatorOnBegin( - errorCodes, - new FakeTransaction() - ) - const usedTransactions = [] + const expectedTx = new FakeTransaction() + const modifiedTx = {} + await executor.execute(() => expectedTx, tx => { + expect(tx).toEqual(modifiedTx) + return 1 + }, tx => { + expect(tx).toEqual(ResolvedFakeTransaction.fromFakeTransaction(expectedTx)) + return modifiedTx + }) + }) - const result = await executor.execute(transactionCreator, tx => { - expect(tx).toBeDefined() - usedTransactions.push(tx) - return Promise.resolve(42) + it('should wrap transaction when re-try', async () => { + const executor = new TransactionExecutor() + const expectedTx = new FakeTransaction() + const modifiedTx = {} + const context = { workCalls: 0 } + + await executor.execute(() => expectedTx, tx => { + expect(tx).toEqual(modifiedTx) + if (context.workCalls++ < 1) { + throw newError('something on the way', 'Neo.ClientError.Security.AuthorizationExpired') + } + return 1 + }, tx => { + expect(tx).toEqual(ResolvedFakeTransaction.fromFakeTransaction(expectedTx)) + return modifiedTx }) - expect(usedTransactions.length).toEqual(1) - expect(result).toEqual(42) - verifyRetryDelays(fakeSetTimeout, errorCodes.length) - } finally { - fakeSetTimeout.uninstall() + expect(context.workCalls).toEqual(2) + }) + + async function testRetryWhenTransactionCreatorFails (errorCodes) { + const fakeSetTimeout = setTimeoutMock.install() + try { + const executor = new TransactionExecutor() + executor.pipelineBegin = pipelineBegin + const transactionCreator = throwingTransactionCreator( + errorCodes, + new FakeTransaction() + ) + const usedTransactions = [] + + const result = await executor.execute(transactionCreator, tx => { + expect(tx).toBeDefined() + usedTransactions.push(tx) + return Promise.resolve(42) + }) + + expect(usedTransactions.length).toEqual(1) + expect(result).toEqual(42) + verifyRetryDelays(fakeSetTimeout, errorCodes.length) + } finally { + fakeSetTimeout.uninstall() + } } - } - async function testRetryWhenTransactionWorkReturnsRejectedPromise ( - errorCodes - ) { - const fakeSetTimeout = setTimeoutMock.install() - try { - const executor = new TransactionExecutor() - const usedTransactions = [] - const realWork = transactionWork(errorCodes, 42) + async function testRetryWhenTransactionBeginFails (errorCodes) { + const fakeSetTimeout = setTimeoutMock.install() + try { + const executor = new TransactionExecutor() + executor.pipelineBegin = pipelineBegin + const transactionCreator = throwingTransactionCreatorOnBegin( + errorCodes, + new FakeTransaction() + ) + const usedTransactions = [] + const beginTransactions = [] + + const result = await executor.execute(transactionCreator, async tx => { + expect(tx).toBeDefined() + beginTransactions.push(tx) - const result = await executor.execute(transactionCreator(), tx => { - expect(tx).toBeDefined() - usedTransactions.push(tx) - return realWork() - }) + if (pipelineBegin) { + // forcing await for tx since pipeline doesn't wait for begin return + await tx + } - // work should have failed 'failures.length' times and succeeded 1 time - expect(usedTransactions.length).toEqual(errorCodes.length + 1) - expectAllTransactionsToBeClosed(usedTransactions) - expect(result).toEqual(42) - verifyRetryDelays(fakeSetTimeout, errorCodes.length) - } finally { - fakeSetTimeout.uninstall() + usedTransactions.push(tx) + return Promise.resolve(42) + }) + + expect(beginTransactions.length).toEqual(pipelineBegin ? errorCodes.length + 1 : 1) + expect(usedTransactions.length).toEqual(1) + expect(result).toEqual(42) + verifyRetryDelays(fakeSetTimeout, errorCodes.length) + } finally { + fakeSetTimeout.uninstall() + } } - } - async function testRetryWhenTransactionCommitReturnsRejectedPromise ( - errorCodes - ) { - const fakeSetTimeout = setTimeoutMock.install() - try { - const executor = new TransactionExecutor() - const usedTransactions = [] - const realWork = () => Promise.resolve(4242) - - const result = await executor.execute( - transactionCreator(errorCodes), - tx => { + async function testRetryWhenTransactionWorkReturnsRejectedPromise ( + errorCodes + ) { + const fakeSetTimeout = setTimeoutMock.install() + try { + const executor = new TransactionExecutor() + executor.pipelineBegin = pipelineBegin + const usedTransactions = [] + const realWork = transactionWork(errorCodes, 42) + + const result = await executor.execute(transactionCreator(), tx => { expect(tx).toBeDefined() usedTransactions.push(tx) return realWork() - } - ) - - // work should have failed 'failures.length' times and succeeded 1 time - expect(usedTransactions.length).toEqual(errorCodes.length + 1) - expectAllTransactionsToBeClosed(usedTransactions) - expect(result).toEqual(4242) - verifyRetryDelays(fakeSetTimeout, errorCodes.length) - } finally { - fakeSetTimeout.uninstall() + }) + + // work should have failed 'failures.length' times and succeeded 1 time + expect(usedTransactions.length).toEqual(errorCodes.length + 1) + expectAllTransactionsToBeClosed(usedTransactions) + expect(result).toEqual(42) + verifyRetryDelays(fakeSetTimeout, errorCodes.length) + } finally { + fakeSetTimeout.uninstall() + } } - } - async function testRetryWhenTransactionWorkThrows (errorCodes) { - const fakeSetTimeout = setTimeoutMock.install() - try { - const executor = new TransactionExecutor() - const usedTransactions = [] - const realWork = throwingTransactionWork(errorCodes, 42) - - const result = await executor.execute(transactionCreator(), tx => { - expect(tx).toBeDefined() - usedTransactions.push(tx) - return realWork() - }) - - // work should have failed 'failures.length' times and succeeded 1 time - expect(usedTransactions.length).toEqual(errorCodes.length + 1) - expectAllTransactionsToBeClosed(usedTransactions) - expect(result).toEqual(42) - verifyRetryDelays(fakeSetTimeout, errorCodes.length) - } finally { - fakeSetTimeout.uninstall() + async function testRetryWhenTransactionCommitReturnsRejectedPromise ( + errorCodes + ) { + const fakeSetTimeout = setTimeoutMock.install() + try { + const executor = new TransactionExecutor() + executor.pipelineBegin = pipelineBegin + const usedTransactions = [] + const realWork = () => Promise.resolve(4242) + + const result = await executor.execute( + transactionCreator(errorCodes), + tx => { + expect(tx).toBeDefined() + usedTransactions.push(tx) + return realWork() + } + ) + + // work should have failed 'failures.length' times and succeeded 1 time + expect(usedTransactions.length).toEqual(errorCodes.length + 1) + expectAllTransactionsToBeClosed(usedTransactions) + expect(result).toEqual(4242) + verifyRetryDelays(fakeSetTimeout, errorCodes.length) + } finally { + fakeSetTimeout.uninstall() + } } - } - async function testRetryWhenTransactionWorkThrowsAndRollbackFails ( - txWorkErrorCodes, - rollbackErrorCodes - ) { - const fakeSetTimeout = setTimeoutMock.install() - try { - const executor = new TransactionExecutor() - const usedTransactions = [] - const realWork = throwingTransactionWork(txWorkErrorCodes, 424242) + async function testRetryWhenTransactionWorkThrows (errorCodes) { + const fakeSetTimeout = setTimeoutMock.install() + try { + const executor = new TransactionExecutor() + executor.pipelineBegin = pipelineBegin + const usedTransactions = [] + const realWork = throwingTransactionWork(errorCodes, 42) - const result = await executor.execute( - transactionCreator([], rollbackErrorCodes), - tx => { + const result = await executor.execute(transactionCreator(), async tx => { expect(tx).toBeDefined() usedTransactions.push(tx) + if (pipelineBegin) { + await tx + } return realWork() - } - ) + }) + + // work should have failed 'failures.length' times and succeeded 1 time + expect(usedTransactions.length).toEqual(errorCodes.length + 1) + expectAllTransactionsToBeClosed(usedTransactions) + expect(result).toEqual(42) + verifyRetryDelays(fakeSetTimeout, errorCodes.length) + } finally { + fakeSetTimeout.uninstall() + } + } - // work should have failed 'failures.length' times and succeeded 1 time - expect(usedTransactions.length).toEqual(txWorkErrorCodes.length + 1) - expectAllTransactionsToBeClosed(usedTransactions) - expect(result).toEqual(424242) - verifyRetryDelays(fakeSetTimeout, txWorkErrorCodes.length) - } finally { - fakeSetTimeout.uninstall() + async function testRetryWhenTransactionWorkThrowsAndRollbackFails ( + txWorkErrorCodes, + rollbackErrorCodes + ) { + const fakeSetTimeout = setTimeoutMock.install() + try { + const executor = new TransactionExecutor() + executor.pipelineBegin = pipelineBegin + const usedTransactions = [] + const realWork = throwingTransactionWork(txWorkErrorCodes, 424242) + + const result = await executor.execute( + transactionCreator([], rollbackErrorCodes), + tx => { + expect(tx).toBeDefined() + usedTransactions.push(tx) + return realWork() + } + ) + + // work should have failed 'failures.length' times and succeeded 1 time + expect(usedTransactions.length).toEqual(txWorkErrorCodes.length + 1) + expectAllTransactionsToBeClosed(usedTransactions) + expect(result).toEqual(424242) + verifyRetryDelays(fakeSetTimeout, txWorkErrorCodes.length) + } finally { + fakeSetTimeout.uninstall() + } } - } + }) }) async function testNoRetryOnUnknownError ( @@ -526,11 +546,7 @@ function throwingTransactionCreator (errorCodes, result) { function throwingTransactionCreatorOnBegin (errorCodes, result) { const remainingErrorCodes = errorCodes.slice().reverse() return () => { - if (remainingErrorCodes.length === 0) { - return Promise.resolve(result) - } - const errorCode = remainingErrorCodes.pop() - return Promise.reject(error(errorCode)) + return new FakeTransaction(undefined, undefined, remainingErrorCodes.pop()) } } @@ -581,12 +597,58 @@ function expectAllTransactionsToBeClosed (transactions) { } class FakeTransaction { - constructor (commitErrorCode, rollbackErrorCode) { + constructor (commitErrorCode, rollbackErrorCode, beginErrorCode) { this._commitErrorCode = commitErrorCode this._rollbackErrorCode = rollbackErrorCode + this._beginErrorCode = beginErrorCode this._open = true } + then (onfulfilled, onrejected) { + if (this._beginErrorCode) { + return Promise.reject(error(this._beginErrorCode)).catch(onrejected) + } + return Promise.resolve(ResolvedFakeTransaction.fromFakeTransaction(this)).then(onfulfilled) + } + + catch (onRejected) { + return this.then(null, onRejected) + } + + finally (onfinally) { + return this.then().finally(onfinally) + } + + isOpen () { + return this._open + } + + commit () { + this._open = false + if (this._commitErrorCode) { + return Promise.reject(error(this._commitErrorCode)) + } + return Promise.resolve() + } + + rollback () { + this._open = false + if (this._rollbackErrorCode) { + return Promise.reject(error(this._rollbackErrorCode)) + } + return Promise.resolve() + } +} + +class ResolvedFakeTransaction { + static fromFakeTransaction (fake) { + const tx = new ResolvedFakeTransaction() + tx._commitErrorCode = fake._commitErrorCode + tx._rollbackErrorCode = fake._rollbackErrorCode + tx._open = fake._open + return tx + } + isOpen () { return this._open } diff --git a/packages/testkit-backend/src/feature/common.js b/packages/testkit-backend/src/feature/common.js index 5240ab630..a1505376c 100644 --- a/packages/testkit-backend/src/feature/common.js +++ b/packages/testkit-backend/src/feature/common.js @@ -32,6 +32,7 @@ const features = [ 'Feature:API:Session:NotificationsConfig', 'Optimization:AuthPipelining', 'Optimization:EagerTransactionBegin', + 'Optimization:ExecuteQueryPipelining', 'Optimization:ImplicitDefaultArguments', 'Optimization:MinimalBookmarksSet', 'Optimization:MinimalResets',