Skip to content

Commit 17727e9

Browse files
authored
Improve back-pressure mechanism for RxResult (#882)
The lack of consumer oriented back-pressure mechanism in the RxJS apis was causing memory issues and other problems in the client code. For solving this issue, the records observer returned by `RxResult.records()` was changed for using the async iterator as foundation. New methods for enabling the client for fine controlling the stream were added to the `RxResult` api. These methods are: - `pause()`: Pause the record streaming. No new record will be pushed to the stream util `push` or `resume` get called. - `resume()`: Resumes the records streaming. - `push()`: Push the next record. If the stream is not paused, this method will pause it for giving the push control to the client.
1 parent 5ef3add commit 17727e9

File tree

7 files changed

+1083
-31
lines changed

7 files changed

+1083
-31
lines changed

packages/neo4j-driver/src/driver.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class Driver extends CoreDriver {
7070
bookmarkOrBookmarks: bookmarks,
7171
database,
7272
impersonatedUser,
73-
reactive: true,
73+
reactive: false,
7474
fetchSize: validateFetchSizeValue(fetchSize, this._config.fetchSize)
7575
}),
7676
config: this._config

packages/neo4j-driver/src/result-rx.js

Lines changed: 145 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*/
1919
import { newError, Record, ResultSummary } from 'neo4j-driver-core'
2020
import { Observable, Subject, ReplaySubject, from } from 'rxjs'
21-
import { flatMap, publishReplay, refCount, shareReplay } from 'rxjs/operators'
21+
import { flatMap, publishReplay, refCount } from 'rxjs/operators'
2222

