Skip to content

Commit 8696921

Browse files
bigmontzrobsdedude
andauthored
Prevent iterate over already consumed Result (#896)
`for await (const r of result)` in already consumed results could block for ever since no new event will came from wire. Blocking this kind of access improves not correct access to the Result object. This change also includes the addition of `Result.isOpen()` Co-authored-by: Robsdedude <dev@rouvenbauer.de>
1 parent 88c07fd commit 8696921

File tree

6 files changed

+149
-31
lines changed

6 files changed

+149
-31
lines changed

packages/core/src/result.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import ResultSummary from './result-summary'
2121
import Record from './record'
2222
import { Query, PeekableAsyncIterator } from './types'
2323
import { observer, util, connectionHolder } from './internal'
24+
import { newError } from './error'
2425

2526
const { EMPTY_CONNECTION_HOLDER } = connectionHolder
2627

@@ -238,6 +239,13 @@ class Result implements Promise<QueryResult> {
238239
* @returns {PeekableAsyncIterator<Record, ResultSummary>} The async iterator for the Results
239240
*/
240241
[Symbol.asyncIterator](): PeekableAsyncIterator<Record, ResultSummary> {
242+
if (!this.isOpen()) {
243+
const error = newError('Result is already consumed')
244+
return {
245+
next: () => Promise.reject(error),
246+
peek: () => Promise.reject(error),
247+
}
248+
}
241249
const state: {
242250
paused: boolean,
243251
firstRun: boolean,
@@ -362,6 +370,14 @@ class Result implements Promise<QueryResult> {
362370
.catch(() => {})
363371
}
364372

373+
/**
374+
* Check if this result is active, i.e., neither a summary nor an error has been received by the result.
375+
* @return {boolean} `true` when neither a summary or nor an error has been received by the result.
376+
*/
377+
isOpen (): boolean {
378+
return this._summary === null && this._error === null
379+
}
380+
365381
/**
366382
* Stream records to observer as they come in, this is a more efficient method
367383
* of handling the results, and allows you to handle arbitrarily large results.

packages/core/src/transaction.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,7 @@ function finishTransaction(
600600
.then(connection => {
601601
onConnection()
602602
pendingResults.forEach(r => r._cancel())
603-
return Promise.all(pendingResults).then(results => {
603+
return Promise.all(pendingResults.map(result => result.summary())).then(results => {
604604
if (connection) {
605605
if (commit) {
606606
return connection.protocol().commitTransaction({

packages/core/test/result.test.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -860,6 +860,26 @@ describe('Result', () => {
860860
])
861861
})
862862

863+
it.each([
864+
['success', async (stream: any) => stream.onCompleted({})],
865+
['error', async (stream: any) => stream.onError(new Error('error'))],
866+
])('should thrown on iterating over an consumed result [%s]', async(_, completeStream) => {
867+
completeStream(streamObserverMock)
868+
869+
await result.summary().catch(() => {})
870+
871+
try {
872+
for await (const _ of result) {
873+
expect('not to iterate over consumed result').toBe(true)
874+
}
875+
expect('not to finish iteration over consumed result').toBe(true)
876+
} catch (e) {
877+
expect(e).toEqual(newError('Result is already consumed'))
878+
}
879+
880+
expect('not to finish iteration over consumed result')
881+
})
882+
863883
describe('.return()', () => {
864884
it('should finished the operator when it get called', async () => {
865885
const keys = ['a', 'b']
@@ -1205,6 +1225,30 @@ describe('Result', () => {
12051225
})
12061226
})
12071227
})
1228+
1229+
describe('.isOpen()', () => {
1230+
it('should return true when the stream is open', async () => {
1231+
await result._subscribe({}).catch(() => {})
1232+
1233+
expect(result.isOpen()).toBe(true)
1234+
})
1235+
1236+
it('should return false when the stream is closed', async () => {
1237+
streamObserverMock.onCompleted({})
1238+
1239+
await result._subscribe({}).catch(() => {})
1240+
1241+
expect(result.isOpen()).toBe(false)
1242+
})
1243+
1244+
it('should return false when the stream failed', async () => {
1245+
streamObserverMock.onError(new Error('test'))
1246+
1247+
await result._subscribe({}).catch(() => {})
1248+
1249+
expect(result.isOpen()).toBe(false)
1250+
})
1251+
})
12081252
})
12091253

12101254
describe.each([
@@ -1319,6 +1363,19 @@ describe('Result', () => {
13191363
})
13201364
})
13211365
})
1366+
1367+
describe('isOpen()', () => {
1368+
it('should be true', () => {
1369+
expect(result.isOpen()).toBe(true)
1370+
})
1371+
1372+
it('should be false after any interaction with the stream', async () => {
1373+
const it = result[Symbol.asyncIterator]()
1374+
await it.next()
1375+
1376+
expect(result.isOpen()).toBe(false)
1377+
})
1378+
})
13221379
})
13231380

13241381
describe.each([
@@ -1381,6 +1438,30 @@ describe('Result', () => {
13811438
})
13821439
})
13831440

1441+
describe('.isOpen()', () => {
1442+
it('should be true', async () => {
1443+
expect(result.isOpen()).toBe(true)
1444+
1445+
// consume the stream to avoid unhaddled errors
1446+
try {
1447+
await result.summary()
1448+
} catch (error) {
1449+
}
1450+
})
1451+
1452+
it('should be false after any interactio with the stream', async () => {
1453+
const it = result[Symbol.asyncIterator]()
1454+
1455+
try {
1456+
await it.next()
1457+
} catch (error) {
1458+
// this's fine
1459+
}
1460+
1461+
expect(result.isOpen()).toBe(false)
1462+
})
1463+
})
1464+
13841465
function shouldReturnRejectedPromiseWithTheExpectedError<T>(
13851466
supplier: () => Promise<T>
13861467
) {

packages/neo4j-driver/test/rx/navigation.test.js

Lines changed: 50 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,19 @@ describe('#integration-rx navigation', () => {
170170
60000
171171
)
172172

173-
it(
174-
'should fail on result when closed',
175-
() =>
176-
shouldFailOnResultWhenClosed(protocolVersion, session, () =>
177-
session.close()
178-
),
179-
60000
180-
)
173+
getObservableSelectors().forEach(([observableName, observableSelector]) => {
174+
it(
175+
`${observableName} should fail on result when closed`,
176+
() =>
177+
shouldFailOnResultWhenClosed(
178+
protocolVersion,
179+
session,
180+
observableSelector,
181+
() => session.close()
182+
),
183+
60000
184+
)
185+
})
181186
})
182187

183188
describe('transaction', () => {
@@ -340,21 +345,33 @@ describe('#integration-rx navigation', () => {
340345
60000
341346
)
342347

343-
it(
344-
'should fail on result when committed',
345-
() =>
346-
shouldFailOnResultWhenClosed(protocolVersion, txc, () => txc.commit()),
347-
60000
348-
)
348+
getObservableSelectors().forEach(([observableName, observableSelector]) => {
349+
it(
350+
`${observableName} should fail on result when committed`,
351+
() =>
352+
shouldFailOnResultWhenClosed(
353+
protocolVersion,
354+
txc,
355+
observableSelector,
356+
() => txc.commit()
357+
),
358+
60000
359+
)
360+
})
349361

350-
it(
351-
'should fail on result when rolled back',
352-
() =>
353-
shouldFailOnResultWhenClosed(protocolVersion, txc, () =>
354-
txc.rollback()
355-
),
356-
60000
357-
)
362+
getObservableSelectors().forEach(([observableName, observableSelector]) => {
363+
it(
364+
`${observableName}should fail on result when rolled back`,
365+
() =>
366+
shouldFailOnResultWhenClosed(
367+
protocolVersion,
368+
txc,
369+
observableSelector,
370+
() => txc.rollback()
371+
),
372+
60000
373+
)
374+
})
358375
})
359376

360377
/**
@@ -762,14 +779,24 @@ describe('#integration-rx navigation', () => {
762779
await collectAndAssertError(result.consume(), expectedError)
763780
}
764781

782+
function getObservableSelectors () {
783+
return [
784+
['consume', r => r.consume()],
785+
['keys', r => r.keys()],
786+
['records', r => r.records()]
787+
]
788+
}
789+
765790
/**
766791
* @param {number} protocolVersion
767792
* @param {RxSession|RxTransaction} runnable
793+
* @param {function(RxSession|RxTransaction):Observable<any>} selectObservable
768794
* @param {function(): Observable} closeFunc
769795
*/
770796
async function shouldFailOnResultWhenClosed (
771797
protocolVersion,
772798
runnable,
799+
selectObservable,
773800
closeFunc
774801
) {
775802
if (protocolVersion < 4.0) {
@@ -782,9 +809,7 @@ describe('#integration-rx navigation', () => {
782809
const expectedError = jasmine.objectContaining({
783810
message: jasmine.stringMatching(/Cannot run query/)
784811
})
785-
await collectAndAssertError(result.keys(), expectedError)
786-
await collectAndAssertError(result.records(), expectedError)
787-
await collectAndAssertError(result.consume(), expectedError)
812+
await collectAndAssertError(selectObservable(result), expectedError)
788813
}
789814

790815
async function collectAndAssertKeys (result) {

packages/testkit-backend/src/controller/local.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export default class LocalController extends Controller {
6363
const id = this._contexts.get(contextId).addError(e)
6464
this._writeResponse(contextId, newResponse('DriverError', {
6565
id,
66-
msg: e.message + ' (' + e.code + ')',
66+
msg: e.message,
6767
code: e.code
6868
}))
6969
}

packages/testkit-backend/src/skipped-tests/common.js

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,6 @@ const skippedTests = [
138138
'stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once_next_before_list'
139139
)
140140
),
141-
skip(
142-
'Results are always valid but don\'t return records when out of scope',
143-
ifStartsWith('stub.iteration.test_result_scope.TestResultScope.')
144-
),
145141
skip(
146142
'Driver (still) allows explicit managing of managed transaction',
147143
ifEquals('stub.tx_lifetime.test_tx_lifetime.TestTxLifetime.test_managed_tx_raises_tx_managed_exec')

0 commit comments

Comments
 (0)