Skip to content

Introduce .peek() to the Async Iterator API #846

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 77 additions & 43 deletions packages/core/src/result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import ResultSummary from './result-summary'
import Record from './record'
import { Query } from './types'
import { Query, PeekableAsyncIterator } from './types'
import { observer, util, connectionHolder } from './internal'

const { EMPTY_CONNECTION_HOLDER } = connectionHolder
Expand Down Expand Up @@ -87,25 +87,15 @@ interface ResultObserver {
onError?: (error: Error) => void
}

/**
* Defines the elements in the queue result observer
* @access private
*/
type QueuedResultElement = {
done: false
record: Record
} | {
done: true
summary: ResultSummary
}

/**
* Defines a ResultObserver interface which can be used to enqueue records and dequeue
* them until the result is fully received.
* @access private
*/
interface QueuedResultObserver extends ResultObserver {
dequeue (): Promise<QueuedResultElement>
dequeue (): Promise<IteratorResult<Record, ResultSummary>>
head (): Promise<IteratorResult<Record, ResultSummary>>
get size (): number
}

Expand Down Expand Up @@ -239,35 +229,60 @@ class Result implements Promise<QueryResult> {
* *Should not be combined with {@link Result#subscribe} or ${@link Result#then} functions.*
*
* @public
* @returns {AsyncIterator<Record, ResultSummary>} The async iterator for the Results
* @returns {PeekableAsyncIterator<Record, ResultSummary>} The async iterator for the Results
*/
async* [Symbol.asyncIterator](): AsyncIterator<Record, ResultSummary> {
const queuedObserver = this._createQueuedResultObserver()

const status = { paused: true, firstRun: true }

const streaming: observer.ResultStreamObserver | null =
// the error will be send to the onError callback
await this._subscribe(queuedObserver, true).catch(() => null)

const controlFlow = () => {
if (queuedObserver.size >= this._watermarks.high && !status.paused) {
status.paused = true
streaming?.pause()
} else if (queuedObserver.size <= this._watermarks.low && status.paused || status.firstRun) {
status.firstRun = false
status.paused = false
streaming?.resume()
[Symbol.asyncIterator](): PeekableAsyncIterator<Record, ResultSummary> {
const state: {
paused: boolean,
firstRun: boolean,
finished: boolean,
queuedObserver?: QueuedResultObserver,
streaming?: observer.ResultStreamObserver,
summary?: ResultSummary,
} = { paused: true, firstRun: true, finished: false }


const controlFlow = async () => {
if (state.queuedObserver === undefined) {
state.queuedObserver = this._createQueuedResultObserver()
state.streaming = await this._subscribe(state.queuedObserver, true).catch(() => undefined)
}
if (state.queuedObserver.size >= this._watermarks.high && !state.paused) {
state.paused = true
state.streaming?.pause()
} else if (state.queuedObserver.size <= this._watermarks.low && state.paused || state.firstRun) {
state.firstRun = false
state.paused = false
state.streaming?.resume()
}
}

while(true) {
controlFlow()
const next = await queuedObserver.dequeue()
if (next.done) {
return next.summary
return {
next: async () => {
if (state.finished) {
return { done: true, value: state.summary! }
}
await controlFlow()
const next = await state.queuedObserver!.dequeue()
if (next.done) {
state.finished = next.done
state.summary = next.value
}
return next
},
return: async (value: ResultSummary) => {
state.finished = true
state.summary = value
state.streaming?.cancel()
return { done: true, value: value }
},
peek: async () => {
if (state.finished) {
return { done: true, value: state.summary! }
}
await controlFlow()
return await state.queuedObserver!.head()
}
yield next.record
}
}

Expand Down Expand Up @@ -453,7 +468,7 @@ class Result implements Promise<QueryResult> {
reject: (arg: Error) => any | undefined
}

function createResolvablePromise (): ResolvablePromise<QueuedResultElement> {
function createResolvablePromise (): ResolvablePromise<IteratorResult<Record, ResultSummary>> {
const resolvablePromise: any = {}
resolvablePromise.promise = new Promise((resolve, reject) => {
resolvablePromise.resolve = resolve
Expand All @@ -462,22 +477,23 @@ class Result implements Promise<QueryResult> {
return resolvablePromise;
}

type QueuedResultElementOrError = QueuedResultElement | Error
type QueuedResultElementOrError = IteratorResult<Record, ResultSummary> | Error

function isError(elementOrError: QueuedResultElementOrError): elementOrError is Error {
return elementOrError instanceof Error
}

const buffer: QueuedResultElementOrError[] = []
const promiseHolder: { resolvable: ResolvablePromise<QueuedResultElement> | null } = { resolvable: null }

const promiseHolder: {
resolvable: ResolvablePromise<IteratorResult<Record, ResultSummary>> | null
} = { resolvable: null }

const observer = {
onNext: (record: Record) => {
observer._push({ done: false, record })
observer._push({ done: false, value: record })
},
onCompleted: (summary: ResultSummary) => {
observer._push({ done: true, summary })
observer._push({ done: true, value: summary })
},
onError: (error: Error) => {
observer._push(error)
Expand Down Expand Up @@ -506,6 +522,24 @@ class Result implements Promise<QueryResult> {
promiseHolder.resolvable = createResolvablePromise()
return await promiseHolder.resolvable.promise
},
head: async () => {
if (buffer.length > 0) {
const element = buffer[0]
if (isError(element)) {
throw element
}
return element
}
promiseHolder.resolvable = createResolvablePromise()
try {
const element = await promiseHolder.resolvable.promise
buffer.unshift(element)
return element
} catch (error) {
buffer.unshift(error)
throw error
}
},
get size (): number {
return buffer.length
}
Expand Down
14 changes: 14 additions & 0 deletions packages/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,17 @@ export interface Config {
resolver?: (address: string) => string[] | Promise<string[]>
userAgent?: string
}

/**
* Extension interface for {@link AsyncIterator} with peek capabilities.
*
* @public
*/
export interface PeekableAsyncIterator<T, TReturn = any, TNext = undefined> extends AsyncIterator<T, TReturn, TNext> {
/**
* Returns the next element in the iteration without advancing the iterator.
*
* @return {IteratorResult<T, TReturn} The next element in the iteration.
*/
peek(): Promise<IteratorResult<T, TReturn>>
}
Loading