Skip to content

Commit 35c82b3

Browse files
authored
Introduce Transaction.close method (#847)
This method closes the transaction and rolled it back if it stills open.
1 parent 6b48f15 commit 35c82b3

File tree

8 files changed

+192
-3
lines changed

8 files changed

+192
-3
lines changed

packages/core/src/transaction.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,19 @@ class Transaction {
220220
return this._state === _states.ACTIVE
221221
}
222222

223+
/**
224+
* Closes the transaction
225+
*
226+
* This method will roll back the transaction if it is not already committed or rolled back.
227+
*
228+
* @returns {Promise<void>} An empty promise if closed successfully or error if any error happened during
229+
*/
230+
async close(): Promise<void> {
231+
if (this.isOpen()) {
232+
await this.rollback()
233+
}
234+
}
235+
223236
_onErrorCallback(err: any): Promise<Connection | void> {
224237
// error will be "acknowledged" by sending a RESET message
225238
// database will then forget about this transaction and cleanup all corresponding resources

packages/core/test/transaction.test.ts

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,59 @@ describe('Transaction', () => {
6060

6161
})
6262

63+
describe('.close()', () => {
64+
describe('when transaction is open', () => {
65+
it('should roll back the transaction', async () => {
66+
const connection = newFakeConnection()
67+
const tx = newTransaction({ connection })
68+
69+
await tx.run('RETURN 1')
70+
await tx.close()
71+
72+
expect(connection.rollbackInvoked).toEqual(1)
73+
})
74+
75+
it('should surface errors during the rollback', async () => {
76+
const expectedError = new Error('rollback error')
77+
const connection = newFakeConnection().withRollbackError(expectedError)
78+
const tx = newTransaction({ connection })
79+
80+
await tx.run('RETURN 1')
81+
82+
try {
83+
await tx.close()
84+
fail('should have thrown')
85+
} catch (error) {
86+
expect(error).toEqual(expectedError)
87+
}
88+
})
89+
})
90+
91+
describe('when transaction is closed', () => {
92+
const commit = async (tx: Transaction) => tx.commit()
93+
const rollback = async (tx: Transaction) => tx.rollback()
94+
const error = async (tx: Transaction, conn: FakeConnection) => {
95+
conn.withRollbackError(new Error('rollback error'))
96+
return tx.rollback().catch(() => { })
97+
}
98+
99+
it.each([
100+
['commmited', commit],
101+
['rolled back', rollback],
102+
['with error', error]
103+
])('should not roll back the connection', async (_, operation) => {
104+
const connection = newFakeConnection()
105+
const tx = newTransaction({ connection })
106+
107+
await operation(tx, connection)
108+
const rollbackInvokedAfterOperation = connection.rollbackInvoked
109+
110+
await tx.close()
111+
112+
expect(connection.rollbackInvoked).toEqual(rollbackInvokedAfterOperation)
113+
})
114+
})
115+
})
63116
})
64117

