Skip to content

Commit 3576a06

Browse files
committed
add pending notifications
1 parent 79f8d9f commit 3576a06

File tree

5 files changed

+211
-10
lines changed

5 files changed

+211
-10
lines changed

src/execution/IncrementalPublisher.ts

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import type {
1717
import type { StreamUsage } from './execute.js';
1818

1919
interface IncrementalUpdate<TData = unknown, TExtensions = ObjMap<unknown>> {
20+
pending: ReadonlyArray<PendingResult>;
2021
incremental: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
2122
completed: ReadonlyArray<CompletedResult>;
2223
}
@@ -34,6 +35,7 @@ export interface FormattedSubsequentIncrementalExecutionResult<
3435
TExtensions = ObjMap<unknown>,
3536
> {
3637
hasNext: boolean;
38+
pending?: ReadonlyArray<PendingResult>;
3739
incremental?: ReadonlyArray<FormattedIncrementalResult<TData, TExtensions>>;
3840
completed?: ReadonlyArray<FormattedCompletedResult>;
3941
extensions?: TExtensions;
@@ -90,6 +92,11 @@ export type FormattedIncrementalResult<
9092
| FormattedIncrementalDeferResult<TData, TExtensions>
9193
| FormattedIncrementalStreamResult<TData, TExtensions>;
9294

95+
export interface PendingResult {
96+
path: ReadonlyArray<string | number>;
97+
label?: string;
98+
}
99+
93100
export interface CompletedResult {
94101
path: ReadonlyArray<string | number>;
95102
label?: string;
@@ -148,6 +155,10 @@ export class IncrementalPublisher {
148155
this._reset();
149156
}
150157

158+
getPending(): ReadonlySet<SubsequentResultRecord> {
159+
return this._pending;
160+
}
161+
151162
hasNext(): boolean {
152163
return this._pending.size > 0;
153164
}
@@ -419,6 +430,23 @@ export class IncrementalPublisher {
419430
incrementalDataRecord.errors.push(error);
420431
}
421432

433+
pendingSourcesToResults(
434+
pendingSources: ReadonlySet<DeferredFragmentRecord | StreamRecord>,
435+
): Array<PendingResult> {
436+
const pendingResults: Array<PendingResult> = [];
437+
for (const pendingSource of pendingSources) {
438+
pendingSource.pendingSent = true;
439+
const pendingResult: PendingResult = {
440+
path: pendingSource.path,
441+
};
442+
if (pendingSource.label !== undefined) {
443+
pendingResult.label = pendingSource.label;
444+
}
445+
pendingResults.push(pendingResult);
446+
}
447+
return pendingResults;
448+
}
449+
422450
publishInitial(): void {
423451
this._initialResult.isCompleted = true;
424452
for (const child of this._initialResult.children) {
@@ -507,14 +535,18 @@ export class IncrementalPublisher {
507535
private _getIncrementalResult(
508536
completedRecords: ReadonlySet<SubsequentResultRecord>,
509537
): SubsequentIncrementalExecutionResult | undefined {
510-
const { incremental, completed } = this._processPending(completedRecords);
538+
const { pending, incremental, completed } =
539+
this._processPending(completedRecords);
511540

512541
const hasNext = this.hasNext();
513542
if (incremental.length === 0 && completed.length === 0 && hasNext) {
514543
return undefined;
515544
}
516545

517546
const result: SubsequentIncrementalExecutionResult = { hasNext };
547+
if (pending.length) {
548+
result.pending = pending;
549+
}
518550
if (incremental.length) {
519551
result.incremental = incremental;
520552
}
@@ -528,16 +560,24 @@ export class IncrementalPublisher {
528560
private _processPending(
529561
completedRecords: ReadonlySet<SubsequentResultRecord>,
530562
): IncrementalUpdate {
563+
const newPendingSources = new Set<DeferredFragmentRecord | StreamRecord>();
531564
const incrementalResults: Array<IncrementalResult> = [];
532565
const completedResults: Array<CompletedResult> = [];
533566
for (const subsequentResultRecord of completedRecords) {
534567
for (const child of subsequentResultRecord.children) {
568+
const pendingSource = isStreamItemsRecord(child)
569+
? child.streamRecord
570+
: child;
571+
if (!pendingSource.pendingSent) {
572+
newPendingSources.add(pendingSource);
573+
}
535574
this._publish(child);
536575
}
537576
if (isStreamItemsRecord(subsequentResultRecord)) {
538577
if (!subsequentResultRecord.sent) {
539578
subsequentResultRecord.sent = true;
540579
if (subsequentResultRecord.isFinalRecord) {
580+
newPendingSources.delete(subsequentResultRecord.streamRecord);
541581
completedResults.push(
542582
this._completedRecordToResult(
543583
subsequentResultRecord.streamRecord,
@@ -561,6 +601,7 @@ export class IncrementalPublisher {
561601
incrementalResults.push(incrementalResult);
562602
}
563603
} else {
604+
newPendingSources.delete(subsequentResultRecord);
564605
completedResults.push(
565606
this._completedRecordToResult(subsequentResultRecord),
566607
);
@@ -585,6 +626,7 @@ export class IncrementalPublisher {
585626
}
586627

587628
return {
629+
pending: this.pendingSourcesToResults(newPendingSources),
588630
incremental: incrementalResults,
589631
completed: completedResults,
590632
};
@@ -738,6 +780,7 @@ export class DeferredFragmentRecord {
738780
deferredGroupedFieldSetRecords: Set<DeferredGroupedFieldSetRecord>;
739781
errors: Array<GraphQLError>;
740782
isCompleted: boolean;
783+
pendingSent?: boolean;
741784
_pending: Set<DeferredGroupedFieldSetRecord>;
742785

743786
constructor(opts: {
@@ -762,6 +805,7 @@ export class StreamRecord {
762805
path: ReadonlyArray<string | number>;
763806
errors: Array<GraphQLError>;
764807
asyncIterator?: AsyncIterator<unknown> | undefined;
808+
pendingSent?: boolean;
765809
constructor(opts: {
766810
label: string | undefined;
767811
path: Path;
@@ -807,7 +851,7 @@ export type IncrementalDataRecord =
807851
| DeferredGroupedFieldSetRecord
808852
| StreamItemsRecord;
809853

810-
type SubsequentResultRecord = DeferredFragmentRecord | StreamItemsRecord;
854+
export type SubsequentResultRecord = DeferredFragmentRecord | StreamItemsRecord;
811855

812856
function getSubsequentResultRecords(
813857
incrementalDataRecord: IncrementalDataRecord | undefined,
@@ -823,7 +867,7 @@ function getSubsequentResultRecords(
823867
return incrementalDataRecord.deferredFragmentRecords;
824868
}
825869

826-
function isStreamItemsRecord(
870+
export function isStreamItemsRecord(
827871
subsequentResultRecord: unknown,
828872
): subsequentResultRecord is StreamItemsRecord {
829873
return subsequentResultRecord instanceof StreamItemsRecord;

0 commit comments

Comments
 (0)