Skip to content

Prevent iterate over already consumed Result #896

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions packages/core/src/result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import ResultSummary from './result-summary'
import Record from './record'
import { Query, PeekableAsyncIterator } from './types'
import { observer, util, connectionHolder } from './internal'
import { newError } from './error'

const { EMPTY_CONNECTION_HOLDER } = connectionHolder

Expand Down Expand Up @@ -238,6 +239,13 @@ class Result implements Promise<QueryResult> {
* @returns {PeekableAsyncIterator<Record, ResultSummary>} The async iterator for the Results
*/
[Symbol.asyncIterator](): PeekableAsyncIterator<Record, ResultSummary> {
if (!this.isOpen()) {
const error = newError('Result is already consumed')
return {
next: () => Promise.reject(error),
peek: () => Promise.reject(error),
}
}
const state: {
paused: boolean,
firstRun: boolean,
Expand Down Expand Up @@ -362,6 +370,14 @@ class Result implements Promise<QueryResult> {
.catch(() => {})
}

/**
* Check if this result is active, i.e., neither a summary nor an error has been received by the result.
* @return {boolean} `true` when neither a summary or nor an error has been received by the result.
*/
isOpen (): boolean {
return this._summary === null && this._error === null
}

/**
* Stream records to observer as they come in, this is a more efficient method
* of handling the results, and allows you to handle arbitrarily large results.
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ function finishTransaction(
.then(connection => {
onConnection()
pendingResults.forEach(r => r._cancel())
return Promise.all(pendingResults).then(results => {
return Promise.all(pendingResults.map(result => result.summary())).then(results => {
if (connection) {
if (commit) {
return connection.protocol().commitTransaction({
Expand Down
81 changes: 81 additions & 0 deletions packages/core/test/result.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,26 @@ describe('Result', () => {
])
})

it.each([
['success', async (stream: any) => stream.onCompleted({})],
['error', async (stream: any) => stream.onError(new Error('error'))],
])('should thrown on iterating over an consumed result [%s]', async(_, completeStream) => {
completeStream(streamObserverMock)

await result.summary().catch(() => {})

try {
for await (const _ of result) {
expect('not to iterate over consumed result').toBe(true)
}
expect('not to finish iteration over consumed result').toBe(true)
} catch (e) {
expect(e).toEqual(newError('Result is already consumed'))
}

expect('not to finish iteration over consumed result')
})

describe('.return()', () => {
it('should finished the operator when it get called', async () => {
const keys = ['a', 'b']
Expand Down Expand Up @@ -1205,6 +1225,30 @@ describe('Result', () => {
})
})
})

describe('.isOpen()', () => {
it('should return true when the stream is open', async () => {
await result._subscribe({}).catch(() => {})

expect(result.isOpen()).toBe(true)
})

it('should return false when the stream is closed', async () => {
streamObserverMock.onCompleted({})

await result._subscribe({}).catch(() => {})

expect(result.isOpen()).toBe(false)
})

it('should return false when the stream failed', async () => {
streamObserverMock.onError(new Error('test'))

await result._subscribe({}).catch(() => {})

expect(result.isOpen()).toBe(false)
})
})
})

describe.each([
Expand Down Expand Up @@ -1319,6 +1363,19 @@ describe('Result', () => {
})
})
})

describe('isOpen()', () => {
it('should be true', () => {
expect(result.isOpen()).toBe(true)
})

it('should be false after any interaction with the stream', async () => {
const it = result[Symbol.asyncIterator]()
await it.next()

expect(result.isOpen()).toBe(false)
})
})
})

describe.each([
Expand Down Expand Up @@ -1381,6 +1438,30 @@ describe('Result', () => {
})
})

describe('.isOpen()', () => {
it('should be true', async () => {
expect(result.isOpen()).toBe(true)

// consume the stream to avoid unhaddled errors
try {
await result.summary()
} catch (error) {
}
})

it('should be false after any interactio with the stream', async () => {
const it = result[Symbol.asyncIterator]()

try {
await it.next()
} catch (error) {
// this's fine
}

expect(result.isOpen()).toBe(false)
})
})

function shouldReturnRejectedPromiseWithTheExpectedError<T>(
supplier: () => Promise<T>
) {
Expand Down
75 changes: 50 additions & 25 deletions packages/neo4j-driver/test/rx/navigation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,19 @@ describe('#integration-rx navigation', () => {
60000
)

it(
'should fail on result when closed',
() =>
shouldFailOnResultWhenClosed(protocolVersion, session, () =>
session.close()
),
60000
)
getObservableSelectors().forEach(([observableName, observableSelector]) => {
it(
`${observableName} should fail on result when closed`,
() =>
shouldFailOnResultWhenClosed(
protocolVersion,
session,
observableSelector,
() => session.close()
),
60000
)
})
})

describe('transaction', () => {
Expand Down Expand Up @@ -340,21 +345,33 @@ describe('#integration-rx navigation', () => {
60000
)

it(
'should fail on result when committed',
() =>
shouldFailOnResultWhenClosed(protocolVersion, txc, () => txc.commit()),
60000
)
getObservableSelectors().forEach(([observableName, observableSelector]) => {
it(
`${observableName} should fail on result when committed`,
() =>
shouldFailOnResultWhenClosed(
protocolVersion,
txc,
observableSelector,
() => txc.commit()
),
60000
)
})

it(
'should fail on result when rolled back',
() =>
shouldFailOnResultWhenClosed(protocolVersion, txc, () =>
txc.rollback()
),
60000
)
getObservableSelectors().forEach(([observableName, observableSelector]) => {
it(
`${observableName}should fail on result when rolled back`,
() =>
shouldFailOnResultWhenClosed(
protocolVersion,
txc,
observableSelector,
() => txc.rollback()
),
60000
)
})
})

/**
Expand Down Expand Up @@ -762,14 +779,24 @@ describe('#integration-rx navigation', () => {
await collectAndAssertError(result.consume(), expectedError)
}

function getObservableSelectors () {
return [
['consume', r => r.consume()],
['keys', r => r.keys()],
['records', r => r.records()]
]
}

/**
* @param {number} protocolVersion
* @param {RxSession|RxTransaction} runnable
* @param {function(RxSession|RxTransaction):Observable<any>} selectObservable
* @param {function(): Observable} closeFunc
*/
async function shouldFailOnResultWhenClosed (
protocolVersion,
runnable,
selectObservable,
closeFunc
) {
if (protocolVersion < 4.0) {
Expand All @@ -782,9 +809,7 @@ describe('#integration-rx navigation', () => {
const expectedError = jasmine.objectContaining({
message: jasmine.stringMatching(/Cannot run query/)
})
await collectAndAssertError(result.keys(), expectedError)
await collectAndAssertError(result.records(), expectedError)
await collectAndAssertError(result.consume(), expectedError)
await collectAndAssertError(selectObservable(result), expectedError)
}

async function collectAndAssertKeys (result) {
Expand Down
2 changes: 1 addition & 1 deletion packages/testkit-backend/src/controller/local.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export default class LocalController extends Controller {
const id = this._contexts.get(contextId).addError(e)
this._writeResponse(contextId, newResponse('DriverError', {
id,
msg: e.message + ' (' + e.code + ')',
msg: e.message,
code: e.code
}))
}
Expand Down
4 changes: 0 additions & 4 deletions packages/testkit-backend/src/skipped-tests/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ const skippedTests = [
'stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once_next_before_list'
)
),
skip(
'Results are always valid but don\'t return records when out of scope',
ifStartsWith('stub.iteration.test_result_scope.TestResultScope.')
),
skip(
'Driver (still) allows explicit managing of managed transaction',
ifEquals('stub.tx_lifetime.test_tx_lifetime.TestTxLifetime.test_managed_tx_raises_tx_managed_exec')
Expand Down