Skip to content

Commit 6473e6c

Browse files
committed
move publishing code into separate file
1 parent d22d32d commit 6473e6c

File tree

5 files changed

+429
-343
lines changed

5 files changed

+429
-343
lines changed

src/execution/Publisher.ts

Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
import type { ObjMap } from '../jsutils/ObjMap.js';
2+
import type { Path } from '../jsutils/Path.js';
3+
import { pathToArray } from '../jsutils/Path.js';
4+
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
5+
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
6+
7+
import type {
8+
GraphQLError,
9+
GraphQLFormattedError,
10+
} from '../error/GraphQLError.js';
11+
12+
export interface SubsequentIncrementalExecutionResult<
13+
TData = ObjMap<unknown>,
14+
TExtensions = ObjMap<unknown>,
15+
> {
16+
hasNext: boolean;
17+
incremental?: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
18+
extensions?: TExtensions;
19+
}
20+
21+
export interface FormattedSubsequentIncrementalExecutionResult<
22+
TData = ObjMap<unknown>,
23+
TExtensions = ObjMap<unknown>,
24+
> {
25+
hasNext: boolean;
26+
incremental?: ReadonlyArray<FormattedIncrementalResult<TData, TExtensions>>;
27+
extensions?: TExtensions;
28+
}
29+
30+
export interface IncrementalDeferResult<
31+
TData = ObjMap<unknown>,
32+
TExtensions = ObjMap<unknown>,
33+
> {
34+
errors?: ReadonlyArray<GraphQLError>;
35+
data?: TData | null;
36+
path?: ReadonlyArray<string | number>;
37+
label?: string;
38+
extensions?: TExtensions;
39+
}
40+
41+
export interface FormattedIncrementalDeferResult<
42+
TData = ObjMap<unknown>,
43+
TExtensions = ObjMap<unknown>,
44+
> {
45+
errors?: ReadonlyArray<GraphQLFormattedError>;
46+
data?: TData | null;
47+
path?: ReadonlyArray<string | number>;
48+
label?: string;
49+
extensions?: TExtensions;
50+
}
51+
52+
export interface IncrementalStreamResult<
53+
TData = Array<unknown>,
54+
TExtensions = ObjMap<unknown>,
55+
> {
56+
errors?: ReadonlyArray<GraphQLError>;
57+
items?: TData | null;
58+
path?: ReadonlyArray<string | number>;
59+
label?: string;
60+
extensions?: TExtensions;
61+
}
62+
63+
export interface FormattedIncrementalStreamResult<
64+
TData = Array<unknown>,
65+
TExtensions = ObjMap<unknown>,
66+
> {
67+
errors?: ReadonlyArray<GraphQLFormattedError>;
68+
items?: TData | null;
69+
path?: ReadonlyArray<string | number>;
70+
label?: string;
71+
extensions?: TExtensions;
72+
}
73+
74+
export type IncrementalResult<
75+
TData = ObjMap<unknown>,
76+
TExtensions = ObjMap<unknown>,
77+
> =
78+
| IncrementalDeferResult<TData, TExtensions>
79+
| IncrementalStreamResult<TData, TExtensions>;
80+
81+
export type FormattedIncrementalResult<
82+
TData = ObjMap<unknown>,
83+
TExtensions = ObjMap<unknown>,
84+
> =
85+
| FormattedIncrementalDeferResult<TData, TExtensions>
86+
| FormattedIncrementalStreamResult<TData, TExtensions>;
87+
88+
export function filterSubsequentPayloads(
89+
subsequentPayloads: Set<IncrementalDataRecord>,
90+
nullPath: Path,
91+
currentIncrementalDataRecord: IncrementalDataRecord | undefined,
92+
): void {
93+
const nullPathArray = pathToArray(nullPath);
94+
subsequentPayloads.forEach((incrementalDataRecord) => {
95+
if (incrementalDataRecord === currentIncrementalDataRecord) {
96+
// don't remove payload from where error originates
97+
return;
98+
}
99+
for (let i = 0; i < nullPathArray.length; i++) {
100+
if (incrementalDataRecord.path[i] !== nullPathArray[i]) {
101+
// incrementalDataRecord points to a path unaffected by this payload
102+
return;
103+
}
104+
}
105+
// incrementalDataRecord path points to nulled error field
106+
if (
107+
isStreamItemsRecord(incrementalDataRecord) &&
108+
incrementalDataRecord.asyncIterator?.return
109+
) {
110+
incrementalDataRecord.asyncIterator.return().catch(() => {
111+
// ignore error
112+
});
113+
}
114+
subsequentPayloads.delete(incrementalDataRecord);
115+
});
116+
}
117+
118+
function getCompletedIncrementalResults(
119+
subsequentPayloads: Set<IncrementalDataRecord>,
120+
): Array<IncrementalResult> {
121+
const incrementalResults: Array<IncrementalResult> = [];
122+
for (const incrementalDataRecord of subsequentPayloads) {
123+
const incrementalResult: IncrementalResult = {};
124+
if (!incrementalDataRecord.isCompleted) {
125+
continue;
126+
}
127+
subsequentPayloads.delete(incrementalDataRecord);
128+
if (isStreamItemsRecord(incrementalDataRecord)) {
129+
const items = incrementalDataRecord.items;
130+
if (incrementalDataRecord.isCompletedAsyncIterator) {
131+
// async iterable resolver just finished but there may be pending payloads
132+
continue;
133+
}
134+
(incrementalResult as IncrementalStreamResult).items = items;
135+
} else {
136+
const data = incrementalDataRecord.data;
137+
(incrementalResult as IncrementalDeferResult).data = data ?? null;
138+
}
139+
140+
incrementalResult.path = incrementalDataRecord.path;
141+
if (incrementalDataRecord.label != null) {
142+
incrementalResult.label = incrementalDataRecord.label;
143+
}
144+
if (incrementalDataRecord.errors.length > 0) {
145+
incrementalResult.errors = incrementalDataRecord.errors;
146+
}
147+
incrementalResults.push(incrementalResult);
148+
}
149+
return incrementalResults;
150+
}
151+
152+
export function yieldSubsequentPayloads(
153+
subsequentPayloads: Set<IncrementalDataRecord>,
154+
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
155+
let isDone = false;
156+
157+
async function next(): Promise<
158+
IteratorResult<SubsequentIncrementalExecutionResult, void>
159+
> {
160+
if (isDone) {
161+
return { value: undefined, done: true };
162+
}
163+
164+
await Promise.race(Array.from(subsequentPayloads).map((p) => p.promise));
165+
166+
if (isDone) {
167+
// a different call to next has exhausted all payloads
168+
return { value: undefined, done: true };
169+
}
170+
171+
const incremental = getCompletedIncrementalResults(subsequentPayloads);
172+
const hasNext = subsequentPayloads.size > 0;
173+
174+
if (!incremental.length && hasNext) {
175+
return next();
176+
}
177+
178+
if (!hasNext) {
179+
isDone = true;
180+
}
181+
182+
return {
183+
value: incremental.length ? { incremental, hasNext } : { hasNext },
184+
done: false,
185+
};
186+
}
187+
188+
function returnStreamIterators() {
189+
const promises: Array<Promise<IteratorResult<unknown>>> = [];
190+
subsequentPayloads.forEach((incrementalDataRecord) => {
191+
if (
192+
isStreamItemsRecord(incrementalDataRecord) &&
193+
incrementalDataRecord.asyncIterator?.return
194+
) {
195+
promises.push(incrementalDataRecord.asyncIterator.return());
196+
}
197+
});
198+
return Promise.all(promises);
199+
}
200+
201+
return {
202+
[Symbol.asyncIterator]() {
203+
return this;
204+
},
205+
next,
206+
async return(): Promise<
207+
IteratorResult<SubsequentIncrementalExecutionResult, void>
208+
> {
209+
await returnStreamIterators();
210+
isDone = true;
211+
return { value: undefined, done: true };
212+
},
213+
async throw(
214+
error?: unknown,
215+
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
216+
await returnStreamIterators();
217+
isDone = true;
218+
return Promise.reject(error);
219+
},
220+
};
221+
}
222+
223+
/** @internal */
224+
export class DeferredFragmentRecord {
225+
type: 'defer';
226+
errors: Array<GraphQLError>;
227+
label: string | undefined;
228+
path: Array<string | number>;
229+
promise: Promise<void>;
230+
data: ObjMap<unknown> | null;
231+
parentContext: IncrementalDataRecord | undefined;
232+
isCompleted: boolean;
233+
_subsequentPayloads: Set<IncrementalDataRecord>;
234+
_resolve?: (arg: PromiseOrValue<ObjMap<unknown> | null>) => void;
235+
constructor(opts: {
236+
label: string | undefined;
237+
path: Path | undefined;
238+
parentContext: IncrementalDataRecord | undefined;
239+
subsequentPayloads: Set<IncrementalDataRecord>;
240+
}) {
241+
this.type = 'defer';
242+
this.label = opts.label;
243+
this.path = pathToArray(opts.path);
244+
this.parentContext = opts.parentContext;
245+
this.errors = [];
246+
this._subsequentPayloads = opts.subsequentPayloads;
247+
this._subsequentPayloads.add(this);
248+
this.isCompleted = false;
249+
this.data = null;
250+
const { promise, resolve } = promiseWithResolvers<ObjMap<unknown> | null>();
251+
this._resolve = resolve;
252+
this.promise = promise.then((data) => {
253+
this.data = data;
254+
this.isCompleted = true;
255+
});
256+
}
257+
258+
addData(data: PromiseOrValue<ObjMap<unknown> | null>) {
259+
const parentData = this.parentContext?.promise;
260+
if (parentData) {
261+
this._resolve?.(parentData.then(() => data));
262+
return;
263+
}
264+
this._resolve?.(data);
265+
}
266+
}
267+
268+
/** @internal */
269+
export class StreamItemsRecord {
270+
type: 'stream';
271+
errors: Array<GraphQLError>;
272+
label: string | undefined;
273+
path: Array<string | number>;
274+
items: Array<unknown> | null;
275+
promise: Promise<void>;
276+
parentContext: IncrementalDataRecord | undefined;
277+
asyncIterator: AsyncIterator<unknown> | undefined;
278+
isCompletedAsyncIterator?: boolean;
279+
isCompleted: boolean;
280+
_subsequentPayloads: Set<IncrementalDataRecord>;
281+
_resolve?: (arg: PromiseOrValue<Array<unknown> | null>) => void;
282+
constructor(opts: {
283+
label: string | undefined;
284+
path: Path | undefined;
285+
asyncIterator?: AsyncIterator<unknown>;
286+
parentContext: IncrementalDataRecord | undefined;
287+
subsequentPayloads: Set<IncrementalDataRecord>;
288+
}) {
289+
this.type = 'stream';
290+
this.items = null;
291+
this.label = opts.label;
292+
this.path = pathToArray(opts.path);
293+
this.parentContext = opts.parentContext;
294+
this.asyncIterator = opts.asyncIterator;
295+
this.errors = [];
296+
this._subsequentPayloads = opts.subsequentPayloads;
297+
this._subsequentPayloads.add(this);
298+
this.isCompleted = false;
299+
this.items = null;
300+
const { promise, resolve } = promiseWithResolvers<Array<unknown> | null>();
301+
this._resolve = resolve;
302+
this.promise = promise.then((items) => {
303+
this.items = items;
304+
this.isCompleted = true;
305+
});
306+
}
307+
308+
addItems(items: PromiseOrValue<Array<unknown> | null>) {
309+
const parentData = this.parentContext?.promise;
310+
if (parentData) {
311+
this._resolve?.(parentData.then(() => items));
312+
return;
313+
}
314+
this._resolve?.(items);
315+
}
316+
317+
setIsCompletedAsyncIterator() {
318+
this.isCompletedAsyncIterator = true;
319+
}
320+
}
321+
322+
export type IncrementalDataRecord = DeferredFragmentRecord | StreamItemsRecord;
323+
324+
function isStreamItemsRecord(
325+
incrementalDataRecord: IncrementalDataRecord,
326+
): incrementalDataRecord is StreamItemsRecord {
327+
return incrementalDataRecord.type === 'stream';
328+
}

src/execution/__tests__/defer-test.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,9 @@ import {
1616
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
1717
import { GraphQLSchema } from '../../type/schema.js';
1818

19-
import type {
20-
InitialIncrementalExecutionResult,
21-
SubsequentIncrementalExecutionResult,
22-
} from '../execute.js';
19+
import type { InitialIncrementalExecutionResult } from '../execute.js';
2320
import { execute, experimentalExecuteIncrementally } from '../execute.js';
21+
import type { SubsequentIncrementalExecutionResult } from '../Publisher.js';
2422

2523
const friendType = new GraphQLObjectType({
2624
fields: {

src/execution/__tests__/stream-test.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@ import {
1717
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
1818
import { GraphQLSchema } from '../../type/schema.js';
1919

20-
import type {
21-
InitialIncrementalExecutionResult,
22-
SubsequentIncrementalExecutionResult,
23-
} from '../execute.js';
20+
import type { InitialIncrementalExecutionResult } from '../execute.js';
2421
import { experimentalExecuteIncrementally } from '../execute.js';
22+
import type { SubsequentIncrementalExecutionResult } from '../Publisher.js';
2523

2624
const friendType = new GraphQLObjectType({
2725
fields: {

0 commit comments

Comments
 (0)