Skip to content

Commit 73bea11

Browse files
committed
Add tests to ResultStreamObserver.pull() and change StreamObserver.setPullMode to setExplicityPull
1 parent b3ce989 commit 73bea11

File tree

5 files changed

+282
-12
lines changed

5 files changed

+282
-12
lines changed

packages/bolt-connection/src/bolt/stream-observers.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,12 @@ class ResultStreamObserver extends StreamObserver {
9999
this._lowRecordWatermark = lowRecordWatermark
100100
this._highRecordWatermark = highRecordWatermark
101101
this._setState(reactive ? _states.READY : _states.READY_STREAMING)
102-
this._setupAuoPull()
103-
this._pullMode = false;
102+
this._setupAutoPull()
103+
this._explicityPull = false;
104104
}
105105

106-
setPullMode(pullMode) {
107-
this._pullMode = pullMode;
106+
setExplicityPull(explicityPull) {
107+
this._explicityPull = explicityPull;
108108
}
109109

110110
pull() {
@@ -355,7 +355,7 @@ class ResultStreamObserver extends StreamObserver {
355355

356356
_handleStreaming () {
357357
if (this._head && this._observers.some(o => o.onNext || o.onCompleted)) {
358-
if (!this._pullMode && (this._discard || this._autoPull)) {
358+
if (!this._explicityPull && (this._discard || this._autoPull)) {
359359
this._more()
360360
}
361361
}
@@ -385,7 +385,7 @@ class ResultStreamObserver extends StreamObserver {
385385
this._state = state
386386
}
387387

388-
_setupAuoPull () {
388+
_setupAutoPull () {
389389
this._autoPull = true
390390
}
391391
}

packages/bolt-connection/test/bolt/stream-observer.test.js

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,276 @@ describe('#unit ResultStreamObserver', () => {
198198
}
199199
})
200200
})
201+
202+
describe('when is not explicity pull (default)', () => {
203+
it('should ask for more records when the stream is completed and has more', () => {
204+
// Setup
205+
const queryId = 123
206+
const fetchSize = 2000
207+
208+
const more = jest.fn()
209+
const streamObserver = new ResultStreamObserver({
210+
moreFunction: more,
211+
fetchSize: 2000
212+
})
213+
214+
// action
215+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
216+
217+
streamObserver.subscribe(newObserver())
218+
219+
streamObserver.onNext([1, 2, 3])
220+
streamObserver.onNext([11, 22, 33])
221+
streamObserver.onCompleted({ has_more: true })
222+
223+
streamObserver.onNext([111, 222, 333])
224+
streamObserver.onCompleted({ has_more: false })
225+
226+
// verification
227+
expect(more).toBeCalledTimes(1)
228+
expect(more).toBeCalledWith(queryId, fetchSize, streamObserver)
229+
})
230+
})
231+
232+
describe('when is explicity pull enabled', () => {
233+
it('should not ask for more records when the stream is completed and has more', () => {
234+
// Setup
235+
const queryId = 123
236+
237+
const more = jest.fn()
238+
const streamObserver = new ResultStreamObserver({
239+
moreFunction: more,
240+
fetchSize: 2000
241+
})
242+
streamObserver.setExplicityPull(true)
243+
244+
// action
245+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
246+
247+
streamObserver.subscribe(newObserver())
248+
249+
streamObserver.onNext([1, 2, 3])
250+
streamObserver.onNext([11, 22, 33])
251+
streamObserver.onCompleted({ has_more: true })
252+
253+
// verification
254+
expect(more).toBeCalledTimes(0)
255+
})
256+
257+
describe('pull()', () => {
258+
it('should ask for more records when the stream is completed and has more', () => {
259+
// Setup
260+
const queryId = 123
261+
const fetchSize = 2000
262+
263+
const more = jest.fn()
264+
const streamObserver = new ResultStreamObserver({
265+
moreFunction: more,
266+
fetchSize: fetchSize
267+
})
268+
streamObserver.setExplicityPull(true)
269+
270+
// Scenario
271+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
272+
273+
streamObserver.subscribe(newObserver())
274+
275+
streamObserver.onNext([1, 2, 3])
276+
streamObserver.onNext([11, 22, 33])
277+
streamObserver.onCompleted({ has_more: true })
278+
279+
// Action
280+
streamObserver.pull()
281+
282+
// verification
283+
expect(more).toBeCalledTimes(1)
284+
expect(more).toBeCalledWith(queryId, fetchSize, streamObserver)
285+
})
286+
287+
it('should ask for more records when the stream is a new reactive stream', () => {
288+
// Setup
289+
const queryId = 123
290+
const fetchSize = 2000
291+
292+
const more = jest.fn()
293+
const streamObserver = new ResultStreamObserver({
294+
moreFunction: more,
295+
fetchSize: fetchSize,
296+
reactive: true
297+
})
298+
streamObserver.setExplicityPull(true)
299+
300+
// Scenario
301+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
302+
303+
// Action
304+
streamObserver.pull()
305+
306+
// verification
307+
expect(more).toBeCalledTimes(1)
308+
expect(more).toBeCalledWith(queryId, fetchSize, streamObserver)
309+
})
310+
311+
312+
it('should ask for more records when the stream is a new reactive stream and not run success come yet', () => {
313+
// Setup
314+
const queryId = 123
315+
const fetchSize = 2000
316+
317+
const more = jest.fn()
318+
const streamObserver = new ResultStreamObserver({
319+
moreFunction: more,
320+
fetchSize: fetchSize,
321+
reactive: true
322+
})
323+
streamObserver.setExplicityPull(true)
324+
325+
// Action
326+
streamObserver.pull()
327+
328+
// verification
329+
expect(more).toBeCalledTimes(1)
330+
expect(more).toBeCalledWith(null, fetchSize, streamObserver)
331+
})
332+
333+
it('should not ask for more records when the stream is a new stream', () => {
334+
// Setup
335+
const queryId = 123
336+
const fetchSize = 2000
337+
338+
const more = jest.fn()
339+
const streamObserver = new ResultStreamObserver({
340+
moreFunction: more,
341+
fetchSize: fetchSize,
342+
reactive: false
343+
})
344+
streamObserver.setExplicityPull(true)
345+
346+
// Scenario
347+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
348+
349+
// Action
350+
streamObserver.pull()
351+
352+
// verification
353+
expect(more).toBeCalledTimes(0)
354+
})
355+
356+
357+
it('should not ask for more records when the stream is a new stream', () => {
358+
// Setup
359+
const queryId = 123
360+
const fetchSize = 2000
361+
362+
const more = jest.fn()
363+
const streamObserver = new ResultStreamObserver({
364+
moreFunction: more,
365+
fetchSize: fetchSize
366+
})
367+
368+
streamObserver.setExplicityPull(true)
369+
370+
// Scenario
371+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
372+
373+
// Action
374+
streamObserver.pull()
375+
376+
// verification
377+
expect(more).toBeCalledTimes(0)
378+
})
379+
380+
it('should not ask for more records when it is streaming', () => {
381+
// Setup
382+
const queryId = 123
383+
const fetchSize = 2000
384+
385+
const more = jest.fn()
386+
const streamObserver = new ResultStreamObserver({
387+
moreFunction: more,
388+
fetchSize: fetchSize
389+
})
390+
391+
streamObserver.setExplicityPull(true)
392+
393+
// Scenario
394+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
395+
396+
streamObserver.subscribe(newObserver())
397+
398+
streamObserver.onNext([1, 2, 3])
399+
streamObserver.onNext([11, 22, 33])
400+
streamObserver.onCompleted({ has_more: true })
401+
402+
streamObserver.pull() // should actual call
403+
404+
streamObserver.onNext([111, 222, 333])
405+
406+
// Action
407+
streamObserver.pull()
408+
409+
// verification
410+
expect(more).toBeCalledTimes(1)
411+
})
412+
413+
it('should not ask for more records when result is completed', () => {
414+
// Setup
415+
const queryId = 123
416+
const fetchSize = 2000
417+
418+
const more = jest.fn()
419+
const streamObserver = new ResultStreamObserver({
420+
moreFunction: more,
421+
fetchSize: fetchSize
422+
})
423+
424+
streamObserver.setExplicityPull(true)
425+
426+
// Scenario
427+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
428+
429+
streamObserver.subscribe(newObserver())
430+
431+
streamObserver.onNext([1, 2, 3])
432+
streamObserver.onNext([11, 22, 33])
433+
streamObserver.onCompleted({ has_more: false })
434+
435+
// Action
436+
streamObserver.pull()
437+
438+
// verification
439+
expect(more).toBeCalledTimes(0)
440+
})
441+
442+
it('should not ask for more records when stream failed', () => {
443+
// Setup
444+
const queryId = 123
445+
const fetchSize = 2000
446+
447+
const more = jest.fn()
448+
const streamObserver = new ResultStreamObserver({
449+
moreFunction: more,
450+
fetchSize: fetchSize
451+
})
452+
453+
streamObserver.setExplicityPull(true)
454+
455+
// Scenario
456+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'], qid: queryId })
457+
458+
streamObserver.subscribe(newObserver())
459+
460+
streamObserver.onNext([1, 2, 3])
461+
streamObserver.onError(new Error('error'))
462+
463+
// Action
464+
streamObserver.pull()
465+
466+
// verification
467+
expect(more).toBeCalledTimes(0)
468+
})
469+
})
470+
})
201471
})
202472