2323
const States = {
2424
READY: 0,
@@ -44,7 +44,8 @@ export default class RxResult {
4444
publishReplay(1),
4545
refCount()
4646
)
47-
this._records = new Subject()
47+
this._records = undefined
48+
this._controls = new StreamControl()
4849
this._summary = new ReplaySubject()
4950
this._state = States.READY
5051
}
@@ -72,14 +73,16 @@ export default class RxResult {
7273
* @returns {Observable<Record>} - An observable stream of records.
7374
*/
7475
records () {
75-
return this._result.pipe(
76+
const result = this._result.pipe(
7677
flatMap(
7778
result =>
7879
new Observable(recordsObserver =>
7980
this._startStreaming({ result, recordsObserver })
8081
)
8182
)
8283
)
84+
result.push = () => this._push()
85+
return result
8386
}
8487

8588
/**
@@ -102,6 +105,44 @@ export default class RxResult {
102105
)
103106
}
104107

108+
/**
109+
* Pauses the automatic streaming of records.
110+
*
111+
* This method provides a way of controll the flow of records
112+
*
113+
* @experimental
114+
*/
115+
pause () {
116+
this._controls.pause()
117+
}
118+
119+
/**
120+
* Resumes the automatic streaming of records.
121+
*
122+
* This method won't need to be called in normal stream operation. It only applies to the case when the stream is paused.
123+
*
124+
* This method is method won't start the consuming records if the ${@link records()} stream didn't get subscribed.
125+
* @experimental
126+
* @returns {Promise<void>} - A promise that resolves when the stream is resumed.
127+
*/
128+
resume () {
129+
return this._controls.resume()
130+
}
131+
132+
/**
133+
* Pushes the next record to the stream.
134+
*
135+
* This method automatic pause the auto-streaming of records and then push next record to the stream.
136+
*
137+
* For returning the automatic streaming of records, use {@link resume} method.
138+
*
139+
* @experimental
140+
* @returns {Promise<void>} - A promise that resolves when the push is completed.
141+
*/
142+
push () {
143+
return this._controls.push()
144+
}
145+
105146
_startStreaming ({
106147
result,
107148
recordsObserver = null,
@@ -115,9 +156,11 @@ export default class RxResult {
115156

116157
if (this._state < States.STREAMING) {
117158
this._state = States.STREAMING
118-
159+
this._setupRecordsStream(result)
119160
if (recordsObserver) {
120161
subscriptions.push(this._records.subscribe(recordsObserver))
162+
} else {
163+
result._cancel()
121164
}
122165

123166
subscriptions.push({
@@ -127,30 +170,6 @@ export default class RxResult {
127170
}
128171
}
129172
})
130-
131-
if (this._records.observers.length === 0) {
132-
result._cancel()
133-
}
134-
135-
result.subscribe({
136-
onNext: record => {
137-
this._records.next(record)
138-
},
139-
onCompleted: summary => {
140-
this._records.complete()
141-
142-
this._summary.next(summary)
143-
this._summary.complete()
144-
145-
this._state = States.COMPLETED
146-
},
147-
onError: err => {
148-
this._records.error(err)
149-
this._summary.error(err)
150-
151-
this._state = States.COMPLETED
152-
}
153-
})
154173
} else if (recordsObserver) {
155174
recordsObserver.error(
156175
newError(
@@ -163,4 +182,102 @@ export default class RxResult {
163182
subscriptions.forEach(s => s.unsubscribe())
164183
}
165184
}
185+
186+
_setupRecordsStream (result) {
187+
if (this._records) {
188+
return this._records
189+
}
190+
191+
this._records = createFullyControlledSubject(
192+
result[Symbol.asyncIterator](),
193+
{
194+
complete: async () => {
195+
this._state = States.COMPLETED
196+
this._summary.next(await result.summary())
197+
this._summary.complete()
198+
},
199+
error: error => {
200+
this._state = States.COMPLETED
201+
this._summary.error(error)
202+
}
203+
},
204+
this._controls
205+
)
206+
return this._records
207+
}
208+
}
209+
210+
function createFullyControlledSubject (
211+
iterator,
212+
completeObserver,
213+
streamControl = new StreamControl()
214+
) {
215+
const subject = new Subject()
216+
217+
const pushNextValue = async result => {
218+
try {
219+
streamControl.pushing = true
220+
const { done, value } = await result
221+
if (done) {
222+
subject.complete()
223+
completeObserver.complete()
224+
} else {
225+
subject.next(value)
226+
if (!streamControl.paused) {
227+
setImmediate(async () => await pushNextValue(iterator.next()))
228+
}
229+
}
230+
} catch (error) {
231+
subject.error(error)
232+
completeObserver.error(error)
233+
} finally {
234+
streamControl.pushing = false
235+
}
236+
}
237+
238+
async function push (value) {
239+
await pushNextValue(iterator.next(value))
240+
}
241+
242+
streamControl.pusher = push
243+
push()
244+
245+
return subject
246+
}
247+
248+
class StreamControl {
249+
constructor (push = async () => {}) {
250+
this._paused = false
251+
this._pushing = false
252+
this._push = push
253+
}
254+
255+
pause () {
256+
this._paused = true
257+
}
258+
259+
get paused () {
260+
return this._paused
261+
}
262+
263+
set pushing (pushing) {
264+
this._pushing = pushing
265+
}
266+
267+
async resume () {
268+
const wasPaused = this._paused
269+
this._paused = false
270+
if (wasPaused && !this._pushing) {
271+
await this._push()
272+
}
273+
}
274+
275+
async push () {
276+
this.pause()
277+
return await this._push()
278+
}
279+
280+
set pusher (push) {
281+
this._push = push
282+
}
166283
}

packages/neo4j-driver/test/examples.test.js

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,16 @@
1717
* limitations under the License.
1818
*/
1919

20-
import neo4j from '../src'
20+
import neo4j, { session } from '../src'
2121
import sharedNeo4j from './internal/shared-neo4j'
2222
import { ServerVersion, VERSION_4_0_0 } from '../src/internal/server-version'
23-
import { map, materialize, toArray } from 'rxjs/operators'
23+
import {
24+
bufferCount,
25+
map,
26+
materialize,
27+
mergeMap,
28+
toArray
29+
} from 'rxjs/operators'
2430
import { Notification } from 'rxjs'
2531

2632
/**
@@ -1529,6 +1535,66 @@ describe('#integration examples', () => {
15291535
}
15301536
})
15311537
})
1538+
1539+
describe('back-pressure', () => {
1540+
it('should control flow by manual push records to the stream', done => {
1541+
const driver = driverGlobal
1542+
1543+
const session = driver.rxSession()
1544+
const xs = []
1545+
1546+
const result = session.run('UNWIND RANGE(0, 10) AS x RETURN x')
1547+
result
1548+
.records()
1549+
.pipe(map(record => record.get('x').toInt()))
1550+
.subscribe({
1551+
next: async x => {
1552+
xs.push(x)
1553+
// manual pushing reoords to the stream
1554+
// it pauses the automatic pushing
1555+
await result.push()
1556+
},
1557+
complete: async () => {
1558+
expect(xs).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
1559+
await session.close().toPromise()
1560+
done()
1561+
},
1562+
error: done.fail.bind(done)
1563+
})
1564+
})
1565+
})
1566+
1567+
it('should control flow by resume and pause the stream', async () => {
1568+
const driver = driverGlobal
1569+
const callCostlyApi = async () => {}
1570+
1571+
const session = driver.rxSession()
1572+
1573+
try {
1574+
const result = session.run('UNWIND RANGE(0, 10) AS x RETURN x')
1575+
const xs = await result
1576+
.records()
1577+
.pipe(
1578+
map(record => record.get('x').toInt()),
1579+
bufferCount(5), // buffer 5 records
1580+
mergeMap(async theXs => {
1581+
// pausing the records coming from the stream
1582+
result.pause()
1583+
// some costly operation
1584+
await callCostlyApi(theXs)
1585+
// resume the stream
1586+
await result.resume()
1587+
return theXs
1588+
}),
1589+
toArray()
1590+
)
1591+
.toPromise()
1592+
1593+
expect(xs).toEqual([[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10]])
1594+
} finally {
1595+
await session.close().toPromise()
1596+
}
1597+
})
15321598
})
15331599

15341600
function removeLineBreaks (string) {

0 commit comments

Comments
 (0)