Skip to content

Improve back-pressure mechanism for RxResult #882

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/neo4j-driver/src/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Driver extends CoreDriver {
bookmarkOrBookmarks: bookmarks,
database,
impersonatedUser,
reactive: true,
reactive: false,
fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize)
}),
config: this._config
Expand Down
173 changes: 145 additions & 28 deletions packages/neo4j-driver/src/result-rx.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
import { newError, Record, ResultSummary } from 'neo4j-driver-core'
import { Observable, Subject, ReplaySubject, from } from 'rxjs'
import { flatMap, publishReplay, refCount, shareReplay } from 'rxjs/operators'
import { flatMap, publishReplay, refCount } from 'rxjs/operators'

const States = {
READY: 0,
Expand All @@ -44,7 +44,8 @@ export default class RxResult {
publishReplay(1),
refCount()
)
this._records = new Subject()
this._records = undefined
this._controls = new StreamControl()
this._summary = new ReplaySubject()
this._state = States.READY
}
Expand Down Expand Up @@ -72,14 +73,16 @@ export default class RxResult {
* @returns {Observable<Record>} - An observable stream of records.
*/
records () {
return this._result.pipe(
const result = this._result.pipe(
flatMap(
result =>
new Observable(recordsObserver =>
this._startStreaming({ result, recordsObserver })
)
)
)
result.push = () => this._push()
return result
}

/**
Expand All @@ -102,6 +105,44 @@ export default class RxResult {
)
}

/**
* Pauses the automatic streaming of records.
*
* This method provides a way of controll the flow of records
*
* @experimental
*/
pause () {
this._controls.pause()
}

/**
* Resumes the automatic streaming of records.
*
* This method won't need to be called in normal stream operation. It only applies to the case when the stream is paused.
*
* This method is method won't start the consuming records if the ${@link records()} stream didn't get subscribed.
* @experimental
* @returns {Promise<void>} - A promise that resolves when the stream is resumed.
*/
resume () {
return this._controls.resume()
}

/**
* Pushes the next record to the stream.
*
* This method automatic pause the auto-streaming of records and then push next record to the stream.
*
* For returning the automatic streaming of records, use {@link resume} method.
*
* @experimental
* @returns {Promise<void>} - A promise that resolves when the push is completed.
*/
push () {
return this._controls.push()
}

_startStreaming ({
result,
recordsObserver = null,
Expand All @@ -115,9 +156,11 @@ export default class RxResult {

if (this._state < States.STREAMING) {
this._state = States.STREAMING

this._setupRecordsStream(result)
if (recordsObserver) {
subscriptions.push(this._records.subscribe(recordsObserver))
} else {
result._cancel()
}

subscriptions.push({
Expand All @@ -127,30 +170,6 @@ export default class RxResult {
}
}
})

if (this._records.observers.length === 0) {
result._cancel()
}

result.subscribe({
onNext: record => {
this._records.next(record)
},
onCompleted: summary => {
this._records.complete()

this._summary.next(summary)
this._summary.complete()

this._state = States.COMPLETED
},
onError: err => {
this._records.error(err)
this._summary.error(err)

this._state = States.COMPLETED
}
})
} else if (recordsObserver) {
recordsObserver.error(
newError(
Expand All @@ -163,4 +182,102 @@ export default class RxResult {
subscriptions.forEach(s => s.unsubscribe())
}
}

_setupRecordsStream (result) {
if (this._records) {
return this._records
}

this._records = createFullyControlledSubject(
result[Symbol.asyncIterator](),
{
complete: async () => {
this._state = States.COMPLETED
this._summary.next(await result.summary())
this._summary.complete()
},
error: error => {
this._state = States.COMPLETED
this._summary.error(error)
}
},
this._controls
)
return this._records
}
}

function createFullyControlledSubject (
iterator,
completeObserver,
streamControl = new StreamControl()
) {
const subject = new Subject()

const pushNextValue = async result => {
try {
streamControl.pushing = true
const { done, value } = await result
if (done) {
subject.complete()
completeObserver.complete()
} else {
subject.next(value)
if (!streamControl.paused) {
setImmediate(async () => await pushNextValue(iterator.next()))
}
}
} catch (error) {
subject.error(error)
completeObserver.error(error)
} finally {
streamControl.pushing = false
}
}

async function push (value) {
await pushNextValue(iterator.next(value))
}

streamControl.pusher = push
push()

return subject
}

class StreamControl {
constructor (push = async () => {}) {
this._paused = false
this._pushing = false
this._push = push
}

pause () {
this._paused = true
}

get paused () {
return this._paused
}

set pushing (pushing) {
this._pushing = pushing
}

async resume () {
const wasPaused = this._paused
this._paused = false
if (wasPaused && !this._pushing) {
await this._push()
}
}

async push () {
this.pause()
return await this._push()
}

set pusher (push) {
this._push = push
}
}
70 changes: 68 additions & 2 deletions packages/neo4j-driver/test/examples.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@
* limitations under the License.
*/

import neo4j from '../src'
import neo4j, { session } from '../src'
import sharedNeo4j from './internal/shared-neo4j'
import { ServerVersion, VERSION_4_0_0 } from '../src/internal/server-version'
import { map, materialize, toArray } from 'rxjs/operators'
import {
bufferCount,
map,
materialize,
mergeMap,
toArray
} from 'rxjs/operators'
import { Notification } from 'rxjs'

/**
Expand Down Expand Up @@ -1529,6 +1535,66 @@ describe('#integration examples', () => {
}
})
})

describe('back-pressure', () => {
it('should control flow by manual push records to the stream', done => {
const driver = driverGlobal

const session = driver.rxSession()
const xs = []

const result = session.run('UNWIND RANGE(0, 10) AS x RETURN x')
result
.records()
.pipe(map(record => record.get('x').toInt()))
.subscribe({
next: async x => {
xs.push(x)
// manual pushing reoords to the stream
// it pauses the automatic pushing
await result.push()
},
complete: async () => {
expect(xs).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
await session.close().toPromise()
done()
},
error: done.fail.bind(done)
})
Comment on lines +1543 to +1563
Copy link
Contributor Author

@bigmontz bigmontz Feb 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usage for result.push api

})
})

it('should control flow by resume and pause the stream', async () => {
const driver = driverGlobal
const callCostlyApi = async () => {}

const session = driver.rxSession()

try {
const result = session.run('UNWIND RANGE(0, 10) AS x RETURN x')
const xs = await result
.records()
.pipe(
map(record => record.get('x').toInt()),
bufferCount(5), // buffer 5 records
mergeMap(async theXs => {
// pausing the records coming from the stream
result.pause()
// some costly operation
await callCostlyApi(theXs)
// resume the stream
await result.resume()
return theXs
}),
toArray()
)
.toPromise()

expect(xs).toEqual([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10]])
} finally {
await session.close().toPromise()
}
Comment on lines +1571 to +1596
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Examples for pause and resume

})
})

function removeLineBreaks (string) {
Expand Down
Loading