Skip to content

Commit 7af8730

Browse files
committed
Wait for BEGIN be send before send RUN, COMMIT or ROLLBACK
1 parent c5adb79 commit 7af8730

File tree

2 files changed

+129
-50
lines changed

2 files changed

+129
-50
lines changed

packages/core/src/transaction.ts

Lines changed: 75 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ class Transaction {
5858
private readonly _lowRecordWatermak: number
5959
private readonly _highRecordWatermark: number
6060
private _bookmarks: Bookmarks
61+
private readonly _activePromise: Promise<void>
62+
private _acceptActive: () => void
6163

6264
/**
6365
* @constructor
@@ -107,6 +109,9 @@ class Transaction {
107109
this._lowRecordWatermak = lowRecordWatermark
108110
this._highRecordWatermark = highRecordWatermark
109111
this._bookmarks = Bookmarks.empty()
112+
this._activePromise = new Promise((resolve, reject) => {
113+
this._acceptActive = resolve
114+
})
110115
}
111116

112117
/**
@@ -154,6 +159,10 @@ class Transaction {
154159
}
155160
this._onError(error).catch(() => {})
156161
})
162+
// It should make the transaction active anyway
163+
// futher errors will be treated by the exiting
164+
// observers
165+
.finally(() => this._acceptActive())
157166
}
158167

159168
/**
@@ -178,7 +187,8 @@ class Transaction {
178187
reactive: this._reactive,
179188
fetchSize: this._fetchSize,
180189
highRecordWatermark: this._highRecordWatermark,
181-
lowRecordWatermark: this._lowRecordWatermak
190+
lowRecordWatermark: this._lowRecordWatermak,
191+
preparationJob: this._activePromise
182192
})
183193
this._results.push(result)
184194
return result
@@ -197,7 +207,8 @@ class Transaction {
197207
onError: this._onError,
198208
onComplete: (meta: any) => this._onCompleteCallback(meta, this._bookmarks),
199209
onConnection: this._onConnection,
200-
pendingResults: this._results
210+
pendingResults: this._results,
211+
preparationJob: this._activePromise
201212
})
202213
this._state = committed.state
203214
// clean up
@@ -224,7 +235,8 @@ class Transaction {
224235
onError: this._onError,
225236
onComplete: this._onComplete,
226237
onConnection: this._onConnection,
227-
pendingResults: this._results
238+
pendingResults: this._results,
239+
preparationJob: this._activePromise
228240
})
229241
this._state = rolledback.state
230242
// clean up
@@ -293,6 +305,7 @@ interface StateTransitionParams {
293305
fetchSize: number
294306
highRecordWatermark: number
295307
lowRecordWatermark: number
308+
preparationJob?: Promise<any>
296309
}
297310

298311
const _states = {
@@ -303,7 +316,8 @@ const _states = {
303316
onError,
304317
onComplete,
305318
onConnection,
306-
pendingResults
319+
pendingResults,
320+
preparationJob
307321
}: StateTransitionParams): any => {
308322
return {
309323
result: finishTransaction(
@@ -312,7 +326,8 @@ const _states = {
312326
onError,
313327
onComplete,
314328
onConnection,
315-
pendingResults
329+
pendingResults,
330+
preparationJob
316331
),
317332
state: _states.SUCCEEDED
318333
}
@@ -322,7 +337,8 @@ const _states = {
322337
onError,
323338
onComplete,
324339
onConnection,
325-
pendingResults
340+
pendingResults,
341+
preparationJob
326342
}: StateTransitionParams): any => {
327343
return {
328344
result: finishTransaction(
@@ -331,7 +347,8 @@ const _states = {
331347
onError,
332348
onComplete,
333349
onConnection,
334-
pendingResults
350+
pendingResults,
351+
preparationJob
335352
),
336353
state: _states.ROLLED_BACK
337354
}
@@ -347,31 +364,35 @@ const _states = {
347364
reactive,
348365
fetchSize,
349366
highRecordWatermark,
350-
lowRecordWatermark
367+
lowRecordWatermark,
368+
preparationJob
351369
}: StateTransitionParams
352370
): any => {
353371
// RUN in explicit transaction can't contain bookmarks and transaction configuration
354372
// No need to include mode and database name as it shall be inclued in begin
355-
const observerPromise = connectionHolder
356-
.getConnection()
357-
.then(conn => {
358-
onConnection()
359-
if (conn != null) {
360-
return conn.protocol().run(query, parameters, {
361-
bookmarks: Bookmarks.empty(),
362-
txConfig: TxConfig.empty(),
363-
beforeError: onError,
364-
afterComplete: onComplete,
365-
reactive: reactive,
366-
fetchSize: fetchSize,
367-
highRecordWatermark: highRecordWatermark,
368-
lowRecordWatermark: lowRecordWatermark
369-
})
370-
} else {
371-
throw newError('No connection available')
372-
}
373-
})
374-
.catch(error => new FailedObserver({ error, onError }))
373+
const requirements = preparationJob ?? Promise.resolve()
374+
375+
const observerPromise =
376+
connectionHolder.getConnection()
377+
.then(conn => requirements.then(() => conn))
378+
.then(conn => {
379+
onConnection()
380+
if (conn != null) {
381+
return conn.protocol().run(query, parameters, {
382+
bookmarks: Bookmarks.empty(),
383+
txConfig: TxConfig.empty(),
384+
beforeError: onError,
385+
afterComplete: onComplete,
386+
reactive: reactive,
387+
fetchSize: fetchSize,
388+
highRecordWatermark: highRecordWatermark,
389+
lowRecordWatermark: lowRecordWatermark
390+
})
391+
} else {
392+
throw newError('No connection available')
393+
}
394+
})
395+
.catch(error => new FailedObserver({ error, onError }))
375396

376397
return newCompletedResult(
377398
observerPromise,
@@ -598,32 +619,36 @@ function finishTransaction (
598619
onError: (err: Error) => any,
599620
onComplete: (metadata: any) => any,
600621
onConnection: () => any,
601-
pendingResults: Result[]
622+
pendingResults: Result[],
623+
preparationJob?: Promise<void>
602624
): Result {
603-
const observerPromise = connectionHolder
604-
.getConnection()
605-
.then(connection => {
606-
onConnection()
607-
pendingResults.forEach(r => r._cancel())
608-
return Promise.all(pendingResults.map(result => result.summary())).then(results => {
609-
if (connection != null) {
610-
if (commit) {
611-
return connection.protocol().commitTransaction({
612-
beforeError: onError,
613-
afterComplete: onComplete
614-
})
625+
const requirements = preparationJob ?? Promise.resolve()
626+
627+
const observerPromise =
628+
connectionHolder.getConnection()
629+
.then(conn => requirements.then(() => conn))
630+
.then(connection => {
631+
onConnection()
632+
pendingResults.forEach(r => r._cancel())
633+
return Promise.all(pendingResults.map(result => result.summary())).then(results => {
634+
if (connection != null) {
635+
if (commit) {
636+
return connection.protocol().commitTransaction({
637+
beforeError: onError,
638+
afterComplete: onComplete
639+
})
640+
} else {
641+
return connection.protocol().rollbackTransaction({
642+
beforeError: onError,
643+
afterComplete: onComplete
644+
})
645+
}
615646
} else {
616-
return connection.protocol().rollbackTransaction({
617-
beforeError: onError,
618-
afterComplete: onComplete
619-
})
647+
throw newError('No connection available')
620648
}
621-
} else {
622-
throw newError('No connection available')
623-
}
649+
})
624650
})
625-
})
626-
.catch(error => new FailedObserver({ error, onError }))
651+
.catch(error => new FailedObserver({ error, onError }))
627652

628653
// for commit & rollback we need result that uses real connection holder and notifies it when
629654
// connection is not needed and can be safely released to the pool

packages/core/test/transaction.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,8 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
325325
lowRecordWatermark: 300
326326
})
327327

328+
tx._begin(async () => Bookmarks.empty(), TxConfig.empty())
329+
328330
await tx.run('RETURN 1')
329331

330332
expect(connection.seenProtocolOptions[0]).toMatchObject({
@@ -343,11 +345,36 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
343345
lowRecordWatermark: 300
344346
})
345347

348+
tx._begin(async () => Bookmarks.empty(), TxConfig.empty())
349+
346350
const result = tx.run('RETURN 1')
347351

348352
// @ts-expect-error
349353
expect(result._watermarks).toEqual({ high: 700, low: 300 })
350354
})
355+
356+
it('should wait begin message be send', async () => {
357+
const connection = newFakeConnection()
358+
const tx = newTransaction({
359+
connection
360+
})
361+
362+
const bookmarksPromise: Promise<Bookmarks> = new Promise((resolve) => {
363+
setTimeout(() => resolve(Bookmarks.empty()), 1000)
364+
})
365+
366+
tx._begin(async () => await bookmarksPromise, TxConfig.empty())
367+
368+
const result = tx.run('RETURN 1')
369+
370+
expect(connection.seenBeginTransaction.length).toEqual(0)
371+
expect(connection.seenQueries.length).toEqual(0)
372+
373+
await result
374+
375+
expect(connection.seenBeginTransaction.length).toEqual(1)
376+
expect(connection.seenQueries.length).toEqual(1)
377+
})
351378
})
352379

353380
describe('.close()', () => {
@@ -356,6 +383,7 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
356383
const connection = newFakeConnection()
357384
const tx = newTransaction({ connection })
358385

386+
tx._begin(async () => Bookmarks.empty(), TxConfig.empty())
359387
await tx.run('RETURN 1')
360388
await tx.close()
361389

@@ -367,6 +395,7 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
367395
const connection = newFakeConnection().withRollbackError(expectedError)
368396
const tx = newTransaction({ connection })
369397

398+
tx._begin(async () => Bookmarks.empty(), TxConfig.empty())
370399
await tx.run('RETURN 1')
371400

372401
try {
@@ -376,6 +405,29 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
376405
expect(error).toEqual(expectedError)
377406
}
378407
})
408+
409+
it('should wait begin message be send', async () => {
410+
const connection = newFakeConnection()
411+
const tx = newTransaction({
412+
connection
413+
})
414+
415+
const bookmarksPromise: Promise<Bookmarks> = new Promise((resolve) => {
416+
setTimeout(() => resolve(Bookmarks.empty()), 1000)
417+
})
418+
419+
tx._begin(async () => await bookmarksPromise, TxConfig.empty())
420+
421+
const result = tx.close()
422+
423+
expect(connection.seenBeginTransaction.length).toEqual(0)
424+
expect(connection.rollbackInvoked).toEqual(0)
425+
426+
await result
427+
428+
expect(connection.seenBeginTransaction.length).toEqual(1)
429+
expect(connection.rollbackInvoked).toEqual(1)
430+
})
379431
})
380432

381433
describe('when transaction is closed', () => {
@@ -394,6 +446,8 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
394446
const connection = newFakeConnection()
395447
const tx = newTransaction({ connection })
396448

449+
tx._begin(async () => Bookmarks.empty(), TxConfig.empty())
450+
397451
await operation(tx, connection)
398452
const rollbackInvokedAfterOperation = connection.rollbackInvoked
399453

0 commit comments

Comments
 (0)