Skip to content

Commit 6b48f15

Browse files
authored
Introduce .peek() to the Async Iterator API (#846)
This feature allows the user peek the element on the head of the cursor without moving the cursor forward. The implementation of this feature has as side effect a new implementation `.return` method in the iterator. This new version of the method does most the same thing as the previous one, but it also cancels the stream since this method moves the cursor to the end. The `next` method also got tweaked.
1 parent 5b4f475 commit 6b48f15

File tree

4 files changed

+431
-54
lines changed

4 files changed

+431
-54
lines changed

packages/core/src/result.ts

Lines changed: 77 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import ResultSummary from './result-summary'
2121
import Record from './record'
22-
import { Query } from './types'
22+
import { Query, PeekableAsyncIterator } from './types'
2323
import { observer, util, connectionHolder } from './internal'
2424

2525
const { EMPTY_CONNECTION_HOLDER } = connectionHolder
@@ -87,25 +87,15 @@ interface ResultObserver {
8787
onError?: (error: Error) => void
8888
}
8989

90-
/**
91-
* Defines the elements in the queue result observer
92-
* @access private
93-
*/
94-
type QueuedResultElement = {
95-
done: false
96-
record: Record
97-
} | {
98-
done: true
99-
summary: ResultSummary
100-
}
10190

10291
/**
10392
* Defines a ResultObserver interface which can be used to enqueue records and dequeue
10493
* them until the result is fully received.
10594
* @access private
10695
*/
10796
interface QueuedResultObserver extends ResultObserver {
108-
dequeue (): Promise<QueuedResultElement>
97+
dequeue (): Promise<IteratorResult<Record, ResultSummary>>
98+
head (): Promise<IteratorResult<Record, ResultSummary>>
10999
get size (): number
110100
}
111101

@@ -239,35 +229,60 @@ class Result implements Promise<QueryResult> {
239229
* *Should not be combined with {@link Result#subscribe} or ${@link Result#then} functions.*
240230
*
241231
* @public
242-
* @returns {AsyncIterator<Record, ResultSummary>} The async iterator for the Results
232+
* @returns {PeekableAsyncIterator<Record, ResultSummary>} The async iterator for the Results
243233
*/
244-
async* [Symbol.asyncIterator](): AsyncIterator<Record, ResultSummary> {
245-
const queuedObserver = this._createQueuedResultObserver()
246-
247-
const status = { paused: true, firstRun: true }
248-
249-
const streaming: observer.ResultStreamObserver | null =
250-
// the error will be send to the onError callback
251-
await this._subscribe(queuedObserver, true).catch(() => null)
252-
253-
const controlFlow = () => {
254-
if (queuedObserver.size >= this._watermarks.high && !status.paused) {
255-
status.paused = true
256-
streaming?.pause()
257-
} else if (queuedObserver.size <= this._watermarks.low && status.paused || status.firstRun) {
258-
status.firstRun = false
259-
status.paused = false
260-
streaming?.resume()
234+
[Symbol.asyncIterator](): PeekableAsyncIterator<Record, ResultSummary> {
235+
const state: {
236+
paused: boolean,
237+
firstRun: boolean,
238+
finished: boolean,
239+
queuedObserver?: QueuedResultObserver,
240+
streaming?: observer.ResultStreamObserver,
241+
summary?: ResultSummary,
242+
} = { paused: true, firstRun: true, finished: false }
243+
244+
245+
const controlFlow = async () => {
246+
if (state.queuedObserver === undefined) {
247+
state.queuedObserver = this._createQueuedResultObserver()
248+
state.streaming = await this._subscribe(state.queuedObserver, true).catch(() => undefined)
249+
}
250+
if (state.queuedObserver.size >= this._watermarks.high && !state.paused) {
251+
state.paused = true
252+
state.streaming?.pause()
253+
} else if (state.queuedObserver.size <= this._watermarks.low && state.paused || state.firstRun) {
254+
state.firstRun = false
255+
state.paused = false
256+
state.streaming?.resume()
261257
}
262258
}
263259

264-
while(true) {
265-
controlFlow()
266-
const next = await queuedObserver.dequeue()
267-
if (next.done) {
268-
return next.summary
260+
return {
261+
next: async () => {
262+
if (state.finished) {
263+
return { done: true, value: state.summary! }
264+
}
265+
await controlFlow()
266+
const next = await state.queuedObserver!.dequeue()
267+
if (next.done) {
268+
state.finished = next.done
269+
state.summary = next.value
270+
}
271+
return next
272+
},
273+
return: async (value: ResultSummary) => {
274+
state.finished = true
275+
state.summary = value
276+
state.streaming?.cancel()
277+
return { done: true, value: value }
278+
},
279+
peek: async () => {
280+
if (state.finished) {
281+
return { done: true, value: state.summary! }
282+
}
283+
await controlFlow()
284+
return await state.queuedObserver!.head()
269285
}
270-
yield next.record
271286
}
272287
}
273288

@@ -453,7 +468,7 @@ class Result implements Promise<QueryResult> {
453468
reject: (arg: Error) => any | undefined
454469
}
455470

456-
function createResolvablePromise (): ResolvablePromise<QueuedResultElement> {
471+
function createResolvablePromise (): ResolvablePromise<IteratorResult<Record, ResultSummary>> {
457472
const resolvablePromise: any = {}
458473
resolvablePromise.promise = new Promise((resolve, reject) => {
459474
resolvablePromise.resolve = resolve
@@ -462,22 +477,23 @@ class Result implements Promise<QueryResult> {
462477
return resolvablePromise;
463478
}
464479

465-
type QueuedResultElementOrError = QueuedResultElement | Error
480+
type QueuedResultElementOrError = IteratorResult<Record, ResultSummary> | Error
466481

467482
function isError(elementOrError: QueuedResultElementOrError): elementOrError is Error {
468483
return elementOrError instanceof Error
469484
}
470485

471486
const buffer: QueuedResultElementOrError[] = []
472-
const promiseHolder: { resolvable: ResolvablePromise<QueuedResultElement> | null } = { resolvable: null }
473-
487+
const promiseHolder: {
488+
resolvable: ResolvablePromise<IteratorResult<Record, ResultSummary>> | null
489+
} = { resolvable: null }
474490

475491
const observer = {
476492
onNext: (record: Record) => {
477-
observer._push({ done: false, record })
493+
observer._push({ done: false, value: record })
478494
},
479495
onCompleted: (summary: ResultSummary) => {
480-
observer._push({ done: true, summary })
496+
observer._push({ done: true, value: summary })
481497
},
482498
onError: (error: Error) => {
483499
observer._push(error)
@@ -506,6 +522,24 @@ class Result implements Promise<QueryResult> {
506522
promiseHolder.resolvable = createResolvablePromise()
507523
return await promiseHolder.resolvable.promise
508524
},
525+
head: async () => {
526+
if (buffer.length > 0) {
527+
const element = buffer[0]
528+
if (isError(element)) {
529+
throw element
530+
}
531+
return element
532+
}
533+
promiseHolder.resolvable = createResolvablePromise()
534+
try {
535+
const element = await promiseHolder.resolvable.promise
536+
buffer.unshift(element)
537+
return element
538+
} catch (error) {
539+
buffer.unshift(error)
540+
throw error
541+
}
542+
},
509543
get size (): number {
510544
return buffer.length
511545
}

packages/core/src/types.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,17 @@ export interface Config {
6565
resolver?: (address: string) => string[] | Promise<string[]>
6666
userAgent?: string
6767
}
68+
69+
/**
70+
* Extension interface for {@link AsyncIterator} with peek capabilities.
71+
*
72+
* @public
73+
*/
74+
export interface PeekableAsyncIterator<T, TReturn = any, TNext = undefined> extends AsyncIterator<T, TReturn, TNext> {
75+
/**
76+
* Returns the next element in the iteration without advancing the iterator.
77+
*
78+
* @return {IteratorResult<T, TReturn} The next element in the iteration.
79+
*/
80+
peek(): Promise<IteratorResult<T, TReturn>>
81+
}

0 commit comments

Comments
 (0)