Skip to content

Commit a0164c6

Browse files
committed
Introduce the hability of awaiting for eager begining a tx
1 parent b99d313 commit a0164c6

File tree

5 files changed

+203
-11
lines changed

5 files changed

+203
-11
lines changed

packages/core/src/session.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import ConnectionProvider from './connection-provider'
3131
import { Query, SessionMode } from './types'
3232
import Connection from './connection'
3333
import { NumberOrInteger } from './graph-types'
34+
import TransactionPromise from './transaction-promise'
3435

3536
type ConnectionConsumer = (connection: Connection | void) => any | undefined
3637
type TransactionWork<T> = (tx: Transaction) => Promise<T> | T
@@ -241,7 +242,7 @@ class Session {
241242
* @param {TransactionConfig} [transactionConfig] - Configuration for the new auto-commit transaction.
242243
* @returns {Transaction} New Transaction.
243244
*/
244-
beginTransaction(transactionConfig?: TransactionConfig): Transaction {
245+
beginTransaction(transactionConfig?: TransactionConfig): TransactionPromise {
245246
// this function needs to support bookmarks parameter for backwards compatibility
246247
// parameter was of type {string|string[]} and represented either a single or multiple bookmarks
247248
// that's why we need to check parameter type and decide how to interpret the value
@@ -255,7 +256,7 @@ class Session {
255256
return this._beginTransaction(this._mode, txConfig)
256257
}
257258

258-
_beginTransaction(accessMode: SessionMode, txConfig: TxConfig): Transaction {
259+
_beginTransaction(accessMode: SessionMode, txConfig: TxConfig): TransactionPromise {
259260
if (!this._open) {
260261
throw newError('Cannot begin a transaction on a closed session.')
261262
}
@@ -271,7 +272,7 @@ class Session {
271272
connectionHolder.initializeConnection()
272273
this._hasTx = true
273274

274-
const tx = new Transaction({
275+
const tx = new TransactionPromise({
275276
connectionHolder,
276277
impersonatedUser: this._impersonatedUser,
277278
onClose: this._transactionClosed.bind(this),
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import Transaction from "./transaction"
21+
import {
22+
ConnectionHolder
23+
} from './internal/connection-holder'
24+
25+
import { Bookmarks } from './internal/bookmarks'
26+
import { TxConfig } from "./internal/tx-config";
27+
import Connection from "./connection";
28+
import Result from "./result";
29+
import { Query } from "./types";
30+
31+
class TransactionPromise extends Transaction implements Promise<Transaction>{
32+
private _beginError?: Error;
33+
private _beginMetadata?: any;
34+
private _beginPromise?: Promise<Transaction>;
35+
private _reject?: (error: Error) => void;
36+
private _resolve?: (value?: Transaction | PromiseLike<Transaction> | undefined) => void;
37+
private _delegate: Transaction;
38+
39+
constructor({
40+
connectionHolder,
41+
onClose,
42+
onBookmarks,
43+
onConnection,
44+
reactive,
45+
fetchSize,
46+
impersonatedUser,
47+
highRecordWatermark,
48+
lowRecordWatermark
49+
}: {
50+
connectionHolder: ConnectionHolder
51+
onClose: () => void
52+
onBookmarks: (bookmarks: Bookmarks) => void
53+
onConnection: () => void
54+
reactive: boolean
55+
fetchSize: number
56+
impersonatedUser?: string,
57+
highRecordWatermark: number,
58+
lowRecordWatermark: number
59+
}) {
60+
super({
61+
connectionHolder,
62+
onClose,
63+
onBookmarks,
64+
onConnection,
65+
reactive,
66+
fetchSize,
67+
impersonatedUser,
68+
highRecordWatermark,
69+
lowRecordWatermark
70+
})
71+
this._delegate = new Transaction({
72+
connectionHolder,
73+
onClose,
74+
onBookmarks,
75+
onConnection,
76+
reactive,
77+
fetchSize,
78+
impersonatedUser,
79+
highRecordWatermark,
80+
lowRecordWatermark
81+
})
82+
}
83+
run(query: Query, parameters?: any): Result {
84+
return this._delegate.run(query, parameters);
85+
}
86+
commit(): Promise<void> {
87+
return this._delegate.commit()
88+
}
89+
rollback(): Promise<void> {
90+
return this._delegate.rollback();
91+
}
92+
isOpen(): boolean {
93+
return this._delegate.isOpen()
94+
}
95+
close(): Promise<void> {
96+
return this._delegate.close();
97+
}
98+
_onErrorCallback(err: any): Promise<void | Connection> {
99+
return this._delegate._onErrorCallback(err);
100+
}
101+
_onCompleteCallback(meta: { bookmark?: string | string[] | undefined; }): void {
102+
return this._delegate._onCompleteCallback(meta);
103+
}
104+
105+
_begin(bookmarks: string | Bookmarks | string[], txConfig: TxConfig): void {
106+
return this._delegate._begin(bookmarks, txConfig, {
107+
onError: this._onBeginError.bind(this),
108+
onComplete: this._onBeginMetadata.bind(this)
109+
});
110+
}
111+
112+
_onBeginError(error: Error): void {
113+
this._beginError = error;
114+
if (this._reject) {
115+
this._reject(error);
116+
}
117+
}
118+
119+
_onBeginMetadata(metadata: any): void {
120+
this._beginMetadata = metadata || {};
121+
if (this._resolve) {
122+
this._resolve(this._delegate);
123+
}
124+
}
125+
126+
[Symbol.toStringTag]: string = "TransactionPromise"
127+
128+
then<TResult1 = Transaction, TResult2 = never>(
129+
onfulfilled?:
130+
((value: Transaction) => TResult1 | PromiseLike<TResult1> )
131+
| null,
132+
onrejected?:
133+
((reason: any) => TResult2 | PromiseLike<TResult2>)
134+
| null
135+
) : Promise<TResult1 | TResult2> {
136+
return this._getOrCreateBeginPromise().then(onfulfilled, onrejected);
137+
}
138+
catch<TResult = never>(onrejected?: ((reason: any) => TResult | PromiseLike<TResult>) | null): Promise<any> {
139+
return this._getOrCreateBeginPromise().catch(onrejected);
140+
}
141+
finally(onfinally?: (() => void) | null): Promise<Transaction> {
142+
return this._getOrCreateBeginPromise().finally(onfinally);
143+
}
144+
145+
private _getOrCreateBeginPromise(): Promise<Transaction> {
146+
if (!this._beginPromise) {
147+
this._beginPromise = new Promise((resolve, reject) => {
148+
this._resolve = resolve;
149+
this._reject = reject;
150+
if (this._beginError) {
151+
reject(this._beginError);
152+
}
153+
if (this._beginMetadata) {
154+
resolve(this._delegate);
155+
}
156+
});
157+
}
158+
return this._beginPromise;
159+
}
160+
}
161+
162+
export default TransactionPromise

packages/core/src/transaction.ts

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,10 @@ class Transaction {
109109
* @param {TxConfig} txConfig
110110
* @returns {void}
111111
*/
112-
_begin(bookmarks: Bookmarks | string | string[], txConfig: TxConfig): void {
112+
_begin(bookmarks: Bookmarks | string | string[], txConfig: TxConfig, events?: {
113+
onError: (error: Error) => void
114+
onComplete: (metadata: any) => void
115+
}): void {
113116
this._connectionHolder
114117
.getConnection()
115118
.then(connection => {
@@ -121,14 +124,29 @@ class Transaction {
121124
mode: this._connectionHolder.mode(),
122125
database: this._connectionHolder.database(),
123126
impersonatedUser: this._impersonatedUser,
124-
beforeError: this._onError,
125-
afterComplete: this._onComplete
127+
beforeError: (error: Error) => {
128+
if (events) {
129+
events.onError(error)
130+
}
131+
return this._onError(error).catch(() => {})
132+
},
133+
afterComplete: (metadata: any) => {
134+
if (events) {
135+
events.onComplete(metadata)
136+
}
137+
return this._onComplete(metadata)
138+
}
126139
})
127140
} else {
128141
throw newError('No connection available')
129142
}
130143
})
131-
.catch(error => this._onError(error))
144+
.catch(error => {
145+
if (events) {
146+
events.onError(error)
147+
}
148+
this._onError(error).catch(() => {})
149+
})
132150
}
133151

134152
/**

packages/testkit-backend/src/request-handlers.js

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,16 +285,21 @@ export function RetryableNegative (context, data, wire) {
285285
export function SessionBeginTransaction (context, data, wire) {
286286
const { sessionId, txMeta: metadata, timeout } = data
287287
const session = context.getSession(sessionId)
288-
let tx
288+
289289
try {
290-
tx = session.beginTransaction({ metadata, timeout })
290+
return session.beginTransaction({ metadata, timeout })
291+
.then(tx => {
292+
const id = context.addTx(tx, sessionId)
293+
wire.writeResponse('Transaction', { id })
294+
}).catch(e => {
295+
console.log('got some err: ' + JSON.stringify(e))
296+
wire.writeError(e)
297+
})
291298
} catch (e) {
292299
console.log('got some err: ' + JSON.stringify(e))
293300
wire.writeError(e)
294301
return
295302
}
296-
const id = context.addTx(tx, sessionId)
297-
wire.writeResponse('Transaction', { id })
298303
}
299304

300305
export function TransactionCommit (context, data, wire) {
@@ -372,6 +377,7 @@ export function GetFeatures (_context, _params, wire) {
372377
'Feature:Bolt:4.4',
373378
'Feature:API:Result.List',
374379
'Feature:API:Result.Peek',
380+
'Optimization:EagerTransactionBegin',
375381
'Temporary:ConnectionAcquisitionTimeout',
376382
'Temporary:CypherPathAndRelationship',
377383
'Temporary:DriverFetchSize',

packages/testkit-backend/src/skipped-tests/common.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import skip, { ifEquals, ifEndsWith, ifStartsWith } from './skip'
22

33
const skippedTests = [
4+
skip(
5+
'Eager verification not implemented for tx functions',
6+
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_eager_begin_on_tx_func_run_with_error_on_begin'),
7+
ifEquals('stub.tx_run.test_tx_run.TestTxRun.test_eager_begin_on_tx_func_run_with_disconnect_on_begin')
8+
),
49
skip(
510
'Fail while enable Temporary::ResultKeys',
611
ifEquals('neo4j.test_bookmarks.TestBookmarks.test_can_pass_bookmark_into_next_session'),

0 commit comments

Comments
 (0)