Skip to content

Commit 65a5ced

Browse files
committed
add pending notifications
1 parent 0dc7172 commit 65a5ced

File tree

4 files changed

+190
-2
lines changed

4 files changed

+190
-2
lines changed

src/execution/IncrementalPublisher.ts

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import type {
1111
import type { GroupedFieldSet } from './collectFields.js';
1212

1313
interface IncrementalUpdate<TData = unknown, TExtensions = ObjMap<unknown>> {
14+
pending: ReadonlyArray<PendingResult>;
1415
incremental: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
1516
completed: ReadonlyArray<CompletedResult>;
1617
}
@@ -59,6 +60,7 @@ export interface InitialIncrementalExecutionResult<
5960
TExtensions = ObjMap<unknown>,
6061
> extends ExecutionResult<TData, TExtensions> {
6162
data: TData;
63+
pending: ReadonlyArray<PendingResult>;
6264
hasNext: true;
6365
extensions?: TExtensions;
6466
}
@@ -68,6 +70,7 @@ export interface FormattedInitialIncrementalExecutionResult<
6870
TExtensions = ObjMap<unknown>,
6971
> extends FormattedExecutionResult<TData, TExtensions> {
7072
data: TData;
73+
pending: ReadonlyArray<PendingResult>;
7174
hasNext: boolean;
7275
extensions?: TExtensions;
7376
}
@@ -85,6 +88,7 @@ export interface FormattedSubsequentIncrementalExecutionResult<
8588
TExtensions = ObjMap<unknown>,
8689
> {
8790
hasNext: boolean;
91+
pending?: ReadonlyArray<PendingResult>;
8892
incremental?: ReadonlyArray<FormattedIncrementalResult<TData, TExtensions>>;
8993
completed?: ReadonlyArray<FormattedCompletedResult>;
9094
extensions?: TExtensions;
@@ -141,6 +145,11 @@ export type FormattedIncrementalResult<
141145
| FormattedIncrementalDeferResult<TData, TExtensions>
142146
| FormattedIncrementalStreamResult<TData, TExtensions>;
143147

148+
export interface PendingResult {
149+
path: ReadonlyArray<string | number>;
150+
label?: string;
151+
}
152+
144153
export interface CompletedResult {
145154
path: ReadonlyArray<string | number>;
146155
label?: string;
@@ -296,10 +305,20 @@ export class IncrementalPublisher {
296305

297306
const errors = initialResultRecord.errors;
298307
const initialResult = errors.length === 0 ? { data } : { errors, data };
299-
if (this._pending.size > 0) {
308+
const pending = this._pending;
309+
if (pending.size > 0) {
310+
const pendingSources = new Set<DeferredFragmentRecord | StreamRecord>();
311+
for (const subsequentResultRecord of pending) {
312+
const pendingSource = isStreamItemsRecord(subsequentResultRecord)
313+
? subsequentResultRecord.streamRecord
314+
: subsequentResultRecord;
315+
pendingSources.add(pendingSource);
316+
}
317+
300318
return {
301319
initialResult: {
302320
...initialResult,
321+
pending: this.pendingSourcesToResults(pendingSources),
303322
hasNext: true,
304323
},
305324
subsequentResults: this._subscribe(),
@@ -347,6 +366,23 @@ export class IncrementalPublisher {
347366
});
348367
}
349368

369+
pendingSourcesToResults(
370+
pendingSources: ReadonlySet<DeferredFragmentRecord | StreamRecord>,
371+
): Array<PendingResult> {
372+
const pendingResults: Array<PendingResult> = [];
373+
for (const pendingSource of pendingSources) {
374+
pendingSource.pendingSent = true;
375+
const pendingResult: PendingResult = {
376+
path: pendingSource.path,
377+
};
378+
if (pendingSource.label !== undefined) {
379+
pendingResult.label = pendingSource.label;
380+
}
381+
pendingResults.push(pendingResult);
382+
}
383+
return pendingResults;
384+
}
385+
350386
private _subscribe(): AsyncGenerator<
351387
SubsequentIncrementalExecutionResult,
352388
void,
@@ -461,14 +497,18 @@ export class IncrementalPublisher {
461497
private _getIncrementalResult(
462498
completedRecords: ReadonlySet<SubsequentResultRecord>,
463499
): SubsequentIncrementalExecutionResult | undefined {
464-
const { incremental, completed } = this._processPending(completedRecords);
500+
const { pending, incremental, completed } =
501+
this._processPending(completedRecords);
465502

466503
const hasNext = this._pending.size > 0;
467504
if (incremental.length === 0 && completed.length === 0 && hasNext) {
468505
return undefined;
469506
}
470507

471508
const result: SubsequentIncrementalExecutionResult = { hasNext };
509+
if (pending.length) {
510+
result.pending = pending;
511+
}
472512
if (incremental.length) {
473513
result.incremental = incremental;
474514
}
@@ -482,17 +522,25 @@ export class IncrementalPublisher {
482522
private _processPending(
483523
completedRecords: ReadonlySet<SubsequentResultRecord>,
484524
): IncrementalUpdate {
525+
const newPendingSources = new Set<DeferredFragmentRecord | StreamRecord>();
485526
const incrementalResults: Array<IncrementalResult> = [];
486527
const completedResults: Array<CompletedResult> = [];
487528
for (const subsequentResultRecord of completedRecords) {
488529
for (const child of subsequentResultRecord.children) {
489530
if (child.filtered) {
490531
continue;
491532
}
533+
const pendingSource = isStreamItemsRecord(child)
534+
? child.streamRecord
535+
: child;
536+
if (!pendingSource.pendingSent) {
537+
newPendingSources.add(pendingSource);
538+
}
492539
this._publish(child);
493540
}
494541
if (isStreamItemsRecord(subsequentResultRecord)) {
495542
if (subsequentResultRecord.isFinalRecord) {
543+
newPendingSources.delete(subsequentResultRecord.streamRecord);
496544
completedResults.push(
497545
this._completedRecordToResult(subsequentResultRecord.streamRecord),
498546
);
@@ -513,6 +561,7 @@ export class IncrementalPublisher {
513561
}
514562
incrementalResults.push(incrementalResult);
515563
} else {
564+
newPendingSources.delete(subsequentResultRecord);
516565
completedResults.push(
517566
this._completedRecordToResult(subsequentResultRecord),
518567
);
@@ -537,6 +586,7 @@ export class IncrementalPublisher {
537586
}
538587

539588
return {
589+
pending: this.pendingSourcesToResults(newPendingSources),
540590
incremental: incrementalResults,
541591
completed: completedResults,
542592
};
@@ -690,6 +740,7 @@ export class DeferredFragmentRecord {
690740
deferredGroupedFieldSetRecords: Set<DeferredGroupedFieldSetRecord>;
691741
errors: Array<GraphQLError>;
692742
filtered: boolean;
743+
pendingSent?: boolean;
693744
_pending: Set<DeferredGroupedFieldSetRecord>;
694745

695746
constructor(opts: { path: Path | undefined; label: string | undefined }) {
@@ -709,6 +760,7 @@ export class StreamRecord {
709760
path: ReadonlyArray<string | number>;
710761
errors: Array<GraphQLError>;
711762
earlyReturn?: (() => Promise<unknown>) | undefined;
763+
pendingSent?: boolean;
712764
constructor(opts: {
713765
label: string | undefined;
714766
path: Path;

0 commit comments

Comments
 (0)