203473
describe('#unit RouteObserver', () => {

packages/core/src/internal/observers.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ export interface ResultStreamObserver extends StreamObserver {
8989
*/
9090
prepareToHandleSingleResponse(): void
9191

92-
setPullMode(pullMode: boolean): void
92+
setExplicityPull(explicityPull: boolean): void
9393

9494
pull(): void
9595

@@ -127,7 +127,7 @@ export class CompletedObserver implements ResultStreamObserver {
127127
// do nothing
128128
}
129129

130-
setPullMode(_: boolean): void {
130+
setExplicityPull(_: boolean): void {
131131
// do nothing
132132
}
133133

@@ -182,7 +182,7 @@ export class FailedObserver implements ResultStreamObserver {
182182
// do nothing
183183
}
184184

185-
setPullMode(_: boolean): void {
185+
setExplicityPull(_: boolean): void {
186186
// do nothing
187187
}
188188

packages/core/src/result.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,12 +336,12 @@ class Result implements Promise<QueryResult> {
336336
.catch(() => {})
337337
}
338338

339-
_subscribe(observer: ResultObserver, pullMode: boolean = false): Promise<observer.ResultStreamObserver> {
339+
_subscribe(observer: ResultObserver, explicityPull: boolean = false): Promise<observer.ResultStreamObserver> {
340340
const _observer = this._decorateObserver(observer)
341341

342342
return this._streamObserverPromise
343343
.then(o => {
344-
o.setPullMode(pullMode)
344+
o.setExplicityPull(explicityPull)
345345
o.subscribe(_observer)
346346
return o
347347
})

packages/core/test/result.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -766,7 +766,7 @@ class ResultStreamObserverMock implements observer.ResultStreamObserver {
766766
.forEach(o => o.onCompleted!(meta))
767767
}
768768

769-
setPullMode(_: boolean): void {
769+
setExplicityPull(_: boolean): void {
770770
// do nothing
771771
}
772772

0 commit comments

Comments
 (0)