Skip to content

Commit 8d6f96e

Browse files
authored
Fix late subscription to an already broken stream (#912)
A late subscription was not receiving already cached `records` and `keys` from streams which already had received errors. This kind of behaviour makes it hard to debug issues and make tests like `stub.disconnects.test_disconnects.TestDisconnects.test_disconnect_session_on_tx_pull_after_record` flaky. **More details** When the you call `run` in the driver, internally the driver will send `RUN` and `PULL`, and put `ResultStreamObserver` in the `ResponseHandler` to get notified of the messages coming. If there isn't any subscription to the `ResultStreamObserver` (i.e. `Result.then`, `Result.keys`, `Result.subscribe`, `for(const record of result)`, etc), the ResultStreamObserver will accumulate the events internally (and don't ask for more when the PULL is over). So, when you interact with the `Result`, the subscription will be made and then the records, keys, summary and error accumulated will be informed to the observer created by the `Result`. The problem was happening because if the error arrives while the `Result` was not subscribed yet (for instance, the iterator was not created, which was the case in the test since I create the iterator in the first next call) the other events were not informed to the Result. This way in the case of the server disconnect after the first record be received, it could happen of you didn't get this first record if you iterate too late. **Solution** Moving the error notification to the end of the `ResultStreamObserver.subscribe` method solves this issue and it makes the events being informed in the correct order (in the subscribe method call).
1 parent 28caded commit 8d6f96e

File tree

2 files changed

+127
-4
lines changed

2 files changed

+127
-4
lines changed

