Skip to content

Commit 0726ae4

Browse files
committed
Wait for BEGIN be send before send RUN, COMMIT or ROLLBACK
1 parent c5adb79 commit 0726ae4

File tree

3 files changed

+141
-50
lines changed

3 files changed

+141
-50
lines changed

packages/core/src/internal/connection-holder.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class ConnectionHolder implements ConnectionHolderInterface {
8585
private readonly _impersonatedUser?: string
8686
private readonly _getConnectionAcquistionBookmarks: () => Promise<Bookmarks>
8787
private readonly _onDatabaseNameResolved?: (databaseName?: string) => void
88+
private _connectionError?: Error
8889

8990
/**
9091
* @constructor
@@ -123,6 +124,7 @@ class ConnectionHolder implements ConnectionHolderInterface {
123124
this._connectionPromise = Promise.resolve(null)
124125
this._onDatabaseNameResolved = onDatabaseNameResolved
125126
this._getConnectionAcquistionBookmarks = getConnectionAcquistionBookmarks ?? (() => Promise.resolve(Bookmarks.empty()))
127+
this._connectionError = undefined
126128
}
127129

128130
mode (): string | undefined {
@@ -161,12 +163,16 @@ class ConnectionHolder implements ConnectionHolderInterface {
161163
}
162164

163165
private async _createConnectionPromise (connectionProvider: ConnectionProvider): Promise<Connection | null> {
166+
this._connectionError = undefined
164167
return await connectionProvider.acquireConnection({
165168
accessMode: this._mode,
166169
database: this._database,
167170
bookmarks: await this._getBookmarks(),
168171
impersonatedUser: this._impersonatedUser,
169172
onDatabaseNameResolved: this._onDatabaseNameResolved
173+
}).catch(error => {
174+
this._connectionError = error
175+
throw error
170176
})
171177
}
172178

@@ -175,6 +181,13 @@ class ConnectionHolder implements ConnectionHolderInterface {
175181
}
176182

177183
getConnection (): Promise<Connection | null> {
184+
// Failures in the connection promise should be
185+
// propagated to the all the .getConnection()
186+
// calls util a new connection promise get
187+
// set up
188+
if (this._connectionError !== undefined) {
189+
return Promise.reject(this._connectionError)
190+
}
178191
return this._connectionPromise
179192
}
180193

packages/core/src/transaction.ts

Lines changed: 74 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ class Transaction {
5858
private readonly _lowRecordWatermak: number
5959
private readonly _highRecordWatermark: number
6060
private _bookmarks: Bookmarks
61+
private readonly _activePromise: Promise<void>
62+
private _acceptActive: () => void
6163

6264
/**
6365
* @constructor
@@ -107,6 +109,9 @@ class Transaction {
107109
this._lowRecordWatermak = lowRecordWatermark
108110
this._highRecordWatermark = highRecordWatermark
109111
this._bookmarks = Bookmarks.empty()
112+
this._activePromise = new Promise((resolve, reject) => {
113+
this._acceptActive = resolve
114+
})
110115
}
111116

112117
/**
@@ -154,6 +159,10 @@ class Transaction {
154159
}
155160
this._onError(error).catch(() => {})
156161
})
162+
// It should make the transaction active anyway
163+
// futher errors will be treated by the exiting
164+
// observers
165+
.finally(() => this._acceptActive())
157166
}
158167

159168
/**
@@ -178,7 +187,8 @@ class Transaction {
178187
reactive: this._reactive,
179188
fetchSize: this._fetchSize,
180189
highRecordWatermark: this._highRecordWatermark,
181-
lowRecordWatermark: this._lowRecordWatermak
190+
lowRecordWatermark: this._lowRecordWatermak,
191+
preparationJob: this._activePromise
182192
})
183193
this._results.push(result)
184194
return result
@@ -197,7 +207,8 @@ class Transaction {
197207
onError: this._onError,
198208
onComplete: (meta: any) => this._onCompleteCallback(meta, this._bookmarks),
199209
onConnection: this._onConnection,
200-
pendingResults: this._results
210+
pendingResults: this._results,
211+
preparationJob: this._activePromise
201212
})
202213
this._state = committed.state
203214
// clean up
@@ -224,7 +235,8 @@ class Transaction {
224235
onError: this._onError,
225236
onComplete: this._onComplete,
226237
onConnection: this._onConnection,
227-
pendingResults: this._results
238+
pendingResults: this._results,
239+
preparationJob: this._activePromise
228240
})
229241
this._state = rolledback.state
230242
// clean up
@@ -293,6 +305,7 @@ interface StateTransitionParams {
293305
fetchSize: number
294306
highRecordWatermark: number
295307
lowRecordWatermark: number
308+
preparationJob?: Promise<any>
296309
}
297310

298311
const _states = {
@@ -303,7 +316,8 @@ const _states = {
303316
onError,
304317
onComplete,
305318
onConnection,
306-
pendingResults
319+
pendingResults,
320+
preparationJob
307321
}: StateTransitionParams): any => {
308322
return {
309323
result: finishTransaction(
@@ -312,7 +326,8 @@ const _states = {
312326
onError,
313327
onComplete,
314328
onConnection,
315-
pendingResults
329+
pendingResults,
330+
preparationJob
316331
),
317332
state: _states.SUCCEEDED
318333
}
@@ -322,7 +337,8 @@ const _states = {
322337
onError,
323338
onComplete,
324339
onConnection,
325-
pendingResults
340+
pendingResults,
341+
preparationJob
326342
}: StateTransitionParams): any => {
327343
return {
328344
result: finishTransaction(
@@ -331,7 +347,8 @@ const _states = {
331347
onError,
332348
onComplete,
333349
onConnection,
334-
pendingResults
350+
pendingResults,
351+
preparationJob
335352
),
336353
state: _states.ROLLED_BACK
337354
}
@@ -347,31 +364,34 @@ const _states = {
347364
reactive,
348365
fetchSize,
349366
highRecordWatermark,
350-
lowRecordWatermark
367+
lowRecordWatermark,
368+
preparationJob
351369
}: StateTransitionParams
352370
): any => {
353371
// RUN in explicit transaction can't contain bookmarks and transaction configuration
354372
// No need to include mode and database name as it shall be inclued in begin
355-
const observerPromise = connectionHolder
356-
.getConnection()
357-
.then(conn => {
358-
onConnection()
359-
if (conn != null) {
360-
return conn.protocol().run(query, parameters, {
361-
bookmarks: Bookmarks.empty(),
362-
txConfig: TxConfig.empty(),
363-
beforeError: onError,
364-
afterComplete: onComplete,
365-
reactive: reactive,
366-
fetchSize: fetchSize,
367-
highRecordWatermark: highRecordWatermark,
368-
lowRecordWatermark: lowRecordWatermark
369-
})
370-
} else {
371-
throw newError('No connection available')
372-
}
373-
})
374-
.catch(error => new FailedObserver({ error, onError }))
373+
const requirements = preparationJob ?? Promise.resolve()
374+
375+
const observerPromise =
376+
requirements.then(() => connectionHolder.getConnection())
377+
.then(conn => {
378+
onConnection()
379+
if (conn != null) {
380+
return conn.protocol().run(query, parameters, {
381+
bookmarks: Bookmarks.empty(),
382+
txConfig: TxConfig.empty(),
383+
beforeError: onError,
384+
afterComplete: onComplete,
385+
reactive: reactive,
386+
fetchSize: fetchSize,
387+
highRecordWatermark: highRecordWatermark,
388+
lowRecordWatermark: lowRecordWatermark
389+
})
390+
} else {
391+
throw newError('No connection available')
392+
}
393+
})
394+
.catch(error => new FailedObserver({ error, onError }))
375395

376396
return newCompletedResult(
377397
observerPromise,
@@ -598,32 +618,36 @@ function finishTransaction (
598618
onError: (err: Error) => any,
599619
onComplete: (metadata: any) => any,
600620
onConnection: () => any,
601-
pendingResults: Result[]
621+
pendingResults: Result[],
622+
preparationJob?: Promise<void>
602623
): Result {
603-
const observerPromise = connectionHolder
604-
.getConnection()
605-
.then(connection => {
606-
onConnection()
607-
pendingResults.forEach(r => r._cancel())
608-
return Promise.all(pendingResults.map(result => result.summary())).then(results => {
609-
if (connection != null) {
610-
if (commit) {
611-
return connection.protocol().commitTransaction({
612-
beforeError: onError,
613-
afterComplete: onComplete
614-
})
624+
const requirements = preparationJob ?? Promise.resolve()
625+
626+
const observerPromise =
627+
requirements
628+
.then(() => connectionHolder.getConnection())
629+
.then(connection => {
630+
onConnection()
631+
pendingResults.forEach(r => r._cancel())
632+
return Promise.all(pendingResults.map(result => result.summary())).then(results => {
633+
if (connection != null) {
634+
if (commit) {
635+
return connection.protocol().commitTransaction({
636+
beforeError: onError,
637+
afterComplete: onComplete
638+
})
639+
} else {
640+
return connection.protocol().rollbackTransaction({
641+
beforeError: onError,
642+
afterComplete: onComplete
643+
})
644+
}
615645
} else {
616-
return connection.protocol().rollbackTransaction({
617-
beforeError: onError,
618-
afterComplete: onComplete
619-
})
646+
throw newError('No connection available')
620647
}
621-
} else {
622-
throw newError('No connection available')
623-
}
648+
})
624649
})
625-
})
626-
.catch(error => new FailedObserver({ error, onError }))
650+
.catch(error => new FailedObserver({ error, onError }))
627651

628652
// for commit & rollback we need result that uses real connection holder and notifies it when
629653
// connection is not needed and can be safely released to the pool

packages/core/test/transaction.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,8 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
325325
lowRecordWatermark: 300
326326
})
327327

328+
tx._begin(async () => Bookmarks.empty(), TxConfig.empty())
329+
328330
await tx.run('RETURN 1')
329331

330332
expect(connection.seenProtocolOptions[0]).toMatchObject({
@@ -343,11 +345,36 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
343345
lowRecordWatermark: 300
344346
})
345347

348+
tx._begin(async () => Bookmarks.empty(), TxConfig.empty())
349+
346350
const result = tx.run('RETURN 1')
347351

348352
// @ts-expect-error
349353
expect(result._watermarks).toEqual({ high: 700, low: 300 })
350354
})
355+
356+
it('should wait begin message be send', async () => {
357+
const connection = newFakeConnection()
358+
const tx = newTransaction({
359+
connection
360+
})
361+
362+
const bookmarksPromise: Promise<Bookmarks> = new Promise((resolve) => {
363+
setTimeout(() => resolve(Bookmarks.empty()), 1000)
364+
})
365+
366+
tx._begin(async () => await bookmarksPromise, TxConfig.empty())
367+
368+
const result = tx.run('RETURN 1')
369+
370+
expect(connection.seenBeginTransaction.length).toEqual(0)
371+
expect(connection.seenQueries.length).toEqual(0)
372+
373+
await result
374+
375+
expect(connection.seenBeginTransaction.length).toEqual(1)
376+
expect(connection.seenQueries.length).toEqual(1)
377+
})
351378
})
352379

353380
describe('.close()', () => {
@@ -356,6 +383,7 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
356383
const connection = newFakeConnection()
357384
const tx = newTransaction({ connection })
358385

386+
tx._begin(async () => Bookmarks.empty(), TxConfig.empty())
359387
await tx.run('RETURN 1')
360388
await tx.close()
361389

@@ -367,6 +395,7 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
367395
const connection = newFakeConnection().withRollbackError(expectedError)
368396
const tx = newTransaction({ connection })
369397

398+
tx._begin(async () => Bookmarks.empty(), TxConfig.empty())
370399
await tx.run('RETURN 1')
371400

372401
try {
@@ -376,6 +405,29 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
376405
expect(error).toEqual(expectedError)
377406
}
378407
})
408+
409+
it('should wait begin message be send', async () => {
410+
const connection = newFakeConnection()
411+
const tx = newTransaction({
412+
connection
413+
})
414+
415+
const bookmarksPromise: Promise<Bookmarks> = new Promise((resolve) => {
416+
setTimeout(() => resolve(Bookmarks.empty()), 1000)
417+
})
418+
419+
tx._begin(async () => await bookmarksPromise, TxConfig.empty())
420+
421+
const result = tx.close()
422+
423+
expect(connection.seenBeginTransaction.length).toEqual(0)
424+
expect(connection.rollbackInvoked).toEqual(0)
425+
426+
await result
427+
428+
expect(connection.seenBeginTransaction.length).toEqual(1)
429+
expect(connection.rollbackInvoked).toEqual(1)
430+
})
379431
})
380432

381433
describe('when transaction is closed', () => {
@@ -394,6 +446,8 @@ function testTx<T extends Transaction> (transactionName: string, newTransaction:
394446
const connection = newFakeConnection()
395447
const tx = newTransaction({ connection })
396448

449+
tx._begin(async () => Bookmarks.empty(), TxConfig.empty())
450+
397451
await operation(tx, connection)
398452
const rollbackInvokedAfterOperation = connection.rollbackInvoked
399453

0 commit comments

Comments
 (0)