From ff79063219bd7abd8773f36c552f29bdd1eea993 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 4 Apr 2022 16:54:33 +0200 Subject: [PATCH 1/8] Deprecate `Session.(read|write)Transaction` in favor for `execute(Read|Write)" methods The new methods provides a `ManagedTransaction` objects to the transaction functions. These transaction objects don't have `commit`, `rollback` and `close` capabilities exposed in the API. --- packages/core/src/index.ts | 3 + .../core/src/internal/transaction-executor.ts | 43 ++++++----- packages/core/src/session.ts | 72 +++++++++++++++++++ packages/core/src/transaction-managed.ts | 72 +++++++++++++++++++ packages/core/src/transaction-promise.ts | 2 + packages/core/test/session.test.ts | 54 ++++++++++++++ packages/neo4j-driver-lite/src/index.ts | 5 +- packages/neo4j-driver/src/index.js | 2 +- .../test/internal/shared-neo4j.js | 1 - .../internal/transaction-executor.test.js | 35 ++++++++- packages/neo4j-driver/types/index.d.ts | 5 +- .../testkit-backend/src/request-handlers.js | 4 +- 12 files changed, 274 insertions(+), 24 deletions(-) create mode 100644 packages/core/src/transaction-managed.ts diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index bc7b4cd86..6f05d54b1 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -70,6 +70,7 @@ import Result, { QueryResult, ResultObserver } from './result' import ConnectionProvider from './connection-provider' import Connection from './connection' import Transaction from './transaction' +import ManagedTransaction from './transaction-managed' import TransactionPromise from './transaction-promise' import Session, { TransactionConfig } from './session' import Driver, * as driver from './driver' @@ -137,6 +138,7 @@ const forExport = { Stats, Result, Transaction, + ManagedTransaction, TransactionPromise, Session, Driver, @@ -196,6 +198,7 @@ export { ConnectionProvider, Connection, Transaction, + ManagedTransaction, TransactionPromise, Session, Driver, diff --git a/packages/core/src/internal/transaction-executor.ts b/packages/core/src/internal/transaction-executor.ts index 0b849ddd2..f2218e450 100644 --- a/packages/core/src/internal/transaction-executor.ts +++ b/packages/core/src/internal/transaction-executor.ts @@ -28,7 +28,7 @@ const DEFAULT_RETRY_DELAY_MULTIPLIER = 2.0 const DEFAULT_RETRY_DELAY_JITTER_FACTOR = 0.2 type TransactionCreator = () => TransactionPromise -type TransactionWork = (tx: Transaction) => T | Promise +type TransactionWork = (tx: Tx) => T | Promise type Resolve = (value: T | PromiseLike) => void type Reject = (value: any) => void type Timeout = ReturnType @@ -68,16 +68,18 @@ export class TransactionExecutor { this._verifyAfterConstruction() } - execute( + execute( transactionCreator: TransactionCreator, - transactionWork: TransactionWork + transactionWork: TransactionWork, + transactionWrapper?: (tx: Transaction) => Tx ): Promise { return new Promise((resolve, reject) => { this._executeTransactionInsidePromise( transactionCreator, transactionWork, resolve, - reject + reject, + transactionWrapper ) }).catch(error => { const retryStartTimeMs = Date.now() @@ -87,7 +89,8 @@ export class TransactionExecutor { transactionWork, error, retryStartTimeMs, - retryDelayMs + retryDelayMs, + transactionWrapper ) }) } @@ -98,12 +101,13 @@ export class TransactionExecutor { this._inFlightTimeoutIds = [] } - _retryTransactionPromise( + _retryTransactionPromise( transactionCreator: TransactionCreator, - transactionWork: TransactionWork, + transactionWork: TransactionWork, error: Error, retryStartTime: number, - retryDelayMs: number + retryDelayMs: number, + transactionWrapper?: (tx: Transaction) => Tx ): Promise { const elapsedTimeMs = Date.now() - retryStartTime @@ -122,7 +126,8 @@ export class TransactionExecutor { transactionCreator, transactionWork, resolve, - reject + reject, + transactionWrapper ) }, nextRetryTime) // add newly created timeoutId to the list of all in-flight timeouts @@ -134,16 +139,18 @@ export class TransactionExecutor { transactionWork, error, retryStartTime, - nextRetryDelayMs + nextRetryDelayMs, + transactionWrapper ) }) } - async _executeTransactionInsidePromise( + async _executeTransactionInsidePromise( transactionCreator: TransactionCreator, - transactionWork: TransactionWork, + transactionWork: TransactionWork, resolve: Resolve, - reject: Reject + reject: Reject, + transactionWrapper?: (tx: Transaction) => Tx, ): Promise { let tx: Transaction try { @@ -154,7 +161,9 @@ export class TransactionExecutor { return } - const resultPromise = this._safeExecuteTransactionWork(tx, transactionWork) + const wrap = transactionWrapper || ((tx: Transaction) => tx) + const wrappedTx = wrap(tx) + const resultPromise = this._safeExecuteTransactionWork(wrappedTx, transactionWork) resultPromise .then(result => @@ -163,9 +172,9 @@ export class TransactionExecutor { .catch(error => this._handleTransactionWorkFailure(error, tx, reject)) } - _safeExecuteTransactionWork( - tx: Transaction, - transactionWork: TransactionWork + _safeExecuteTransactionWork( + tx: Tx, + transactionWork: TransactionWork ): Promise { try { const result = transactionWork(tx) diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index bd7ad4424..c09e87b5a 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -32,9 +32,11 @@ import { Query, SessionMode } from './types' import Connection from './connection' import { NumberOrInteger } from './graph-types' import TransactionPromise from './transaction-promise' +import ManagedTransaction from './transaction-managed' type ConnectionConsumer = (connection: Connection | void) => any | undefined type TransactionWork = (tx: Transaction) => Promise | T +type ManagedTransactionWork = (tx: ManagedTransaction) => Promise | T interface TransactionConfig { timeout?: NumberOrInteger @@ -336,6 +338,8 @@ class Session { * delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's * `maxTransactionRetryTime` property in milliseconds. * + * @deprecated This method will be removed in version 6.0. Please, use {@link Session#executeRead} instead. + * * @param {function(tx: Transaction): Promise} transactionWork - Callback that executes operations against * a given {@link Transaction}. * @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work. @@ -358,6 +362,8 @@ class Session { * delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's * `maxTransactionRetryTime` property in milliseconds. * + * @deprecated This method will be removed in version 6.0. Please, use {@link Session#executeWrite} instead. + * * @param {function(tx: Transaction): Promise} transactionWork - Callback that executes operations against * a given {@link Transaction}. * @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work. @@ -383,6 +389,72 @@ class Session { ) } + /** + * Execute given unit of work in a {@link READ} transaction. + * + * Transaction will automatically be committed unless the given function throws or returns a rejected promise. + * Some failures of the given function or the commit itself will be retried with exponential backoff with initial + * delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's + * `maxTransactionRetryTime` property in milliseconds. + * + * @param {function(tx: ManagedTransaction): Promise} transactionWork - Callback that executes operations against + * a given {@link Transaction}. + * @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work. + * @return {Promise} Resolved promise as returned by the given function or rejected promise when given + * function or commit fails. + */ + executeRead( + transactionWork: ManagedTransactionWork, + transactionConfig?: TransactionConfig + ): Promise { + const config = new TxConfig(transactionConfig) + return this._executeInTransaction(ACCESS_MODE_READ, config, transactionWork) + } + + /** + * Execute given unit of work in a {@link WRITE} transaction. + * + * Transaction will automatically be committed unless the given function throws or returns a rejected promise. + * Some failures of the given function or the commit itself will be retried with exponential backoff with initial + * delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's + * `maxTransactionRetryTime` property in milliseconds. + * + * @param {function(tx: ManagedTransaction): Promise} transactionWork - Callback that executes operations against + * a given {@link Transaction}. + * @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work. + * @return {Promise} Resolved promise as returned by the given function or rejected promise when given + * function or commit fails. + */ + executeWrite( + transactionWork: ManagedTransactionWork, + transactionConfig?: TransactionConfig + ): Promise { + const config = new TxConfig(transactionConfig) + return this._executeInTransaction(ACCESS_MODE_WRITE, config, transactionWork) + } + + /** + * @private + * @param {SessionMode} accessMode + * @param {TxConfig} transactionConfig + * @param {ManagedTransactionWork} transactionWork + * @returns {Promise} + */ + private _executeInTransaction( + accessMode: SessionMode, + transactionConfig: TxConfig, + transactionWork: ManagedTransactionWork + ): Promise { + return this._transactionExecutor.execute( + () => this._beginTransaction(accessMode, transactionConfig), + transactionWork, + tx => new ManagedTransaction({ + isOpen: tx.isOpen.bind(tx), + run: tx.run.bind(tx), + }) + ) + } + /** * Sets the resolved database name in the session context. * @private diff --git a/packages/core/src/transaction-managed.ts b/packages/core/src/transaction-managed.ts new file mode 100644 index 000000000..edc2c2be6 --- /dev/null +++ b/packages/core/src/transaction-managed.ts @@ -0,0 +1,72 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Result from './result' +import { Query } from './types' + +interface Run { + (query: Query, parameters?: any): Result +} + +interface IsOpen { + (): boolean +} + +/** + * Represents a transaction that is managed by the transaction executor. + * + * @public + */ +class ManagedTransaction { + private _run: Run + private _isOpen: IsOpen + + constructor({ run, isOpen }: { run: Run, isOpen: IsOpen }) { + /** + * @private + */ + this._run = run + /** + * @private + */ + this._isOpen = isOpen + } + + /** + * Run Cypher query + * Could be called with a query object i.e.: `{text: "MATCH ...", parameters: {param: 1}}` + * or with the query and parameters as separate arguments. + * @param {mixed} query - Cypher query to execute + * @param {Object} parameters - Map with parameters to use in query + * @return {Result} New Result + */ + run(query: Query, parameters?: any): Result { + return this._run(query, parameters) + } + + /** + * Check if this transaction is active, which means commit and rollback did not happen. + * @return {boolean} `true` when not committed and not rolled back, `false` otherwise. + */ + isOpen(): boolean { + return this._isOpen() + } +} + +export default ManagedTransaction diff --git a/packages/core/src/transaction-promise.ts b/packages/core/src/transaction-promise.ts index 1d40fdab4..5a695d2ea 100644 --- a/packages/core/src/transaction-promise.ts +++ b/packages/core/src/transaction-promise.ts @@ -170,6 +170,7 @@ class TransactionPromise extends Transaction implements Promise{ /** * @access private + * @returns {void} */ private _onBeginError(error: Error): void { this._beginError = error; @@ -180,6 +181,7 @@ class TransactionPromise extends Transaction implements Promise{ /** * @access private + * @returns {void} */ private _onBeginMetadata(metadata: any): void { this._beginMetadata = metadata || {}; diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts index 3976dfee1..0a27d8af4 100644 --- a/packages/core/test/session.test.ts +++ b/packages/core/test/session.test.ts @@ -19,6 +19,7 @@ import { ConnectionProvider, Session, Connection, TransactionPromise, Transaction } from '../src' import { bookmarks } from '../src/internal' import { ACCESS_MODE_READ, FETCH_ALL } from '../src/internal/constants' +import ManagedTransaction from '../src/transaction-managed' import FakeConnection from './utils/connection.fake' describe('session', () => { @@ -279,6 +280,59 @@ describe('session', () => { expect(tx).toBeDefined() }) }) + + describe.each([ + ['.executeWrite()', (session: Session) => session.executeWrite.bind(session)], + ['.executeRead()', (session: Session) => session.executeRead.bind(session)], + ])('%s', (_, execute) => { + it('should call executor with ManagedTransaction', async () => { + const connection = mockBeginWithSuccess(newFakeConnection()) + const session = newSessionWithConnection(connection, false, 1000) + const status = { functionCalled: false } + + await execute(session)(async (tx: ManagedTransaction) => { + expect(typeof tx).toEqual('object') + expect(tx).toBeInstanceOf(ManagedTransaction) + + status.functionCalled = true + }) + + expect(status.functionCalled).toEqual(true) + }) + + it('should proxy run to the begined transaction', async () => { + const connection = mockBeginWithSuccess(newFakeConnection()) + const session = newSessionWithConnection(connection, false, FETCH_ALL) + // @ts-ignore + const run = jest.spyOn(Transaction.prototype, 'run').mockImplementation(() => Promise.resolve()) + const status = { functionCalled: false } + const query = 'RETURN $a' + const params = { a: 1 } + + await execute(session)(async (tx: ManagedTransaction) => { + status.functionCalled = true + await tx.run(query, params) + }) + + expect(status.functionCalled).toEqual(true) + expect(run).toHaveBeenCalledWith(query, params) + }) + + it('should proxy isOpen to the begined transaction', async () => { + const connection = mockBeginWithSuccess(newFakeConnection()) + const session = newSessionWithConnection(connection, false, FETCH_ALL) + const isOpen = jest.spyOn(Transaction.prototype, 'isOpen').mockImplementationOnce(() => true) + const status = { functionCalled: false } + + await execute(session)(async (tx: ManagedTransaction) => { + status.functionCalled = true + expect(tx.isOpen()).toBe(true) + }) + + expect(status.functionCalled).toEqual(true) + expect(isOpen).toHaveBeenCalled() + }) + }) }) function mockBeginWithSuccess(connection: FakeConnection) { diff --git a/packages/neo4j-driver-lite/src/index.ts b/packages/neo4j-driver-lite/src/index.ts index d4578ea90..34adfab78 100644 --- a/packages/neo4j-driver-lite/src/index.ts +++ b/packages/neo4j-driver-lite/src/index.ts @@ -63,6 +63,7 @@ import { NotificationPosition, Session, Transaction, + ManagedTransaction, TransactionPromise, ServerInfo, Connection, @@ -151,7 +152,7 @@ const { * connectionAcquisitionTimeout: 60000, // 1 minute * * // Specify the maximum time in milliseconds transactions are allowed to retry via - * // `Session#readTransaction()` and `Session#writeTransaction()` functions. + * // `Session#executeRead()` and `Session#executeWrite()` functions. * // These functions will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient * // errors with exponential backoff using initial delay of 1 second. * // Default value is 30000 which is 30 seconds. @@ -428,6 +429,7 @@ const forExport = { ServerInfo, Session, Transaction, + ManagedTransaction, TransactionPromise, Point, Duration, @@ -478,6 +480,7 @@ export { ServerInfo, Session, Transaction, + ManagedTransaction, TransactionPromise, Point, Duration, diff --git a/packages/neo4j-driver/src/index.js b/packages/neo4j-driver/src/index.js index f8bb7f809..d63fd87cc 100644 --- a/packages/neo4j-driver/src/index.js +++ b/packages/neo4j-driver/src/index.js @@ -126,7 +126,7 @@ const { * connectionAcquisitionTimeout: 60000, // 1 minute * * // Specify the maximum time in milliseconds transactions are allowed to retry via - * // `Session#readTransaction()` and `Session#writeTransaction()` functions. + * // `Session#executeRead()` and `Session#executeWrite()` functions. * // These functions will retry the given unit of work on `ServiceUnavailable`, `SessionExpired` and transient * // errors with exponential backoff using initial delay of 1 second. * // Default value is 30000 which is 30 seconds. diff --git a/packages/neo4j-driver/test/internal/shared-neo4j.js b/packages/neo4j-driver/test/internal/shared-neo4j.js index d1ce6fd2a..441844f3c 100644 --- a/packages/neo4j-driver/test/internal/shared-neo4j.js +++ b/packages/neo4j-driver/test/internal/shared-neo4j.js @@ -16,7 +16,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - import neo4j from '../../src' import { json } from 'neo4j-driver-core' diff --git a/packages/neo4j-driver/test/internal/transaction-executor.test.js b/packages/neo4j-driver/test/internal/transaction-executor.test.js index 021155982..42fbd8005 100644 --- a/packages/neo4j-driver/test/internal/transaction-executor.test.js +++ b/packages/neo4j-driver/test/internal/transaction-executor.test.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import { newError, error as err, internal } from 'neo4j-driver-core' +import { newError, error as err, internal, int } from 'neo4j-driver-core' import { setTimeoutMock } from './timers-util' import lolex from 'lolex' @@ -286,6 +286,39 @@ describe('#unit TransactionExecutor', () => { expect(executor._jitterFactor).toEqual(0) }, 30000) + it('should wrap transaction', async () => { + const executor = new TransactionExecutor() + const expectedTx = new FakeTransaction() + const modifiedTx = {} + await executor.execute(() => expectedTx, tx => { + expect(tx).toEqual(modifiedTx) + return 1 + }, tx => { + expect(tx).toEqual(expectedTx) + return modifiedTx + }) + }) + + it('should wrap transaction when re-try', async () => { + const executor = new TransactionExecutor() + const expectedTx = new FakeTransaction() + const modifiedTx = {} + const context = { workCalls: 0 } + + await executor.execute(() => expectedTx, tx => { + expect(tx).toEqual(modifiedTx) + if (context.workCalls++ < 1) { + throw newError('something on the way', 'Neo.ClientError.Security.AuthorizationExpired') + } + return 1 + }, tx => { + expect(tx).toEqual(expectedTx) + return modifiedTx + }) + + expect(context.workCalls).toEqual(2) + }) + async function testRetryWhenTransactionCreatorFails (errorCodes) { const fakeSetTimeout = setTimeoutMock.install() try { diff --git a/packages/neo4j-driver/types/index.d.ts b/packages/neo4j-driver/types/index.d.ts index 7e5e0447b..23f236748 100644 --- a/packages/neo4j-driver/types/index.d.ts +++ b/packages/neo4j-driver/types/index.d.ts @@ -58,6 +58,7 @@ import { ResultObserver, QueryResult, Transaction, + ManagedTransaction, Session, ConnectionProvider } from 'neo4j-driver-core' @@ -190,7 +191,8 @@ declare const forExport: { ServerInfo: ServerInfo NotificationPosition: NotificationPosition Session: Session - Transaction: Transaction + Transaction: Transaction, + ManagedTransaction: ManagedTransaction, Point: Point isPoint: typeof isPoint Duration: Duration @@ -249,6 +251,7 @@ export { NotificationPosition, Session, Transaction, + ManagedTransaction, Point, isPoint, Duration, diff --git a/packages/testkit-backend/src/request-handlers.js b/packages/testkit-backend/src/request-handlers.js index 4f30c3db2..217f7f4d8 100644 --- a/packages/testkit-backend/src/request-handlers.js +++ b/packages/testkit-backend/src/request-handlers.js @@ -225,7 +225,7 @@ export function SessionReadTransaction (context, data, wire) { const { sessionId, txMeta: metadata } = data const session = context.getSession(sessionId) return session - .readTransaction( + .executeRead( tx => new Promise((resolve, reject) => { const id = context.addTx(tx, sessionId, resolve, reject) @@ -323,7 +323,7 @@ export function SessionWriteTransaction (context, data, wire) { const { sessionId, txMeta: metadata } = data const session = context.getSession(sessionId) return session - .writeTransaction( + .executeWrite( tx => new Promise((resolve, reject) => { const id = context.addTx(tx, sessionId, resolve, reject) From 4aff4bee67324b8d2527abd6ba1230d554cc199f Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 4 Apr 2022 17:49:05 +0200 Subject: [PATCH 2/8] Fix type-checking --- packages/core/src/internal/transaction-executor.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/core/src/internal/transaction-executor.ts b/packages/core/src/internal/transaction-executor.ts index f2218e450..8051d764b 100644 --- a/packages/core/src/internal/transaction-executor.ts +++ b/packages/core/src/internal/transaction-executor.ts @@ -161,7 +161,10 @@ export class TransactionExecutor { return } - const wrap = transactionWrapper || ((tx: Transaction) => tx) + // The conversion from `tx` as `unknown` then to `Tx` is necessary + // because it is not possible to be sure that `Tx` is a subtype of `Transaction` + // in using static type checking. + const wrap = transactionWrapper || ((tx: Transaction) => tx as unknown as Tx) const wrappedTx = wrap(tx) const resultPromise = this._safeExecuteTransactionWork(wrappedTx, transactionWork) From 34080c022fe3e6a49eb715d0eba218e7986ae032 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 4 Apr 2022 18:31:35 +0200 Subject: [PATCH 3/8] Add executeRead and executeWrite to RxSession --- packages/neo4j-driver/src/session-rx.js | 45 +++++- .../src/transaction-managed-rx.js | 50 +++++++ packages/neo4j-driver/src/transaction-rx.js | 9 +- packages/neo4j-driver/test/rx/session.test.js | 132 ++++++++++++++++++ .../test/types/session-rx.test.ts | 25 ++++ .../test/types/transaction-managed-rx.test.ts | 64 +++++++++ .../test/types/transaction-rx.test.ts | 2 + packages/neo4j-driver/types/index.d.ts | 4 + packages/neo4j-driver/types/session-rx.d.ts | 17 ++- .../types/transaction-managed-rx.d.ts | 28 ++++ .../neo4j-driver/types/transaction-rx.d.ts | 2 + .../src/request-handlers-rx.js | 4 +- 12 files changed, 376 insertions(+), 6 deletions(-) create mode 100644 packages/neo4j-driver/src/transaction-managed-rx.js create mode 100644 packages/neo4j-driver/test/types/transaction-managed-rx.test.ts create mode 100644 packages/neo4j-driver/types/transaction-managed-rx.d.ts diff --git a/packages/neo4j-driver/src/session-rx.js b/packages/neo4j-driver/src/session-rx.js index 863d451d3..5660814dd 100644 --- a/packages/neo4j-driver/src/session-rx.js +++ b/packages/neo4j-driver/src/session-rx.js @@ -84,6 +84,7 @@ export default class RxSession { * Executes the provided unit of work in a {@link READ} reactive transaction which is created with the provided * transaction configuration. * @public + * @deprecated This method will be removed in version 6.0. Please, use {@link RxSession#executeRead} instead. * @param {function(txc: RxTransaction): Observable} work - A unit of work to be executed. * @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver. * @returns {Observable} - A reactive stream returned by the unit of work. @@ -96,6 +97,7 @@ export default class RxSession { * Executes the provided unit of work in a {@link WRITE} reactive transaction which is created with the provided * transaction configuration. * @public + * @deprecated This method will be removed in version 6.0. Please, use {@link RxSession#executeWrite} instead. * @param {function(txc: RxTransaction): Observable} work - A unit of work to be executed. * @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver. * @returns {Observable} - A reactive stream returned by the unit of work. @@ -104,6 +106,45 @@ export default class RxSession { return this._runTransaction(ACCESS_MODE_WRITE, work, transactionConfig) } + + /** + * Executes the provided unit of work in a {@link READ} reactive transaction which is created with the provided + * transaction configuration. + * @public + * @param {function(txc: RxManagedTransaction): Observable} work - A unit of work to be executed. + * @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver. + * @returns {Observable} - A reactive stream returned by the unit of work. + */ + executeRead (work, transactionConfig) { + return this._executeInTransaction(ACCESS_MODE_READ, work, transactionConfig) + } + + /** + * Executes the provided unit of work in a {@link WRITE} reactive transaction which is created with the provided + * transaction configuration. + * @public + * @param {function(txc: RxManagedTransaction): Observable} work - A unit of work to be executed. + * @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver. + * @returns {Observable} - A reactive stream returned by the unit of work. + */ + executeWrite (work, transactionConfig) { + return this._executeInTransaction(ACCESS_MODE_WRITE, work, transactionConfig) + } + + /** + * @private + * @param {function(txc: RxManagedTransaction): Observable} work + * @param {TransactionConfig} transactionConfig + * @returns {Observable} + */ + _executeInTransaction (accessMode, work, transactionConfig) { + const wrapper = txc => new RxManagedTransaction({ + run: txc.run.bind(txc), + isOpen: txc.isOpen.bind(txc) + }) + return this._runTransaction(accessMode, work, transactionConfig, wrapper) + } + /** * Closes this reactive session. * @@ -181,7 +222,7 @@ export default class RxSession { /** * @private */ - _runTransaction (accessMode, work, transactionConfig) { + _runTransaction (accessMode, work, transactionConfig, transactionWrapper = (tx) => tx) { let txConfig = TxConfig.empty() if (transactionConfig) { txConfig = new TxConfig(transactionConfig) @@ -192,7 +233,7 @@ export default class RxSession { flatMap(txc => defer(() => { try { - return work(txc) + return work(transactionWrapper(txc)) } catch (err) { return throwError(err) } diff --git a/packages/neo4j-driver/src/transaction-managed-rx.js b/packages/neo4j-driver/src/transaction-managed-rx.js new file mode 100644 index 000000000..bef42078f --- /dev/null +++ b/packages/neo4j-driver/src/transaction-managed-rx.js @@ -0,0 +1,50 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Represents a rx transaction that is managed by the transaction executor. + * + * @public + */ +class RxManagedTransaction { + constructor({ run, isOpen }) { + this._run = run + this._isOpen = isOpen + } + + /** + * Creates a reactive result that will execute the query in this transaction, with the provided parameters. + * + * @public + * @param {string} query - Query to be executed. + * @param {Object} parameters - Parameter values to use in query execution. + * @returns {RxResult} - A reactive result + */ + run (query, parameters) { + return this._run(query, parameters) + } + + /** + * Check if this transaction is active, which means commit and rollback did not happen. + * @return {boolean} `true` when not committed and not rolled back, `false` otherwise. + */ + isOpen() { + return this._isOpen() + } +} diff --git a/packages/neo4j-driver/src/transaction-rx.js b/packages/neo4j-driver/src/transaction-rx.js index 11acac062..e41621792 100644 --- a/packages/neo4j-driver/src/transaction-rx.js +++ b/packages/neo4j-driver/src/transaction-rx.js @@ -41,7 +41,6 @@ export default class RxTransaction { * @param {Object} parameters - Parameter values to use in query execution. * @returns {RxResult} - A reactive result */ - run (query, parameters) { return new RxResult( new Observable(observer => { @@ -91,6 +90,14 @@ export default class RxTransaction { }) } + /** + * Check if this transaction is active, which means commit and rollback did not happen. + * @return {boolean} `true` when not committed and not rolled back, `false` otherwise. + */ + isOpen() { + return this._txc.isOpen() + } + /** * Closes the transaction * diff --git a/packages/neo4j-driver/test/rx/session.test.js b/packages/neo4j-driver/test/rx/session.test.js index 089d6099b..ffed49b9f 100644 --- a/packages/neo4j-driver/test/rx/session.test.js +++ b/packages/neo4j-driver/test/rx/session.test.js @@ -239,6 +239,138 @@ describe('#integration rx-session', () => { expect(await countNodes('Person')).toBe(0) }, 60000) + describe('.executeWrite()', () => { + it('should run transactions without retries', async () => { + if (protocolVersion < 4.0) { + return + } + + const txcWork = new ConfigurableTransactionWork({ + query: 'CREATE (:WithoutRetry) RETURN 5' + }) + + const result = await session + .executeWrite(txc => txcWork.work(txc)) + .pipe(materialize(), toArray()) + .toPromise() + expect(result).toEqual([ + Notification.createNext(5), + Notification.createComplete() + ]) + + expect(txcWork.invocations).toBe(1) + expect(await countNodes('WithoutRetry')).toBe(1) + }, 60000) + + + it('should run transaction with retries on reactive failures', async () => { + if (protocolVersion < 4.0) { + return + } + + const txcWork = new ConfigurableTransactionWork({ + query: 'CREATE (:WithReactiveFailure) RETURN 7', + reactiveFailures: [ + newError('service unavailable', SERVICE_UNAVAILABLE), + newError('session expired', SESSION_EXPIRED), + newError('transient error', 'Neo.TransientError.Transaction.NotStarted') + ] + }) + + const result = await session + .executeWrite(txc => txcWork.work(txc)) + .pipe(materialize(), toArray()) + .toPromise() + expect(result).toEqual([ + Notification.createNext(7), + Notification.createComplete() + ]) + + expect(txcWork.invocations).toBe(4) + expect(await countNodes('WithReactiveFailure')).toBe(1) + }, 60000) + + it('should run transaction with retries on synchronous failures', async () => { + if (protocolVersion < 4.0) { + return + } + + const txcWork = new ConfigurableTransactionWork({ + query: 'CREATE (:WithSyncFailure) RETURN 9', + syncFailures: [ + newError('service unavailable', SERVICE_UNAVAILABLE), + newError('session expired', SESSION_EXPIRED), + newError('transient error', 'Neo.TransientError.Transaction.NotStarted') + ] + }) + + const result = await session + .executeWrite(txc => txcWork.work(txc)) + .pipe(materialize(), toArray()) + .toPromise() + expect(result).toEqual([ + Notification.createNext(9), + Notification.createComplete() + ]) + + expect(txcWork.invocations).toBe(4) + expect(await countNodes('WithSyncFailure')).toBe(1) + }, 60000) + + it('should fail on transactions that cannot be retried', async () => { + if (protocolVersion < 4.0) { + return + } + + const txcWork = new ConfigurableTransactionWork({ + query: 'UNWIND [10, 5, 0] AS x CREATE (:Hi) RETURN 10/x' + }) + + const result = await session + .executeWrite(txc => txcWork.work(txc)) + .pipe(materialize(), toArray()) + .toPromise() + expect(result).toEqual([ + Notification.createNext(1), + Notification.createNext(2), + Notification.createError(jasmine.stringMatching(/\/ by zero/)) + ]) + + expect(txcWork.invocations).toBe(1) + expect(await countNodes('Hi')).toBe(0) + }, 60000) + + it('should fail even after a transient error', async () => { + if (protocolVersion < 4.0) { + return + } + + const txcWork = new ConfigurableTransactionWork({ + query: 'CREATE (:Person) RETURN 1', + syncFailures: [ + newError( + 'a transient error', + 'Neo.TransientError.Transaction.NotStarted' + ) + ], + reactiveFailures: [ + newError('a database error', 'Neo.Database.Not.Started') + ] + }) + + const result = await session + .executeWrite(txc => txcWork.work(txc)) + .pipe(materialize(), toArray()) + .toPromise() + expect(result).toEqual([ + Notification.createError(jasmine.stringMatching(/a database error/)) + ]) + + expect(txcWork.invocations).toBe(2) + expect(await countNodes('Person')).toBe(0) + }, 60000) + }) + async function countNodes (label) { const session = driver.rxSession() return await session diff --git a/packages/neo4j-driver/test/types/session-rx.test.ts b/packages/neo4j-driver/test/types/session-rx.test.ts index 4c768cac9..9c3e1ceed 100644 --- a/packages/neo4j-driver/test/types/session-rx.test.ts +++ b/packages/neo4j-driver/test/types/session-rx.test.ts @@ -19,6 +19,7 @@ import RxSession from '../../types/session-rx' import RxTransaction from '../../types/transaction-rx' +import { RxManagedTransaction } from '../../types' import RxResult from '../../types/result-rx' import { Integer, @@ -152,3 +153,27 @@ const observable6: Observable = rxSession.writeTransaction( (tx: RxTransaction) => of(42), txConfig4 ) + +const observable7: Observable = rxSession.executeRead( + (tx: RxManagedTransaction) => { + return of(10) + } +) + +const observable8: Observable = rxSession.executeRead( + (tx: RxManagedTransaction) => { + return of('42') + } +) + +const observable9: Observable = rxSession.executeWrite( + (tx: RxManagedTransaction) => { + return of(10) + } +) + +const observable10: Observable = rxSession.executeWrite( + (tx: RxManagedTransaction) => { + return of('42') + } +) diff --git a/packages/neo4j-driver/test/types/transaction-managed-rx.test.ts b/packages/neo4j-driver/test/types/transaction-managed-rx.test.ts new file mode 100644 index 000000000..0065842e9 --- /dev/null +++ b/packages/neo4j-driver/test/types/transaction-managed-rx.test.ts @@ -0,0 +1,64 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import RxManagedTransaction from '../../types/transaction-managed-rx' +import { Record, ResultSummary } from 'neo4j-driver-core' +import RxResult from '../../types/result-rx' +import { Observable, of, Observer, throwError } from 'rxjs' +import { concat, finalize, catchError } from 'rxjs/operators' + +const dummy: any = null + +const stringObserver: Observer = { + next: value => console.log(value), + complete: () => console.log('complete'), + error: error => console.log(`error: ${error}`) +} + +const keysObserver: Observer = { + next: value => console.log(`keys: ${value}`), + complete: () => console.log('keys complete'), + error: error => console.log(`keys error: ${error}`) +} + +const recordsObserver: Observer = { + next: value => console.log(`record: ${value}`), + complete: () => console.log('records complete'), + error: error => console.log(`records error: ${error}`) +} + +const summaryObserver: Observer = { + next: value => console.log(`summary: ${value}`), + complete: () => console.log('summary complete'), + error: error => console.log(`summary error: ${error}`) +} + +const tx: RxManagedTransaction = dummy + +const result1: RxResult = tx.run('RETURN 1') +result1.keys().subscribe(keysObserver) +result1.records().subscribe(recordsObserver) +result1.consume().subscribe(summaryObserver) + +const result2: RxResult = tx.run('RETURN $value', { value: '42' }) +result2.keys().subscribe(keysObserver) +result2.records().subscribe(recordsObserver) +result2.consume().subscribe(summaryObserver) + +const isOpen: boolean = tx.isOpen() diff --git a/packages/neo4j-driver/test/types/transaction-rx.test.ts b/packages/neo4j-driver/test/types/transaction-rx.test.ts index b10139ceb..dbd2a4a3b 100644 --- a/packages/neo4j-driver/test/types/transaction-rx.test.ts +++ b/packages/neo4j-driver/test/types/transaction-rx.test.ts @@ -61,6 +61,8 @@ result2.keys().subscribe(keysObserver) result2.records().subscribe(recordsObserver) result2.consume().subscribe(summaryObserver) +const isOpen: boolean = tx.isOpen() + tx.commit() .pipe(concat(of('committed'))) .subscribe(stringObserver) diff --git a/packages/neo4j-driver/types/index.d.ts b/packages/neo4j-driver/types/index.d.ts index 23f236748..0835c3c2c 100644 --- a/packages/neo4j-driver/types/index.d.ts +++ b/packages/neo4j-driver/types/index.d.ts @@ -74,6 +74,7 @@ import { } from './driver' import RxSession from './session-rx' import RxTransaction from './transaction-rx' +import RxManagedTransaction from './transaction-managed-rx' import RxResult from './result-rx' import { Parameters } from './query-runner' @@ -118,6 +119,7 @@ declare const types: { Integer: typeof Integer RxSession: RxSession RxTransaction: RxTransaction + RxManagedTransaction: RxManagedTransaction, RxResult: RxResult } @@ -203,6 +205,7 @@ declare const forExport: { DateTime: DateTime RxSession: RxSession RxTransaction: RxTransaction + RxManagedTransaction: RxManagedTransaction, RxResult: RxResult ConnectionProvider: ConnectionProvider isDuration: typeof isDuration @@ -262,6 +265,7 @@ export { DateTime, RxSession, RxTransaction, + RxManagedTransaction, RxResult, ConnectionProvider, isDuration, diff --git a/packages/neo4j-driver/types/session-rx.d.ts b/packages/neo4j-driver/types/session-rx.d.ts index 396e137f2..f0137b9ea 100644 --- a/packages/neo4j-driver/types/session-rx.d.ts +++ b/packages/neo4j-driver/types/session-rx.d.ts @@ -35,17 +35,32 @@ declare interface RxSession { lastBookmarks(): string[] lastBookmark(): string[] - + /** + * @deprecated This method will be removed in version 6.0. Please, use {@link RxSession#executeRead} instead. + */ readTransaction( work: RxTransactionWork, config?: TransactionConfig ): Observable + /** + * @deprecated This method will be removed in version 6.0. Please, use {@link RxSession#executeWrite} instead. + */ writeTransaction( work: RxTransactionWork, config?: TransactionConfig ): Observable + executeRead( + work: RxTransactionWork, + config?: TransactionConfig + ): Observable + + executeWrite( + work: RxTransactionWork, + config?: TransactionConfig + ): Observable + close(): Observable } diff --git a/packages/neo4j-driver/types/transaction-managed-rx.d.ts b/packages/neo4j-driver/types/transaction-managed-rx.d.ts new file mode 100644 index 000000000..131d46178 --- /dev/null +++ b/packages/neo4j-driver/types/transaction-managed-rx.d.ts @@ -0,0 +1,28 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + import { Parameters } from './query-runner' + import RxResult from './result-rx' + + declare interface RxManagedTransaction { + run(query: string, parameters?: Parameters): RxResult + isOpen(): boolean + } + + export default RxManagedTransaction + \ No newline at end of file diff --git a/packages/neo4j-driver/types/transaction-rx.d.ts b/packages/neo4j-driver/types/transaction-rx.d.ts index 64d6494a7..1b0ff4be1 100644 --- a/packages/neo4j-driver/types/transaction-rx.d.ts +++ b/packages/neo4j-driver/types/transaction-rx.d.ts @@ -22,6 +22,8 @@ import RxResult from './result-rx' declare interface RxTransaction { run(query: string, parameters?: Parameters): RxResult + + isOpen(): boolean commit(): Observable diff --git a/packages/testkit-backend/src/request-handlers-rx.js b/packages/testkit-backend/src/request-handlers-rx.js index 0582d8df2..27419a64c 100644 --- a/packages/testkit-backend/src/request-handlers-rx.js +++ b/packages/testkit-backend/src/request-handlers-rx.js @@ -173,7 +173,7 @@ export function SessionReadTransaction(context, data, wire) { const session = context.getSession(sessionId) try { - return session.readTransaction(tx => { + return session.executeRead(tx => { return from(new Promise((resolve, reject) => { const id = context.addTx(tx, sessionId, resolve, reject) wire.writeResponse(responses.RetryableTry({ id })) @@ -193,7 +193,7 @@ export function SessionWriteTransaction(context, data, wire) { const session = context.getSession(sessionId) try { - return session.writeTransaction(tx => { + return session.executeWrite(tx => { return from(new Promise((resolve, reject) => { const id = context.addTx(tx, sessionId, resolve, reject) wire.writeResponse(responses.RetryableTry({ id })) From 01718ec017429408d326c2e048e6612b4bcf6d55 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Mon, 4 Apr 2022 19:19:48 +0200 Subject: [PATCH 4/8] Fix imports --- packages/neo4j-driver/src/session-rx.js | 1 + packages/neo4j-driver/src/transaction-managed-rx.js | 2 ++ 2 files changed, 3 insertions(+) diff --git a/packages/neo4j-driver/src/session-rx.js b/packages/neo4j-driver/src/session-rx.js index 5660814dd..5d73dda4f 100644 --- a/packages/neo4j-driver/src/session-rx.js +++ b/packages/neo4j-driver/src/session-rx.js @@ -21,6 +21,7 @@ import { flatMap, catchError, concat } from 'rxjs/operators' import RxResult from './result-rx' import { Session, internal } from 'neo4j-driver-core' import RxTransaction from './transaction-rx' +import RxManagedTransaction from './transaction-managed-rx' import RxRetryLogic from './internal/retry-logic-rx' const { diff --git a/packages/neo4j-driver/src/transaction-managed-rx.js b/packages/neo4j-driver/src/transaction-managed-rx.js index bef42078f..48e4646ce 100644 --- a/packages/neo4j-driver/src/transaction-managed-rx.js +++ b/packages/neo4j-driver/src/transaction-managed-rx.js @@ -48,3 +48,5 @@ class RxManagedTransaction { return this._isOpen() } } + +export default RxManagedTransaction From e9ed0c2854d7c27318bcc53c7ad4d09196ac8a33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Barc=C3=A9los?= Date: Tue, 5 Apr 2022 11:29:07 +0200 Subject: [PATCH 5/8] Apply suggestions from code review Co-authored-by: Florent Biville <445792+fbiville@users.noreply.github.com> --- packages/core/test/session.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts index 0a27d8af4..0963b27a7 100644 --- a/packages/core/test/session.test.ts +++ b/packages/core/test/session.test.ts @@ -300,7 +300,7 @@ describe('session', () => { expect(status.functionCalled).toEqual(true) }) - it('should proxy run to the begined transaction', async () => { + it('should proxy run to the begun transaction', async () => { const connection = mockBeginWithSuccess(newFakeConnection()) const session = newSessionWithConnection(connection, false, FETCH_ALL) // @ts-ignore @@ -318,7 +318,7 @@ describe('session', () => { expect(run).toHaveBeenCalledWith(query, params) }) - it('should proxy isOpen to the begined transaction', async () => { + it('should proxy isOpen to the begun transaction', async () => { const connection = mockBeginWithSuccess(newFakeConnection()) const session = newSessionWithConnection(connection, false, FETCH_ALL) const isOpen = jest.spyOn(Transaction.prototype, 'isOpen').mockImplementationOnce(() => true) From 890e1eb1431a5698adcb7b64f840aa7708757c35 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 5 Apr 2022 11:35:56 +0200 Subject: [PATCH 6/8] Remove `isOpen` method from managed transactions --- packages/core/src/session.ts | 1 - packages/core/src/transaction-managed.ts | 19 +------------------ packages/core/test/session.test.ts | 16 ---------------- packages/neo4j-driver/src/session-rx.js | 3 +-- .../src/transaction-managed-rx.js | 11 +---------- .../test/types/transaction-managed-rx.test.ts | 5 +---- .../types/transaction-managed-rx.d.ts | 1 - 7 files changed, 4 insertions(+), 52 deletions(-) diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index c09e87b5a..44470ebc7 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -449,7 +449,6 @@ class Session { () => this._beginTransaction(accessMode, transactionConfig), transactionWork, tx => new ManagedTransaction({ - isOpen: tx.isOpen.bind(tx), run: tx.run.bind(tx), }) ) diff --git a/packages/core/src/transaction-managed.ts b/packages/core/src/transaction-managed.ts index edc2c2be6..175ac358d 100644 --- a/packages/core/src/transaction-managed.ts +++ b/packages/core/src/transaction-managed.ts @@ -24,10 +24,6 @@ interface Run { (query: Query, parameters?: any): Result } -interface IsOpen { - (): boolean -} - /** * Represents a transaction that is managed by the transaction executor. * @@ -35,17 +31,12 @@ interface IsOpen { */ class ManagedTransaction { private _run: Run - private _isOpen: IsOpen - constructor({ run, isOpen }: { run: Run, isOpen: IsOpen }) { + constructor({ run }: { run: Run }) { /** * @private */ this._run = run - /** - * @private - */ - this._isOpen = isOpen } /** @@ -59,14 +50,6 @@ class ManagedTransaction { run(query: Query, parameters?: any): Result { return this._run(query, parameters) } - - /** - * Check if this transaction is active, which means commit and rollback did not happen. - * @return {boolean} `true` when not committed and not rolled back, `false` otherwise. - */ - isOpen(): boolean { - return this._isOpen() - } } export default ManagedTransaction diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts index 0963b27a7..b14ad6818 100644 --- a/packages/core/test/session.test.ts +++ b/packages/core/test/session.test.ts @@ -317,22 +317,6 @@ describe('session', () => { expect(status.functionCalled).toEqual(true) expect(run).toHaveBeenCalledWith(query, params) }) - - it('should proxy isOpen to the begun transaction', async () => { - const connection = mockBeginWithSuccess(newFakeConnection()) - const session = newSessionWithConnection(connection, false, FETCH_ALL) - const isOpen = jest.spyOn(Transaction.prototype, 'isOpen').mockImplementationOnce(() => true) - const status = { functionCalled: false } - - await execute(session)(async (tx: ManagedTransaction) => { - status.functionCalled = true - expect(tx.isOpen()).toBe(true) - }) - - expect(status.functionCalled).toEqual(true) - expect(isOpen).toHaveBeenCalled() - }) - }) }) function mockBeginWithSuccess(connection: FakeConnection) { diff --git a/packages/neo4j-driver/src/session-rx.js b/packages/neo4j-driver/src/session-rx.js index 5d73dda4f..aed6315ec 100644 --- a/packages/neo4j-driver/src/session-rx.js +++ b/packages/neo4j-driver/src/session-rx.js @@ -140,8 +140,7 @@ export default class RxSession { */ _executeInTransaction (accessMode, work, transactionConfig) { const wrapper = txc => new RxManagedTransaction({ - run: txc.run.bind(txc), - isOpen: txc.isOpen.bind(txc) + run: txc.run.bind(txc) }) return this._runTransaction(accessMode, work, transactionConfig, wrapper) } diff --git a/packages/neo4j-driver/src/transaction-managed-rx.js b/packages/neo4j-driver/src/transaction-managed-rx.js index 48e4646ce..937838b4a 100644 --- a/packages/neo4j-driver/src/transaction-managed-rx.js +++ b/packages/neo4j-driver/src/transaction-managed-rx.js @@ -23,9 +23,8 @@ * @public */ class RxManagedTransaction { - constructor({ run, isOpen }) { + constructor({ run }) { this._run = run - this._isOpen = isOpen } /** @@ -39,14 +38,6 @@ class RxManagedTransaction { run (query, parameters) { return this._run(query, parameters) } - - /** - * Check if this transaction is active, which means commit and rollback did not happen. - * @return {boolean} `true` when not committed and not rolled back, `false` otherwise. - */ - isOpen() { - return this._isOpen() - } } export default RxManagedTransaction diff --git a/packages/neo4j-driver/test/types/transaction-managed-rx.test.ts b/packages/neo4j-driver/test/types/transaction-managed-rx.test.ts index 0065842e9..0aecebea3 100644 --- a/packages/neo4j-driver/test/types/transaction-managed-rx.test.ts +++ b/packages/neo4j-driver/test/types/transaction-managed-rx.test.ts @@ -20,8 +20,7 @@ import RxManagedTransaction from '../../types/transaction-managed-rx' import { Record, ResultSummary } from 'neo4j-driver-core' import RxResult from '../../types/result-rx' -import { Observable, of, Observer, throwError } from 'rxjs' -import { concat, finalize, catchError } from 'rxjs/operators' +import { Observer } from 'rxjs' const dummy: any = null @@ -60,5 +59,3 @@ const result2: RxResult = tx.run('RETURN $value', { value: '42' }) result2.keys().subscribe(keysObserver) result2.records().subscribe(recordsObserver) result2.consume().subscribe(summaryObserver) - -const isOpen: boolean = tx.isOpen() diff --git a/packages/neo4j-driver/types/transaction-managed-rx.d.ts b/packages/neo4j-driver/types/transaction-managed-rx.d.ts index 131d46178..4e2a37f21 100644 --- a/packages/neo4j-driver/types/transaction-managed-rx.d.ts +++ b/packages/neo4j-driver/types/transaction-managed-rx.d.ts @@ -21,7 +21,6 @@ declare interface RxManagedTransaction { run(query: string, parameters?: Parameters): RxResult - isOpen(): boolean } export default RxManagedTransaction From b9ad224c4fd1f9cbd0b3a0b0160008161a5aa8bf Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 5 Apr 2022 11:51:11 +0200 Subject: [PATCH 7/8] ops --- packages/core/src/session.ts | 4 +--- packages/core/src/transaction-managed.ts | 17 ++++++++++++++++- packages/core/test/session.test.ts | 1 + .../neo4j-driver/src/transaction-managed-rx.js | 16 ++++++++++++++++ 4 files changed, 34 insertions(+), 4 deletions(-) diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 44470ebc7..4996d4f8c 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -448,9 +448,7 @@ class Session { return this._transactionExecutor.execute( () => this._beginTransaction(accessMode, transactionConfig), transactionWork, - tx => new ManagedTransaction({ - run: tx.run.bind(tx), - }) + ManagedTransaction.fromTransaction ) } diff --git a/packages/core/src/transaction-managed.ts b/packages/core/src/transaction-managed.ts index 175ac358d..e9a44b440 100644 --- a/packages/core/src/transaction-managed.ts +++ b/packages/core/src/transaction-managed.ts @@ -18,6 +18,7 @@ */ import Result from './result' +import Transaction from './transaction' import { Query } from './types' interface Run { @@ -32,13 +33,27 @@ interface Run { class ManagedTransaction { private _run: Run - constructor({ run }: { run: Run }) { + /** + * @private + */ + private constructor({ run }: { run: Run }) { /** * @private */ this._run = run } + /** + * @private + * @param {Transaction} tx - Transaction to wrap + * @returns {ManagedTransaction} the ManagedTransaction + */ + static fromTransaction(tx: Transaction): ManagedTransaction { + return new ManagedTransaction({ + run: tx.run.bind(tx) + }) + } + /** * Run Cypher query * Could be called with a query object i.e.: `{text: "MATCH ...", parameters: {param: 1}}` diff --git a/packages/core/test/session.test.ts b/packages/core/test/session.test.ts index b14ad6818..a975b13e7 100644 --- a/packages/core/test/session.test.ts +++ b/packages/core/test/session.test.ts @@ -317,6 +317,7 @@ describe('session', () => { expect(status.functionCalled).toEqual(true) expect(run).toHaveBeenCalledWith(query, params) }) + }) }) function mockBeginWithSuccess(connection: FakeConnection) { diff --git a/packages/neo4j-driver/src/transaction-managed-rx.js b/packages/neo4j-driver/src/transaction-managed-rx.js index 937838b4a..5e9a7a24a 100644 --- a/packages/neo4j-driver/src/transaction-managed-rx.js +++ b/packages/neo4j-driver/src/transaction-managed-rx.js @@ -16,6 +16,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +import RxResult from './result-rx' +import RxTransaction from './transaction-rx' /** * Represents a rx transaction that is managed by the transaction executor. @@ -23,10 +25,24 @@ * @public */ class RxManagedTransaction { + /** + * @private + */ constructor({ run }) { this._run = run } + /** + * @private + * @param {RxTransaction} txc - The transaction to be wrapped + * @returns {RxManagedTransaction} The managed transaction + */ + static fromTransaction (txc) { + return new RxManagedTransaction({ + run: txc.run.bind(txc) + }) + } + /** * Creates a reactive result that will execute the query in this transaction, with the provided parameters. * From e5b76186bc922dbb9aa7f9010645c3659ad45511 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Tue, 5 Apr 2022 18:01:16 +0200 Subject: [PATCH 8/8] Remove un-used import --- .../neo4j-driver/test/internal/transaction-executor.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/neo4j-driver/test/internal/transaction-executor.test.js b/packages/neo4j-driver/test/internal/transaction-executor.test.js index 42fbd8005..75e7136fe 100644 --- a/packages/neo4j-driver/test/internal/transaction-executor.test.js +++ b/packages/neo4j-driver/test/internal/transaction-executor.test.js @@ -17,7 +17,7 @@ * limitations under the License. */ -import { newError, error as err, internal, int } from 'neo4j-driver-core' +import { newError, error as err, internal } from 'neo4j-driver-core' import { setTimeoutMock } from './timers-util' import lolex from 'lolex'