packages/bolt-connection/src/bolt/stream-observers.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,10 +202,6 @@ class ResultStreamObserver extends StreamObserver {
202202
* @param {function(error: Object)} observer.onError - Handle errors, should always be provided.
203203
*/
204204
subscribe (observer) {
205-
if (this._error) {
206-
observer.onError(this._error)
207-
return
208-
}
209205
if (this._head && observer.onKeys) {
210206
observer.onKeys(this._head)
211207
}
@@ -223,6 +219,9 @@ class ResultStreamObserver extends StreamObserver {
223219
if (this._tail && observer.onCompleted) {
224220
observer.onCompleted(this._tail)
225221
}
222+
if (this._error) {
223+
observer.onError(this._error)
224+
}
226225
this._observers.push(observer)
227226

228227
if (this._state === _states.READY) {

packages/bolt-connection/test/bolt/stream-observer.test.js

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,130 @@ describe('#unit ResultStreamObserver', () => {
199199
})
200200
})
201201

202+
it('should inform all the pre-existing events of a success stream to the subscriber', () => {
203+
const streamObserver = new ResultStreamObserver()
204+
const received = {
205+
onCompleted: [],
206+
onError: [],
207+
onNext: [],
208+
onKeys: []
209+
}
210+
const observer = {
211+
onCompleted: metadata => received.onCompleted.push(metadata),
212+
onError: error => received.onError.push(error),
213+
onNext: record => received.onNext.push(record),
214+
onKeys: keys => received.onKeys.push(keys)
215+
}
216+
217+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'] })
218+
219+
streamObserver.onNext([1, 2, 3])
220+
streamObserver.onNext([11, 22, 33])
221+
streamObserver.onNext([111, 222, 333])
222+
223+
streamObserver.onCompleted({ key: 42, has_more: false })
224+
225+
streamObserver.subscribe(observer)
226+
227+
expect(received.onNext.length).toEqual(3)
228+
expect(received.onNext[0].toObject()).toEqual({ A: 1, B: 2, C: 3 })
229+
expect(received.onNext[1].toObject()).toEqual({ A: 11, B: 22, C: 33 })
230+
expect(received.onNext[2].toObject()).toEqual({ A: 111, B: 222, C: 333 })
231+
expect(received.onKeys).toEqual([['A', 'B', 'C']])
232+
expect(received.onCompleted).toEqual([{ key: 42, has_more: false }])
233+
expect(received.onError).toEqual([])
234+
})
235+
236+
it('should inform all the pre-existing events of a success stream to the subscriber in the correct order', () => {
237+
const streamObserver = new ResultStreamObserver()
238+
const received = []
239+
const observer = {
240+
onCompleted: metadata => received.push(metadata),
241+
onError: error => received.push(error),
242+
onNext: record => received.push(record),
243+
onKeys: keys => received.push(keys)
244+
}
245+
246+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'] })
247+
248+
streamObserver.onNext([1, 2, 3])
249+
streamObserver.onNext([11, 22, 33])
250+
streamObserver.onNext([111, 222, 333])
251+
252+
streamObserver.onCompleted({ key: 42, has_more: false })
253+
254+
streamObserver.subscribe(observer)
255+
256+
expect(received.length).toEqual(5)
257+
expect(received[0]).toEqual(['A', 'B', 'C'])
258+
expect(received[1].toObject()).toEqual({ A: 1, B: 2, C: 3 })
259+
expect(received[2].toObject()).toEqual({ A: 11, B: 22, C: 33 })
260+
expect(received[3].toObject()).toEqual({ A: 111, B: 222, C: 333 })
261+
expect(received[4]).toEqual({ key: 42, has_more: false })
262+
})
263+
264+
it('should inform all the pre-existing events of an error stream to the subscriber', () => {
265+
const streamObserver = new ResultStreamObserver()
266+
const received = {
267+
onCompleted: [],
268+
onError: [],
269+
onNext: [],
270+
onKeys: []
271+
}
272+
const observer = {
273+
onCompleted: metadata => received.onCompleted.push(metadata),
274+
onError: error => received.onError.push(error),
275+
onNext: record => received.onNext.push(record),
276+
onKeys: keys => received.onKeys.push(keys)
277+
}
278+
279+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'] })
280+
281+
streamObserver.onNext([1, 2, 3])
282+
streamObserver.onNext([11, 22, 33])
283+
streamObserver.onNext([111, 222, 333])
284+
285+
streamObserver.onError(newError('something is on the way'))
286+
287+
streamObserver.subscribe(observer)
288+
289+
expect(received.onNext.length).toEqual(3)
290+
expect(received.onNext[0].toObject()).toEqual({ A: 1, B: 2, C: 3 })
291+
expect(received.onNext[1].toObject()).toEqual({ A: 11, B: 22, C: 33 })
292+
expect(received.onNext[2].toObject()).toEqual({ A: 111, B: 222, C: 333 })
293+
expect(received.onKeys).toEqual([['A', 'B', 'C']])
294+
expect(received.onCompleted).toEqual([])
295+
expect(received.onError).toEqual([newError('something is on the way')])
296+
})
297+
298+
it('should inform all the pre-existing events of an error stream stream to the subscriber in the correct order', () => {
299+
const streamObserver = new ResultStreamObserver()
300+
const received = []
301+
const observer = {
302+
onCompleted: metadata => received.push(metadata),
303+
onError: error => received.push(error),
304+
onNext: record => received.push(record),
305+
onKeys: keys => received.push(keys)
306+
}
307+
308+
streamObserver.onCompleted({ fields: ['A', 'B', 'C'] })
309+
310+
streamObserver.onNext([1, 2, 3])
311+
streamObserver.onNext([11, 22, 33])
312+
streamObserver.onNext([111, 222, 333])
313+
314+
streamObserver.onError(newError('something is on the way'))
315+
316+
streamObserver.subscribe(observer)
317+
318+
expect(received.length).toEqual(5)
319+
expect(received[0]).toEqual(['A', 'B', 'C'])
320+
expect(received[1].toObject()).toEqual({ A: 1, B: 2, C: 3 })
321+
expect(received[2].toObject()).toEqual({ A: 11, B: 22, C: 33 })
322+
expect(received[3].toObject()).toEqual({ A: 111, B: 222, C: 333 })
323+
expect(received[4]).toEqual(newError('something is on the way'))
324+
})
325+
202326
describe('when is not paused (default)', () => {
203327
it('should ask for more records when the stream is completed and has more', () => {
204328
// Setup

0 commit comments

Comments
 (0)