65118
function newTransaction({
@@ -69,9 +122,9 @@ function newTransaction({
69122
lowRecordWatermark = 300
70123
}: {
71124
connection: FakeConnection
72-
fetchSize: number
73-
highRecordWatermark: number,
74-
lowRecordWatermark: number
125+
fetchSize?: number
126+
highRecordWatermark?: number,
127+
lowRecordWatermark?: number
75128
}): Transaction {
76129
const connectionProvider = new ConnectionProvider()
77130
connectionProvider.acquireConnection = () => Promise.resolve(connection)

packages/core/test/utils/connection.fake.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ export default class FakeConnection extends Connection {
4444
public protocolErrorsHandled: number
4545
public seenProtocolErrors: string[]
4646
public seenRequestRoutingInformation: any[]
47+
public rollbackInvoked: number
48+
public _rollbackError: Error | null
4749

4850
constructor() {
4951
super()
@@ -64,6 +66,8 @@ export default class FakeConnection extends Connection {
6466
this.protocolErrorsHandled = 0
6567
this.seenProtocolErrors = []
6668
this.seenRequestRoutingInformation = []
69+
this.rollbackInvoked = 0
70+
this._rollbackError = null
6771
}
6872

6973
get id(): string {
@@ -105,6 +109,13 @@ export default class FakeConnection extends Connection {
105109
beginTransaction: () => {
106110
return Promise.resolve()
107111
},
112+
rollbackTransaction: () => {
113+
this.rollbackInvoked ++
114+
if (this._rollbackError !== null) {
115+
return mockResultStreamObserverWithError('ROLLBACK', {}, this._rollbackError)
116+
}
117+
return mockResultStreamObserver('ROLLBACK', {})
118+
},
108119
requestRoutingInformation: (params: any | undefined) => {
109120
this.seenRequestRoutingInformation.push(params)
110121
if (this._requestRoutingInformationMock) {
@@ -161,12 +172,27 @@ export default class FakeConnection extends Connection {
161172
return this
162173
}
163174

175+
withRollbackError(error: Error) {
176+
this._rollbackError = error
177+
return this
178+
}
179+
164180
closed() {
165181
this._open = false
166182
return this
167183
}
168184
}
169185

186+
function mockResultStreamObserverWithError (query: string, parameters: any | undefined, error: Error) {
187+
const observer = mockResultStreamObserver(query, parameters)
188+
observer.subscribe = (observer: ResultObserver) => {
189+
if (observer && observer.onError) {
190+
observer.onError(error)
191+
}
192+
}
193+
return observer
194+
}
195+
170196
function mockResultStreamObserver(query: string, parameters: any | undefined): ResultStreamObserver {
171197
return {
172198
onError: (error: any) => { },

packages/neo4j-driver/src/transaction-rx.js

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,22 @@ export default class RxTransaction {
9090
.catch(err => observer.error(err))
9191
})
9292
}
93+
94+
/**
95+
* Closes the transaction
96+
*
97+
* This method will roll back the transaction if it is not already committed or rolled back.
98+
*
99+
* @returns {Observable} - An empty observable
100+
*/
101+
close () {
102+
return new Observable(observer => {
103+
this._txc
104+
.close()
105+
.then(() => {
106+
observer.complete()
107+
})
108+
.catch(err => observer.error(err))
109+
})
110+
}
93111
}

packages/neo4j-driver/test/rx/transaction.test.js

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import {
2929
} from 'rxjs/operators'
3030
import neo4j from '../../src'
3131
import RxSession from '../../src/session-rx'
32+
import RxTransaction from '../../src/transaction-rx'
3233
import sharedNeo4j from '../internal/shared-neo4j'
3334
import { newError } from 'neo4j-driver-core'
3435

@@ -148,6 +149,35 @@ describe('#integration-rx transaction', () => {
148149
expect(await countNodes(42)).toBe(0)
149150
})
150151

152+
it('should run query and close', async () => {
153+
if (protocolVersion < 4.0) {
154+
return
155+
}
156+
157+
const result = await session
158+
.beginTransaction()
159+
.pipe(
160+
flatMap(txc =>
161+
txc
162+
.run('CREATE (n:Node {id: 42}) RETURN n')
163+
.records()
164+
.pipe(
165+
map(r => r.get('n').properties.id),
166+
concat(txc.close())
167+
)
168+
),
169+
materialize(),
170+
toArray()
171+
)
172+
.toPromise()
173+
expect(result).toEqual([
174+
Notification.createNext(neo4j.int(42)),
175+
Notification.createComplete()
176+
])
177+
178+
expect(await countNodes(42)).toBe(0)
179+
})
180+
151181
it('should run multiple queries and commit', async () => {
152182
await verifyCanRunMultipleQueries(true)
153183
})
@@ -720,3 +750,37 @@ describe('#integration-rx transaction', () => {
720750
.toPromise()
721751
}
722752
})
753+
754+
describe('#unit', () => {
755+
describe('.close()', () => {
756+
it('should delegate to the original Transaction', async () => {
757+
const txc = {
758+
close: jasmine.createSpy('close').and.returnValue(Promise.resolve())
759+
}
760+
761+
const transaction = new RxTransaction(txc)
762+
763+
await transaction.close().toPromise()
764+
765+
expect(txc.close).toHaveBeenCalled()
766+
})
767+
768+
it('should fail if to the original Transaction.close call fails', async () => {
769+
const expectedError = new Error('expected')
770+
const txc = {
771+
close: jasmine
772+
.createSpy('close')
773+
.and.returnValue(Promise.reject(expectedError))
774+
}
775+
776+
const transaction = new RxTransaction(txc)
777+
778+
try {
779+
await transaction.close().toPromise()
780+
fail('should have thrown')
781+
} catch (error) {
782+
expect(error).toBe(expectedError)
783+
}
784+
})
785+
})
786+
})

packages/neo4j-driver/test/types/transaction-rx.test.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,7 @@ tx.commit()
6868
tx.rollback()
6969
.pipe(concat(of('rolled back')))
7070
.subscribe(stringObserver)
71+
72+
tx.close()
73+
.pipe(concat(of('closed')))
74+
.subscribe(stringObserver)

packages/neo4j-driver/types/transaction-rx.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ declare interface RxTransaction {
2626
commit(): Observable<any>
2727

2828
rollback(): Observable<any>
29+
30+
close(): Observable<any>
2931
}
3032

3133
export default RxTransaction

packages/testkit-backend/src/request-handlers.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,14 @@ export function TransactionRollback (context, data, wire) {
310310
.catch(e => wire.writeError(e))
311311
}
312312

313+
export function TransactionClose (context, data, wire) {
314+
const { txId: id } = data
315+
const { tx } = context.getTx(id)
316+
return tx.close()
317+
.then(() => wire.writeResponse('Transaction', { id }))
318+
.catch(e => wire.writeError(e))
319+
}
320+
313321
export function SessionLastBookmarks (context, data, wire) {
314322
const { sessionId } = data
315323
const session = context.getSession(sessionId)
@@ -360,6 +368,7 @@ export function GetFeatures (_context, _params, wire) {
360368
'Feature:API:Result.List',
361369
'Feature:API:Result.Peek',
362370
'Temporary:ConnectionAcquisitionTimeout',
371+
'Temporary:TransactionClose',
363372
...SUPPORTED_TLS
364373
]
365374
})

0 commit comments

Comments
 (0)