Skip to content

Commit 00d821d

Browse files
authored
Introduce the ability of awaiting for eager beginning a Transaction (#870)
The current implementation of `session.beginTransaction` does not wait the success message. This kind of behaviour can cause future calls to `run` and `commit` because of non-reported errors during the begin. For solving this issue, the transaction object return by the `session.beginTransaction` was changed to `TransactionPromise`. This new type holds all the behaviours from a Transaction and also holds the promise for the begin transaction. This way, the caller can eagerly eager check the transaction begin and at the same type keep the API existing in the current codebase. **Eager Mode:** ```javascript const tx = await session.beginTransaction() // waits for begin result const result = await tx.run('RETURN 1') // only throws RUN related errors, not reachable when begin fails ``` **Lazy Mode:** ```javascript const tx = session.beginTransaction() // no wait, any begin errors will be thrown const result = await tx.run('RETURN 1') // throws BEGIN and RUN related errors ```
1 parent a4623e3 commit 00d821d

File tree

9 files changed

+689
-79
lines changed

9 files changed

+689
-79
lines changed

packages/core/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ import Result, { QueryResult, ResultObserver } from './result'
6969
import ConnectionProvider from './connection-provider'
7070
import Connection from './connection'
7171
import Transaction from './transaction'
72+
import TransactionPromise from './transaction-promise'
7273
import Session, { TransactionConfig } from './session'
7374
import Driver, * as driver from './driver'
7475
import auth from './auth'
@@ -134,6 +135,7 @@ const forExport = {
134135
Stats,
135136
Result,
136137
Transaction,
138+
TransactionPromise,
137139
Session,
138140
Driver,
139141
Connection,
@@ -191,6 +193,7 @@ export {
191193
ConnectionProvider,
192194
Connection,
193195
Transaction,
196+
TransactionPromise,
194197
Session,
195198
Driver,
196199
types,

packages/core/src/session.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import ConnectionProvider from './connection-provider'
3131
import { Query, SessionMode } from './types'
3232
import Connection from './connection'
3333
import { NumberOrInteger } from './graph-types'
34+
import TransactionPromise from './transaction-promise'
3435

3536
type ConnectionConsumer = (connection: Connection | void) => any | undefined
3637
type TransactionWork<T> = (tx: Transaction) => Promise<T> | T
@@ -239,9 +240,9 @@ class Session {
239240
* While a transaction is open the session cannot be used to run queries outside the transaction.
240241
*
241242
* @param {TransactionConfig} [transactionConfig] - Configuration for the new auto-commit transaction.
242-
* @returns {Transaction} New Transaction.
243+
* @returns {TransactionPromise} New Transaction.
243244
*/
244-
beginTransaction(transactionConfig?: TransactionConfig): Transaction {
245+
beginTransaction(transactionConfig?: TransactionConfig): TransactionPromise {
245246
// this function needs to support bookmarks parameter for backwards compatibility
246247
// parameter was of type {string|string[]} and represented either a single or multiple bookmarks
247248
// that's why we need to check parameter type and decide how to interpret the value
@@ -255,7 +256,7 @@ class Session {
255256
return this._beginTransaction(this._mode, txConfig)
256257
}
257258

258-
_beginTransaction(accessMode: SessionMode, txConfig: TxConfig): Transaction {
259+
_beginTransaction(accessMode: SessionMode, txConfig: TxConfig): TransactionPromise {
259260
if (!this._open) {
260261
throw newError('Cannot begin a transaction on a closed session.')
261262
}
@@ -271,7 +272,7 @@ class Session {
271272
connectionHolder.initializeConnection()
272273
this._hasTx = true
273274

274-
const tx = new Transaction({
275+
const tx = new TransactionPromise({
275276
connectionHolder,
276277
impersonatedUser: this._impersonatedUser,
277278
onClose: this._transactionClosed.bind(this),
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import Transaction from "./transaction"
21+
import {
22+
ConnectionHolder
23+
} from './internal/connection-holder'
24+
25+
import { Bookmarks } from './internal/bookmarks'
26+
import { TxConfig } from "./internal/tx-config";
27+
28+
/**
29+
* Represents a {@link Promise<Transaction>} object and a {@link Transaction} object.
30+
*
31+
* Resolving this object promise verifies the result of the transaction begin and returns the {@link Transaction} object in case of success.
32+
*
33+
* The object can still also used as {@link Transaction} for convenience. The result of begin will be checked
34+
* during the next API calls in the object as it is in the transaction.
35+
*
36+
* @access public
37+
*/
38+
class TransactionPromise extends Transaction implements Promise<Transaction>{
39+
[Symbol.toStringTag]: string = "TransactionPromise"
40+
private _beginError?: Error;
41+
private _beginMetadata?: any;
42+
private _beginPromise?: Promise<Transaction>;
43+
private _reject?: (error: Error) => void;
44+
private _resolve?: (value?: Transaction | PromiseLike<Transaction> | undefined) => void;
45+
46+
/**
47+
* @constructor
48+
* @param {ConnectionHolder} connectionHolder - the connection holder to get connection from.
49+
* @param {function()} onClose - Function to be called when transaction is committed or rolled back.
50+
* @param {function(bookmarks: Bookmarks)} onBookmarks callback invoked when new bookmark is produced.
51+
* @param {function()} onConnection - Function to be called when a connection is obtained to ensure the connection
52+
* is not yet released.
53+
* @param {boolean} reactive whether this transaction generates reactive streams
54+
* @param {number} fetchSize - the record fetch size in each pulling batch.
55+
* @param {string} impersonatedUser - The name of the user which should be impersonated for the duration of the session.
56+
*/
57+
constructor({
58+
connectionHolder,
59+
onClose,
60+
onBookmarks,
61+
onConnection,
62+
reactive,
63+
fetchSize,
64+
impersonatedUser,
65+
highRecordWatermark,
66+
lowRecordWatermark
67+
}: {
68+
connectionHolder: ConnectionHolder
69+
onClose: () => void
70+
onBookmarks: (bookmarks: Bookmarks) => void
71+
onConnection: () => void
72+
reactive: boolean
73+
fetchSize: number
74+
impersonatedUser?: string,
75+
highRecordWatermark: number,
76+
lowRecordWatermark: number
77+
}) {
78+
super({
79+
connectionHolder,
80+
onClose,
81+
onBookmarks,
82+
onConnection,
83+
reactive,
84+
fetchSize,
85+
impersonatedUser,
86+
highRecordWatermark,
87+
lowRecordWatermark
88+
})
89+
}
90+
91+
/**
92+
* Waits for the begin to complete.
93+
*
94+
* @param {function(transaction: Transaction)} onFulfilled - function to be called when finished.
95+
* @param {function(error: {message:string, code:string})} onRejected - function to be called upon errors.
96+
* @return {Promise} promise.
97+
*/
98+
then<TResult1 = Transaction, TResult2 = never>(
99+
onfulfilled?:
100+
((value: Transaction) => TResult1 | PromiseLike<TResult1>)
101+
| null,
102+
onrejected?:
103+
((reason: any) => TResult2 | PromiseLike<TResult2>)
104+
| null
105+
): Promise<TResult1 | TResult2> {
106+
return this._getOrCreateBeginPromise().then(onfulfilled, onrejected);
107+
}
108+
109+
/**
110+
* Catch errors when using promises.
111+
*
112+
* @param {function(error: Neo4jError)} onRejected - Function to be called upon errors.
113+
* @return {Promise} promise.
114+
*/
115+
catch<TResult = never>(onrejected?: ((reason: any) => TResult | PromiseLike<TResult>) | null): Promise<any> {
116+
return this._getOrCreateBeginPromise().catch(onrejected);
117+
}
118+
119+
/**
120+
* Called when finally the begin is done
121+
*
122+
* @param {function()|null} onfinally - function when the promise finished
123+
* @return {Promise} promise.
124+
*/
125+
finally(onfinally?: (() => void) | null): Promise<Transaction> {
126+
return this._getOrCreateBeginPromise().finally(onfinally);
127+
}
128+
129+
private _getOrCreateBeginPromise(): Promise<Transaction> {
130+
if (!this._beginPromise) {
131+
this._beginPromise = new Promise((resolve, reject) => {
132+
this._resolve = resolve;
133+
this._reject = reject;
134+
if (this._beginError) {
135+
reject(this._beginError);
136+
}
137+
if (this._beginMetadata) {
138+
resolve(this._toTransaction());
139+
}
140+
});
141+
}
142+
return this._beginPromise;
143+
}
144+
145+
/**
146+
* @access private
147+
*/
148+
private _toTransaction(): Transaction {
149+
//@ts-ignore
150+
return {
151+
...this,
152+
run: super.run.bind(this),
153+
commit: super.commit.bind(this),
154+
rollback: super.rollback.bind(this),
155+
close: super.close.bind(this),
156+
isOpen: super.isOpen.bind(this),
157+
_begin: this._begin.bind(this),
158+
}
159+
}
160+
161+
/**
162+
* @access private
163+
*/
164+
_begin(bookmarks: string | Bookmarks | string[], txConfig: TxConfig): void {
165+
return super._begin(bookmarks, txConfig, {
166+
onError: this._onBeginError.bind(this),
167+
onComplete: this._onBeginMetadata.bind(this)
168+
});
169+
}
170+
171+
/**
172+
* @access private
173+
*/
174+
private _onBeginError(error: Error): void {
175+
this._beginError = error;
176+
if (this._reject) {
177+
this._reject(error);
178+
}
179+
}
180+
181+
/**
182+
* @access private
183+
*/
184+
private _onBeginMetadata(metadata: any): void {
185+
this._beginMetadata = metadata || {};
186+
if (this._resolve) {
187+
this._resolve(this._toTransaction());
188+
}
189+
}
190+
191+
}
192+
193+
export default TransactionPromise

packages/core/src/transaction.ts

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,13 @@ class Transaction {
6161
* @param {ConnectionHolder} connectionHolder - the connection holder to get connection from.
6262
* @param {function()} onClose - Function to be called when transaction is committed or rolled back.
6363
* @param {function(bookmarks: Bookmarks)} onBookmarks callback invoked when new bookmark is produced.
64-
* * @param {function()} onConnection - Function to be called when a connection is obtained to ensure the conneciton
64+
* @param {function()} onConnection - Function to be called when a connection is obtained to ensure the conneciton
6565
* is not yet released.
6666
* @param {boolean} reactive whether this transaction generates reactive streams
6767
* @param {number} fetchSize - the record fetch size in each pulling batch.
6868
* @param {string} impersonatedUser - The name of the user which should be impersonated for the duration of the session.
69+
* @param {number} highRecordWatermark - The high watermark for the record buffer.
70+
* @param {number} lowRecordWatermark - The low watermark for the record buffer.
6971
*/
7072
constructor({
7173
connectionHolder,
@@ -109,7 +111,10 @@ class Transaction {
109111
* @param {TxConfig} txConfig
110112
* @returns {void}
111113
*/
112-
_begin(bookmarks: Bookmarks | string | string[], txConfig: TxConfig): void {
114+
_begin(bookmarks: Bookmarks | string | string[], txConfig: TxConfig, events?: {
115+
onError: (error: Error) => void
116+
onComplete: (metadata: any) => void
117+
}): void {
113118
this._connectionHolder
114119
.getConnection()
115120
.then(connection => {
@@ -121,14 +126,29 @@ class Transaction {
121126
mode: this._connectionHolder.mode(),
122127
database: this._connectionHolder.database(),
123128
impersonatedUser: this._impersonatedUser,
124-
beforeError: this._onError,
125-
afterComplete: this._onComplete
129+
beforeError: (error: Error) => {
130+
if (events) {
131+
events.onError(error)
132+
}
133+
return this._onError(error).catch(() => {})
134+
},
135+
afterComplete: (metadata: any) => {
136+
if (events) {
137+
events.onComplete(metadata)
138+
}
139+
return this._onComplete(metadata)
140+
}
126141
})
127142
} else {
128143
throw newError('No connection available')
129144
}
130145
})
131-
.catch(error => this._onError(error))
146+
.catch(error => {
147+
if (events) {
148+
events.onError(error)
149+
}
150+
this._onError(error).catch(() => {})
151+
})
132152
}
133153

134154
/**

packages/core/test/session.test.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
import { ConnectionProvider, Session, Connection } from '../src'
19+
import { ConnectionProvider, Session, Connection, TransactionPromise, Transaction } from '../src'
2020
import { bookmarks } from '../src/internal'
2121
import { ACCESS_MODE_READ, FETCH_ALL } from '../src/internal/constants'
2222
import FakeConnection from './utils/connection.fake'
@@ -227,6 +227,35 @@ describe('session', () => {
227227
expect(session.lastBookmarks()).toEqual(bookmarks.values())
228228
})
229229
})
230+
231+
describe('.beginTransaction()', () => {
232+
it('should return a TransactionPromise', () => {
233+
const session = newSessionWithConnection(newFakeConnection(), false, 1000)
234+
235+
const tx: Transaction = session.beginTransaction()
236+
237+
expect(tx).toBeInstanceOf(TransactionPromise)
238+
})
239+
240+
it('should resolves a Transaction', async () => {
241+
const connection = newFakeConnection()
242+
const protocol = connection.protocol()
243+
// @ts-ignore
244+
connection.protocol = () => {
245+
return {
246+
...protocol,
247+
beginTransaction: (params: { afterComplete: () => {} }) => {
248+
params.afterComplete()
249+
}
250+
}
251+
}
252+
const session = newSessionWithConnection(connection, false, 1000)
253+
254+
const tx: Transaction = await session.beginTransaction()
255+
256+
expect(tx).toBeDefined()
257+
})
258+
})
230259
})
231260

232261
function newSessionWithConnection(

0 commit comments

Comments
 (0)