Skip to content

AsyncIterator API for Result consumption #831

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Jan 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e1098f4
AsynIterator API for Result consumption
bigmontz Aug 4, 2021
353675c
Improving interface
bigmontz Aug 5, 2021
ed73ca5
Add usage example for AsyncIterators
bigmontz Aug 5, 2021
ac9375d
Skipping more tests
bigmontz Dec 28, 2021
18943bc
Implementing pooling backpressure
bigmontz Dec 28, 2021
3334077
Add backpressure mecanics
bigmontz Dec 28, 2021
674724d
Remove ResultObserver from testkit-backend since it's not needed
bigmontz Dec 29, 2021
4ad01f2
Fix typo
bigmontz Dec 29, 2021
f924d6c
Stream only when it ready for it
bigmontz Dec 29, 2021
3d4832b
Fixing summary consumption.
bigmontz Dec 29, 2021
be5c442
Don't stream in ready streaming since a pull was already sent
bigmontz Dec 29, 2021
6383c31
Skipping tests which were working by accident in the last implementation
bigmontz Dec 30, 2021
8c25b11
Pulling data before and after the promise get solved
bigmontz Dec 30, 2021
15a9d40
Removing unneeded properties and move watermark logic to the session
bigmontz Jan 10, 2022
3dffb32
Add tests to ResultStreamObserver.pull() and change StreamObserver.se…
bigmontz Jan 11, 2022
49a3c90
Test Result async iterator consumption with watermark control
bigmontz Jan 11, 2022
7cd5f12
Moving Session unit tests to core and fix connection interface
bigmontz Jan 11, 2022
453521a
Test watermark treatment in the Session
bigmontz Jan 11, 2022
9c5666f
Add tests to Transaction
bigmontz Jan 11, 2022
fca75c4
Add tests in the bolt-protocol level
bigmontz Jan 11, 2022
89c3dac
Add docs and improve code design
bigmontz Jan 12, 2022
c9f1f8b
Adjusting DenoJS compatibility
bigmontz Jan 14, 2022
449e515
Checking if return the promise helps with the timeouts
bigmontz Jan 19, 2022
d8f4ca4
Emulating error in a unit test scenario
bigmontz Jan 24, 2022
1078485
Changing `pull` for `pause` and `resume`
bigmontz Jan 24, 2022
bc9e62b
Treat unhandled promise rejections error
bigmontz Jan 24, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions packages/bolt-connection/src/bolt/bolt-protocol-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ export default class BoltProtocol {
afterError,
beforeComplete,
afterComplete,
flush = true
flush = true,
highRecordWatermark = Number.MAX_VALUE,
lowRecordWatermark = Number.MAX_VALUE
} = {}
) {
const observer = new ResultStreamObserver({
Expand All @@ -288,7 +290,9 @@ export default class BoltProtocol {
beforeError,
afterError,
beforeComplete,
afterComplete
afterComplete,
highRecordWatermark,
lowRecordWatermark
})

// bookmark and mode are ignored in this version of the protocol
Expand Down
8 changes: 6 additions & 2 deletions packages/bolt-connection/src/bolt/bolt-protocol-v3.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ export default class BoltProtocol extends BoltProtocolV2 {
afterError,
beforeComplete,
afterComplete,
flush = true
flush = true,
highRecordWatermark = Number.MAX_VALUE,
lowRecordWatermark = Number.MAX_VALUE
} = {}
) {
const observer = new ResultStreamObserver({
Expand All @@ -173,7 +175,9 @@ export default class BoltProtocol extends BoltProtocolV2 {
beforeError,
afterError,
beforeComplete,
afterComplete
afterComplete,
highRecordWatermark,
lowRecordWatermark
})

// passing in a database name on this protocol version throws an error
Expand Down
8 changes: 6 additions & 2 deletions packages/bolt-connection/src/bolt/bolt-protocol-v4x0.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ export default class BoltProtocol extends BoltProtocolV3 {
afterComplete,
flush = true,
reactive = false,
fetchSize = FETCH_ALL
fetchSize = FETCH_ALL,
highRecordWatermark = Number.MAX_VALUE,
lowRecordWatermark = Number.MAX_VALUE
} = {}
) {
const observer = new ResultStreamObserver({
Expand All @@ -104,7 +106,9 @@ export default class BoltProtocol extends BoltProtocolV3 {
beforeError,
afterError,
beforeComplete,
afterComplete
afterComplete,
highRecordWatermark,
lowRecordWatermark
})

// passing impersonated user on this protocol version throws an error
Expand Down
8 changes: 6 additions & 2 deletions packages/bolt-connection/src/bolt/bolt-protocol-v4x4.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ export default class BoltProtocol extends BoltProtocolV43 {
afterComplete,
flush = true,
reactive = false,
fetchSize = FETCH_ALL
fetchSize = FETCH_ALL,
highRecordWatermark = Number.MAX_VALUE,
lowRecordWatermark = Number.MAX_VALUE
} = {}
) {
const observer = new ResultStreamObserver({
Expand All @@ -98,7 +100,9 @@ export default class BoltProtocol extends BoltProtocolV43 {
beforeError,
afterError,
beforeComplete,
afterComplete
afterComplete,
highRecordWatermark,
lowRecordWatermark
})

const flushRun = reactive
Expand Down
71 changes: 50 additions & 21 deletions packages/bolt-connection/src/bolt/stream-observers.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ class ResultStreamObserver extends StreamObserver {
afterKeys,
beforeComplete,
afterComplete,
server
server,
highRecordWatermark = Number.MAX_VALUE,
lowRecordWatermark = Number.MAX_VALUE
} = {}) {
super()

Expand All @@ -94,8 +96,32 @@ class ResultStreamObserver extends StreamObserver {
this._discardFunction = discardFunction
this._discard = false
this._fetchSize = fetchSize
this._lowRecordWatermark = lowRecordWatermark
this._highRecordWatermark = highRecordWatermark
this._setState(reactive ? _states.READY : _states.READY_STREAMING)
this._setupAuoPull(fetchSize)
this._setupAutoPull()
this._paused = false;
}

/**
* Pause the record consuming
*
* This function will supend the record consuming. It will not cancel the stream and the already
* requested records will be sent to the subscriber.
*/
pause () {
this._paused = true
}

/**
* Resume the record consuming
*
* This function will resume the record consuming fetching more records from the server.
*/
resume () {
this._paused = false
this._setupAutoPull(true)
this._state.pull(this)
}

/**
Expand Down Expand Up @@ -342,16 +368,21 @@ class ResultStreamObserver extends StreamObserver {

_handleStreaming () {
if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) {
if (this._discard) {
this._discardFunction(this._queryId, this)
this._setState(_states.STREAMING)
} else if (this._autoPull) {
this._moreFunction(this._queryId, this._fetchSize, this)
this._setState(_states.STREAMING)
if (!this._paused && (this._discard || this._autoPull)) {
this._more()
}
}
}

_more () {
if (this._discard) {
this._discardFunction(this._queryId, this)
} else {
this._moreFunction(this._queryId, this._fetchSize, this)
}
this._setState(_states.STREAMING)
}

_storeMetadataForCompletion (meta) {
const keys = Object.keys(meta)
let index = keys.length
Expand All @@ -367,15 +398,8 @@ class ResultStreamObserver extends StreamObserver {
this._state = state
}

_setupAuoPull (fetchSize) {
_setupAutoPull () {
this._autoPull = true
if (fetchSize === FETCH_ALL) {
this._lowRecordWatermark = Number.MAX_VALUE // we shall always lower than this number to enable auto pull
this._highRecordWatermark = Number.MAX_VALUE // we shall never reach this number to disable auto pull
} else {
this._lowRecordWatermark = 0.3 * fetchSize
this._highRecordWatermark = 0.7 * fetchSize
}
}
}

Expand Down Expand Up @@ -575,7 +599,8 @@ const _states = {
},
name: () => {
return 'READY_STREAMING'
}
},
pull: () => {}
},
READY: {
// reactive start state
Expand All @@ -590,7 +615,8 @@ const _states = {
},
name: () => {
return 'READY'
}
},
pull: streamObserver => streamObserver._more()
},
STREAMING: {
onSuccess: (streamObserver, meta) => {
Expand All @@ -605,20 +631,23 @@ const _states = {
},
name: () => {
return 'STREAMING'
}
},
pull: () => {}
},
FAILED: {
onError: error => {
// more errors are ignored
},
name: () => {
return 'FAILED'
}
},
pull: () => {}
},
SUCCEEDED: {
name: () => {
return 'SUCCEEDED'
}
},
pull: () => {}
}
}

Expand Down
22 changes: 22 additions & 0 deletions packages/bolt-connection/test/bolt/bolt-protocol-v1.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -338,4 +338,26 @@ describe('#unit BoltProtocolV1', () => {
}
)
})

describe('watermarks', () => {
it('.run() should configure watermarks', () => {
const recorder = new utils.MessageRecordingConnection()
const protocol = utils.spyProtocolWrite(
new BoltProtocolV1(recorder, null, false)
)

const query = 'RETURN $x, $y'
const parameters = { x: 'x', y: 'y' }
const observer = protocol.run(query, parameters, {
bookmark: Bookmark.empty(),
txConfig: TxConfig.empty(),
mode: WRITE,
lowRecordWatermark: 100,
highRecordWatermark: 200,
})

expect(observer._lowRecordWatermark).toEqual(100)
expect(observer._highRecordWatermark).toEqual(200)
})
})
})
19 changes: 19 additions & 0 deletions packages/bolt-connection/test/bolt/bolt-protocol-v2.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,23 @@ describe('#unit BoltProtocolV2', () => {
})
})
})

describe('watermarks', () => {
it('.run() should configure watermarks', () => {
const recorder = new utils.MessageRecordingConnection()
const protocol = utils.spyProtocolWrite(
new BoltProtocolV2(recorder, null, false)
)

const query = 'RETURN $x, $y'
const parameters = { x: 'x', y: 'y' }
const observer = protocol.run(query, parameters, {
lowRecordWatermark: 100,
highRecordWatermark: 200,
})

expect(observer._lowRecordWatermark).toEqual(100)
expect(observer._highRecordWatermark).toEqual(200)
})
})
})
21 changes: 21 additions & 0 deletions packages/bolt-connection/test/bolt/bolt-protocol-v3.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,27 @@ describe('#unit BoltProtocolV3', () => {
}
)
})

describe('watermarks', () => {
it('.run() should configure watermarks', () => {
const recorder = new utils.MessageRecordingConnection()
const protocol = utils.spyProtocolWrite(
new BoltProtocolV3(recorder, null, false)
)

const query = 'RETURN $x, $y'
const parameters = { x: 'x', y: 'y' }
const observer = protocol.run(query, parameters, {
bookmark: Bookmark.empty(),
txConfig: TxConfig.empty(),
lowRecordWatermark: 100,
highRecordWatermark: 200,
})

expect(observer._lowRecordWatermark).toEqual(100)
expect(observer._highRecordWatermark).toEqual(200)
})
})
})

class SpiedBoltProtocolV3 extends BoltProtocolV3 {
Expand Down
21 changes: 21 additions & 0 deletions packages/bolt-connection/test/bolt/bolt-protocol-v4x0.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,27 @@ describe('#unit BoltProtocolV4x0', () => {
}
)
})

describe('watermarks', () => {
it('.run() should configure watermarks', () => {
const recorder = new utils.MessageRecordingConnection()
const protocol = utils.spyProtocolWrite(
new BoltProtocolV4x0(recorder, null, false)
)

const query = 'RETURN $x, $y'
const parameters = { x: 'x', y: 'y' }
const observer = protocol.run(query, parameters, {
bookmark: Bookmark.empty(),
txConfig: TxConfig.empty(),
lowRecordWatermark: 100,
highRecordWatermark: 200,
})

expect(observer._lowRecordWatermark).toEqual(100)
expect(observer._highRecordWatermark).toEqual(200)
})
})
})

class SpiedBoltProtocolV4x0 extends BoltProtocolV4x0 {
Expand Down
27 changes: 27 additions & 0 deletions packages/bolt-connection/test/bolt/bolt-protocol-v4x1.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

import BoltProtocolV4x1 from '../../src/bolt/bolt-protocol-v4x1'
import utils from '../test-utils'
import { internal } from 'neo4j-driver-core'

const {
txConfig: { TxConfig },
bookmark: { Bookmark }
} = internal

describe('#unit BoltProtocolV4x1', () => {
describe('Bolt v4.4', () => {
Expand Down Expand Up @@ -82,4 +88,25 @@ describe('#unit BoltProtocolV4x1', () => {
}
)
})

describe('watermarks', () => {
it('.run() should configure watermarks', () => {
const recorder = new utils.MessageRecordingConnection()
const protocol = utils.spyProtocolWrite(
new BoltProtocolV4x1(recorder, null, false)
)

const query = 'RETURN $x, $y'
const parameters = { x: 'x', y: 'y' }
const observer = protocol.run(query, parameters, {
bookmark: Bookmark.empty(),
txConfig: TxConfig.empty(),
lowRecordWatermark: 100,
highRecordWatermark: 200,
})

expect(observer._lowRecordWatermark).toEqual(100)
expect(observer._highRecordWatermark).toEqual(200)
})
})
})
Loading