Skip to content

Commit f3f3788

Browse files
committed
Add docs and improve code design
1 parent 04593da commit f3f3788

File tree

2 files changed

+199
-50
lines changed

2 files changed

+199
-50
lines changed

packages/core/src/result.ts

Lines changed: 104 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,28 @@ interface ResultObserver {
8787
onError?: (error: Error) => void
8888
}
8989

90+
/**
91+
* Defines the elements in the queue result observer
92+
* @access private
93+
*/
94+
type QueuedResultElement = {
95+
done: false
96+
record: Record
97+
} | {
98+
done: true
99+
summary: ResultSummary
100+
}
101+
102+
/**
103+
* Defines a ResultObserver interface which can be used to enqueue records and dequeue
104+
* them until the result is fully received.
105+
* @access private
106+
*/
107+
interface QueuedResultObserver extends ResultObserver {
108+
dequeue (): Promise<QueuedResultElement>
109+
get size (): number
110+
}
111+
90112
/**
91113
* A stream of {@link Record} representing the result of a query.
92114
* Can be consumed eagerly as {@link Promise} resolved with array of records and {@link ResultSummary}
@@ -211,78 +233,45 @@ class Result implements Promise<QueryResult> {
211233
return this._p
212234
}
213235

236+
/**
237+
* Provides a async iterator over the records in the result.
238+
*
239+
* *Should not be combined with {@link Result#subscribe} or ${@link Result#then} functions.*
240+
*
241+
* @public
242+
* @returns {AsyncIterator<Record, ResultSummary>} The async iterator for the Results
243+
*/
214244
async* [Symbol.asyncIterator](): AsyncIterator<Record, ResultSummary> {
215-
interface ConsumedValue {
216-
done: boolean
217-
record?: Record
218-
summary?: ResultSummary
219-
}
220-
221-
interface ResolvablePromise<T> {
222-
promise: Promise<T>
223-
resolve: (arg: T) => any | undefined
224-
reject: (arg: Error) => any | undefined
225-
}
226-
227-
function createResolvablePromise (): ResolvablePromise<ConsumedValue> {
228-
const resolvablePromise: any = {}
229-
resolvablePromise.promise = new Promise((resolve, reject) => {
230-
resolvablePromise.resolve = resolve
231-
resolvablePromise.reject = reject
232-
});
233-
return resolvablePromise;
234-
}
235-
236-
const observer = {
237-
_buffer: [createResolvablePromise()],
238-
onNext: (record: Record) => {
239-
observer._buffer[observer._buffer.length - 1].resolve({ record, done: false });
240-
observer._buffer.push(createResolvablePromise());
241-
},
242-
onCompleted: (summary: ResultSummary) => {
243-
observer._buffer[observer._buffer.length - 1].resolve({ summary, done: true });
244-
},
245-
onError: (error: Error) => {
246-
observer._buffer[observer._buffer.length - 1].reject(error);
247-
},
248-
consume: async () => {
249-
const value = await observer._buffer[0].promise
250-
observer._buffer.shift();
251-
return value
252-
},
253-
get queueSize (): number {
254-
return observer._buffer.length - 1
255-
}
256-
}
245+
const queuedObserver = this._createQueuedResultObserver()
257246

258247
const status = { paused: false }
259248

260249
let streaming: observer.ResultStreamObserver | null = null
261250

262251
try {
263-
streaming = await this._subscribe(observer, true)
252+
streaming = await this._subscribe(queuedObserver, true)
264253
} catch (e) {
265254
// ignore, we will handle it in consume since the error is notifies in the onError callback
266255
}
267256

268257
const pullIfNeeded = () => {
269-
if (observer.queueSize >= this._watermarks.high) {
258+
if (queuedObserver.size >= this._watermarks.high) {
270259
status.paused = true
271-
} else if (observer.queueSize <= this._watermarks.low) {
260+
} else if (queuedObserver.size <= this._watermarks.low) {
272261
status.paused = false
273262
}
274-
if (!status.paused && observer.queueSize < this._watermarks.high && streaming) {
263+
if (!status.paused && queuedObserver.size < this._watermarks.high && streaming) {
275264
streaming.pull()
276265
}
277266
}
278267

279268
while(true) {
280269
pullIfNeeded()
281-
const value = await observer.consume()
282-
if (value.done) {
283-
return value.summary!
270+
const next = await queuedObserver.dequeue()
271+
if (next.done) {
272+
return next.summary
284273
}
285-
yield value.record!
274+
yield next.record
286275
}
287276
}
288277

@@ -347,6 +336,15 @@ class Result implements Promise<QueryResult> {
347336
.catch(() => {})
348337
}
349338

339+
/**
340+
* Stream records to observer as they come in, this is a more efficient method
341+
* of handling the results, and allows you to handle arbitrarily large results.
342+
*
343+
* @access private
344+
* @param {ResultObserver} observer The observer to send records to.
345+
* @param {boolean} explicityPull The flag to indicate if the pull should be called explicitly.
346+
* @returns {Promise<observer.ResultStreamObserver>} The result stream observer.
347+
*/
350348
_subscribe(observer: ResultObserver, explicityPull: boolean = false): Promise<observer.ResultStreamObserver> {
351349
const _observer = this._decorateObserver(observer)
352350

@@ -362,6 +360,13 @@ class Result implements Promise<QueryResult> {
362360
})
363361
}
364362

363+
/**
364+
* Decorates the ResultObserver with the necessary methods.
365+
*
366+
* @access private
367+
* @param {ResultObserver} observer The ResultObserver to decorate.
368+
* @returns The decorated result observer
369+
*/
365370
_decorateObserver(observer: ResultObserver): ResultObserver {
366371
const onCompletedOriginal = observer.onCompleted || DEFAULT_ON_COMPLETED
367372
const onCompletedWrapper = (metadata: any) => {
@@ -407,6 +412,11 @@ class Result implements Promise<QueryResult> {
407412
this._streamObserverPromise.then(o => o.cancel())
408413
}
409414

415+
/**
416+
* @access private
417+
* @param metadata
418+
* @returns
419+
*/
410420
private _createSummary(metadata: any): Promise<ResultSummary> {
411421
const {
412422
validatedQuery: query,
@@ -434,6 +444,50 @@ class Result implements Promise<QueryResult> {
434444
new ResultSummary(query, parameters, metadata, protocolVersion)
435445
)
436446
}
447+
448+
/**
449+
* @access private
450+
*/
451+
private _createQueuedResultObserver (): QueuedResultObserver {
452+
interface ResolvablePromise<T> {
453+
promise: Promise<T>
454+
resolve: (arg: T) => any | undefined
455+
reject: (arg: Error) => any | undefined
456+
}
457+
458+
function createResolvablePromise (): ResolvablePromise<QueuedResultElement> {
459+
const resolvablePromise: any = {}
460+
resolvablePromise.promise = new Promise((resolve, reject) => {
461+
resolvablePromise.resolve = resolve
462+
resolvablePromise.reject = reject
463+
});
464+
return resolvablePromise;
465+
}
466+
467+
const observer = {
468+
_buffer: [createResolvablePromise()],
469+
onNext: (record: Record) => {
470+
observer._buffer[observer._buffer.length - 1].resolve({ done: false, record });
471+
observer._buffer.push(createResolvablePromise());
472+
},
473+
onCompleted: (summary: ResultSummary) => {
474+
observer._buffer[observer._buffer.length - 1].resolve({ done: true, summary });
475+
},
476+
onError: (error: Error) => {
477+
observer._buffer[observer._buffer.length - 1].reject(error);
478+
},
479+
dequeue: async () => {
480+
const value = await observer._buffer[0].promise
481+
observer._buffer.shift();
482+
return value
483+
},
484+
get size (): number {
485+
return observer._buffer.length - 1
486+
}
487+
}
488+
489+
return observer;
490+
}
437491
}
438492

439493
function captureStacktrace(): string | null {

packages/core/test/result.test.ts

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,101 @@ describe('Result', () => {
693693
new Record(keys, rawRecord2)
694694
])
695695
})
696+
697+
describe('onError', () => {
698+
it('should throws an exception while iterate over records', async () => {
699+
const keys = ['a', 'b']
700+
const rawRecord1 = [1, 2]
701+
const rawRecord2 = [3, 4]
702+
const expectedError = new Error('test')
703+
let observedError: Error | undefined
704+
705+
streamObserverMock.onKeys(keys)
706+
streamObserverMock.onNext(rawRecord1)
707+
streamObserverMock.onNext(rawRecord2)
708+
709+
const records = []
710+
711+
try {
712+
for await (const record of result) {
713+
records.push(record)
714+
streamObserverMock.onError(expectedError)
715+
}
716+
} catch (err) {
717+
observedError = err
718+
}
719+
720+
expect(observedError).toEqual(expectedError)
721+
})
722+
723+
it('should resolve the already received records', async () => {
724+
const keys = ['a', 'b']
725+
const rawRecord1 = [1, 2]
726+
const rawRecord2 = [3, 4]
727+
const expectedError = new Error('test')
728+
729+
streamObserverMock.onKeys(keys)
730+
streamObserverMock.onNext(rawRecord1)
731+
streamObserverMock.onNext(rawRecord2)
732+
733+
const records = []
734+
735+
try {
736+
for await (const record of result) {
737+
records.push(record)
738+
streamObserverMock.onError(expectedError)
739+
}
740+
} catch (err) {
741+
// do nothing
742+
}
743+
744+
expect(records).toEqual([
745+
new Record(keys, rawRecord1),
746+
new Record(keys, rawRecord2)
747+
])
748+
749+
})
750+
751+
it('should throws it when it is the event after onKeys', async () => {
752+
const keys = ['a', 'b']
753+
const expectedError = new Error('test')
754+
let observedError: Error | undefined
755+
756+
streamObserverMock.onKeys(keys)
757+
streamObserverMock.onError(expectedError)
758+
759+
const records = []
760+
761+
try {
762+
for await (const record of result) {
763+
records.push(record)
764+
}
765+
} catch (err) {
766+
observedError = err
767+
}
768+
769+
expect(observedError).toEqual(expectedError)
770+
})
771+
772+
it('should throws it when it is the first and unique event', async () => {
773+
const expectedError = new Error('test')
774+
let observedError: Error | undefined
775+
776+
streamObserverMock.onError(expectedError)
777+
778+
const records = []
779+
780+
try {
781+
for await (const record of result) {
782+
records.push(record)
783+
}
784+
} catch (err) {
785+
observedError = err
786+
}
787+
788+
expect(observedError).toEqual(expectedError)
789+
})
790+
})
696791
})
697792
})
698793

0 commit comments

Comments
 (0)