@@ -42,15 +42,25 @@ class Transaction {
42
42
* @param {ConnectionHolder } connectionHolder - the connection holder to get connection from.
43
43
* @param {function() } onClose - Function to be called when transaction is committed or rolled back.
44
44
* @param {function(bookmark: Bookmark) } onBookmark callback invoked when new bookmark is produced.
45
+ * * @param {function() } onConnection - Function to be called when a connection is obtained to ensure the conneciton
46
+ * is not yet released.
45
47
* @param {boolean } reactive whether this transaction generates reactive streams
46
48
* @param {number } fetchSize - the record fetch size in each pulling batch.
47
49
*/
48
- constructor ( { connectionHolder, onClose, onBookmark, reactive, fetchSize } ) {
50
+ constructor ( {
51
+ connectionHolder,
52
+ onClose,
53
+ onBookmark,
54
+ onConnection,
55
+ reactive,
56
+ fetchSize
57
+ } ) {
49
58
this . _connectionHolder = connectionHolder
50
59
this . _reactive = reactive
51
60
this . _state = _states . ACTIVE
52
61
this . _onClose = onClose
53
62
this . _onBookmark = onBookmark
63
+ this . _onConnection = onConnection
54
64
this . _onError = this . _onErrorCallback . bind ( this )
55
65
this . _onComplete = this . _onCompleteCallback . bind ( this )
56
66
this . _fetchSize = fetchSize
@@ -60,16 +70,17 @@ class Transaction {
60
70
_begin ( bookmark , txConfig ) {
61
71
this . _connectionHolder
62
72
. getConnection ( )
63
- . then ( conn =>
64
- conn . protocol ( ) . beginTransaction ( {
73
+ . then ( conn => {
74
+ this . _onConnection ( )
75
+ return conn . protocol ( ) . beginTransaction ( {
65
76
bookmark : bookmark ,
66
77
txConfig : txConfig ,
67
78
mode : this . _connectionHolder . mode ( ) ,
68
79
database : this . _connectionHolder . database ( ) ,
69
80
beforeError : this . _onError ,
70
81
afterComplete : this . _onComplete
71
82
} )
72
- )
83
+ } )
73
84
. catch ( error => this . _onError ( error ) )
74
85
}
75
86
@@ -91,6 +102,7 @@ class Transaction {
91
102
connectionHolder : this . _connectionHolder ,
92
103
onError : this . _onError ,
93
104
onComplete : this . _onComplete ,
105
+ onConnection : this . _onConnection ,
94
106
reactive : this . _reactive ,
95
107
fetchSize : this . _fetchSize
96
108
} )
@@ -110,6 +122,7 @@ class Transaction {
110
122
connectionHolder : this . _connectionHolder ,
111
123
onError : this . _onError ,
112
124
onComplete : this . _onComplete ,
125
+ onConnection : this . _onConnection ,
113
126
pendingResults : this . _results
114
127
} )
115
128
this . _state = committed . state
@@ -136,6 +149,7 @@ class Transaction {
136
149
connectionHolder : this . _connectionHolder ,
137
150
onError : this . _onError ,
138
151
onComplete : this . _onComplete ,
152
+ onConnection : this . _onConnection ,
139
153
pendingResults : this . _results
140
154
} )
141
155
this . _state = rolledback . state
@@ -176,25 +190,39 @@ class Transaction {
176
190
const _states = {
177
191
// The transaction is running with no explicit success or failure marked
178
192
ACTIVE : {
179
- commit : ( { connectionHolder, onError, onComplete, pendingResults } ) => {
193
+ commit : ( {
194
+ connectionHolder,
195
+ onError,
196
+ onComplete,
197
+ onConnection,
198
+ pendingResults
199
+ } ) => {
180
200
return {
181
201
result : finishTransaction (
182
202
true ,
183
203
connectionHolder ,
184
204
onError ,
185
205
onComplete ,
206
+ onConnection ,
186
207
pendingResults
187
208
) ,
188
209
state : _states . SUCCEEDED
189
210
}
190
211
} ,
191
- rollback : ( { connectionHolder, onError, onComplete, pendingResults } ) => {
212
+ rollback : ( {
213
+ connectionHolder,
214
+ onError,
215
+ onComplete,
216
+ onConnection,
217
+ pendingResults
218
+ } ) => {
192
219
return {
193
220
result : finishTransaction (
194
221
false ,
195
222
connectionHolder ,
196
223
onError ,
197
224
onComplete ,
225
+ onConnection ,
198
226
pendingResults
199
227
) ,
200
228
state : _states . ROLLED_BACK
@@ -203,22 +231,30 @@ const _states = {
203
231
run : (
204
232
query ,
205
233
parameters ,
206
- { connectionHolder, onError, onComplete, reactive, fetchSize }
234
+ {
235
+ connectionHolder,
236
+ onError,
237
+ onComplete,
238
+ onConnection,
239
+ reactive,
240
+ fetchSize
241
+ }
207
242
) => {
208
243
// RUN in explicit transaction can't contain bookmarks and transaction configuration
209
244
// No need to include mode and database name as it shall be inclued in begin
210
245
const observerPromise = connectionHolder
211
246
. getConnection ( )
212
- . then ( conn =>
213
- conn . protocol ( ) . run ( query , parameters , {
247
+ . then ( conn => {
248
+ onConnection ( )
249
+ return conn . protocol ( ) . run ( query , parameters , {
214
250
bookmark : Bookmark . empty ( ) ,
215
251
txConfig : TxConfig . empty ( ) ,
216
252
beforeError : onError ,
217
253
afterComplete : onComplete ,
218
254
reactive : reactive ,
219
255
fetchSize : fetchSize
220
256
} )
221
- )
257
+ } )
222
258
. catch ( error => new FailedObserver ( { error, onError } ) )
223
259
224
260
return newCompletedResult ( observerPromise , query , parameters )
@@ -249,11 +285,7 @@ const _states = {
249
285
state : _states . FAILED
250
286
}
251
287
} ,
252
- run : (
253
- query ,
254
- parameters ,
255
- { connectionHolder, onError, onComplete, reactive }
256
- ) => {
288
+ run : ( query , parameters , { connectionHolder, onError, onComplete } ) => {
257
289
return newCompletedResult (
258
290
new FailedObserver ( {
259
291
error : newError (
@@ -299,11 +331,7 @@ const _states = {
299
331
state : _states . SUCCEEDED
300
332
}
301
333
} ,
302
- run : (
303
- query ,
304
- parameters ,
305
- { connectionHolder, onError, onComplete, reactive }
306
- ) => {
334
+ run : ( query , parameters , { connectionHolder, onError, onComplete } ) => {
307
335
return newCompletedResult (
308
336
new FailedObserver ( {
309
337
error : newError (
@@ -348,11 +376,7 @@ const _states = {
348
376
state : _states . ROLLED_BACK
349
377
}
350
378
} ,
351
- run : (
352
- query ,
353
- parameters ,
354
- { connectionHolder, onError, onComplete, reactive }
355
- ) => {
379
+ run : ( query , parameters , { connectionHolder, onError, onComplete } ) => {
356
380
return newCompletedResult (
357
381
new FailedObserver ( {
358
382
error : newError (
@@ -373,18 +397,21 @@ const _states = {
373
397
* @param {ConnectionHolder } connectionHolder
374
398
* @param {function(err:Error): any } onError
375
399
* @param {function(metadata:object): any } onComplete
400
+ * @param {function() : any } onConnection
376
401
* @param {list<Result>> }pendingResults all run results in this transaction
377
402
*/
378
403
function finishTransaction (
379
404
commit ,
380
405
connectionHolder ,
381
406
onError ,
382
407
onComplete ,
408
+ onConnection ,
383
409
pendingResults
384
410
) {
385
411
const observerPromise = connectionHolder
386
412
. getConnection ( )
387
413
. then ( connection => {
414
+ onConnection ( )
388
415
pendingResults . forEach ( r => r . _cancel ( ) )
389
416
return Promise . all ( pendingResults ) . then ( results => {
390
417
if ( commit ) {
0 commit comments