Skip to content

Commit 1f30b54

Browse files
authored
incremental publisher should handle all response building (#3930)
extracted from #3929 the publisher itself can determine whether to return a single result or the initial result + stream the only desired change is to replace the following code block with the below: [FROM:](https://github.com/graphql/graphql-js/blob/fae5da500bad94c39a7ecd77a4c4361b58d6d2da/src/execution/execute.ts#L293-L340) ```ts const incrementalPublisher = exeContext.incrementalPublisher; const initialResultRecord = incrementalPublisher.prepareInitialResultRecord(); try { const result = executeOperation(exeContext, initialResultRecord); if (isPromise(result)) { return result.then( (data) => { const errors = incrementalPublisher.getInitialErrors(initialResultRecord); const initialResult = buildResponse(data, errors); incrementalPublisher.publishInitial(initialResultRecord); if (incrementalPublisher.hasNext()) { return { initialResult: { ...initialResult, hasNext: true, }, subsequentResults: incrementalPublisher.subscribe(), }; } return initialResult; }, (error) => { incrementalPublisher.addFieldError(initialResultRecord, error); const errors = incrementalPublisher.getInitialErrors(initialResultRecord); return buildResponse(null, errors); }, ); } const initialResult = buildResponse(result, initialResultRecord.errors); incrementalPublisher.publishInitial(initialResultRecord); if (incrementalPublisher.hasNext()) { return { initialResult: { ...initialResult, hasNext: true, }, subsequentResults: incrementalPublisher.subscribe(), }; } return initialResult; } catch (error) { incrementalPublisher.addFieldError(initialResultRecord, error); const errors = incrementalPublisher.getInitialErrors(initialResultRecord); return buildResponse(null, errors); } } ``` [TO:](https://github.com/yaacovCR/graphql-executor/blob/598608e8d8b23bc527dd73283b477997305afd58/src/execution/execute.ts#L234-L250): ```ts const incrementalPublisher = exeContext.incrementalPublisher; const initialResultRecord = incrementalPublisher.prepareInitialResultRecord(); try { const data = executeOperation(exeContext, initialResultRecord); if (isPromise(data)) { return data.then( (resolved) => incrementalPublisher.buildDataResponse(initialResultRecord, resolved), (error) => incrementalPublisher.buildErrorResponse(initialResultRecord, error), ); } return incrementalPublisher.buildDataResponse(initialResultRecord, data); } catch (error) { return incrementalPublisher.buildErrorResponse(initialResultRecord, error); } ``` Supporting changes are required: 1. some existing public methods no longer are required to be public and so are made private (or removed entirely!), with lint rules forcing the reordering of existing methods 2. to prevent cyclic type dependencies (not strictly necessary, but still!), types are moved from `execute.ts` to `IncrementalPublisher.ts`
1 parent aa6736a commit 1f30b54

File tree

10 files changed

+190
-215
lines changed

10 files changed

+190
-215
lines changed

src/execution/IncrementalPublisher.ts

Lines changed: 161 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,63 @@ import type {
88
GraphQLFormattedError,
99
} from '../error/GraphQLError.js';
1010

11+
/**
12+
* The result of GraphQL execution.
13+
*
14+
* - `errors` is included when any errors occurred as a non-empty array.
15+
* - `data` is the result of a successful execution of the query.
16+
* - `hasNext` is true if a future payload is expected.
17+
* - `extensions` is reserved for adding non-standard properties.
18+
* - `incremental` is a list of the results from defer/stream directives.
19+
*/
20+
export interface ExecutionResult<
21+
TData = ObjMap<unknown>,
22+
TExtensions = ObjMap<unknown>,
23+
> {
24+
errors?: ReadonlyArray<GraphQLError>;
25+
data?: TData | null;
26+
extensions?: TExtensions;
27+
}
28+
29+
export interface FormattedExecutionResult<
30+
TData = ObjMap<unknown>,
31+
TExtensions = ObjMap<unknown>,
32+
> {
33+
errors?: ReadonlyArray<GraphQLFormattedError>;
34+
data?: TData | null;
35+
extensions?: TExtensions;
36+
}
37+
38+
export interface ExperimentalIncrementalExecutionResults<
39+
TData = ObjMap<unknown>,
40+
TExtensions = ObjMap<unknown>,
41+
> {
42+
initialResult: InitialIncrementalExecutionResult<TData, TExtensions>;
43+
subsequentResults: AsyncGenerator<
44+
SubsequentIncrementalExecutionResult<TData, TExtensions>,
45+
void,
46+
void
47+
>;
48+
}
49+
50+
export interface InitialIncrementalExecutionResult<
51+
TData = ObjMap<unknown>,
52+
TExtensions = ObjMap<unknown>,
53+
> extends ExecutionResult<TData, TExtensions> {
54+
hasNext: boolean;
55+
incremental?: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
56+
extensions?: TExtensions;
57+
}
58+
59+
export interface FormattedInitialIncrementalExecutionResult<
60+
TData = ObjMap<unknown>,
61+
TExtensions = ObjMap<unknown>,
62+
> extends FormattedExecutionResult<TData, TExtensions> {
63+
hasNext: boolean;
64+
incremental?: ReadonlyArray<FormattedIncrementalResult<TData, TExtensions>>;
65+
extensions?: TExtensions;
66+
}
67+
1168
export interface SubsequentIncrementalExecutionResult<
1269
TData = ObjMap<unknown>,
1370
TExtensions = ObjMap<unknown>,
@@ -113,86 +170,6 @@ export class IncrementalPublisher {
113170
this._reset();
114171
}
115172

116-
hasNext(): boolean {
117-
return this._pending.size > 0;
118-
}
119-
120-
subscribe(): AsyncGenerator<
121-
SubsequentIncrementalExecutionResult,
122-
void,
123-
void
124-
> {
125-
let isDone = false;
126-
127-
const _next = async (): Promise<
128-
IteratorResult<SubsequentIncrementalExecutionResult, void>
129-
> => {
130-
// eslint-disable-next-line no-constant-condition
131-
while (true) {
132-
if (isDone) {
133-
return { value: undefined, done: true };
134-
}
135-
136-
for (const item of this._released) {
137-
this._pending.delete(item);
138-
}
139-
const released = this._released;
140-
this._released = new Set();
141-
142-
const result = this._getIncrementalResult(released);
143-
144-
if (!this.hasNext()) {
145-
isDone = true;
146-
}
147-
148-
if (result !== undefined) {
149-
return { value: result, done: false };
150-
}
151-
152-
// eslint-disable-next-line no-await-in-loop
153-
await this._signalled;
154-
}
155-
};
156-
157-
const returnStreamIterators = async (): Promise<void> => {
158-
const promises: Array<Promise<IteratorResult<unknown>>> = [];
159-
this._pending.forEach((incrementalDataRecord) => {
160-
if (
161-
isStreamItemsRecord(incrementalDataRecord) &&
162-
incrementalDataRecord.asyncIterator?.return
163-
) {
164-
promises.push(incrementalDataRecord.asyncIterator.return());
165-
}
166-
});
167-
await Promise.all(promises);
168-
};
169-
170-
const _return = async (): Promise<
171-
IteratorResult<SubsequentIncrementalExecutionResult, void>
172-
> => {
173-
isDone = true;
174-
await returnStreamIterators();
175-
return { value: undefined, done: true };
176-
};
177-
178-
const _throw = async (
179-
error?: unknown,
180-
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
181-
isDone = true;
182-
await returnStreamIterators();
183-
return Promise.reject(error);
184-
};
185-
186-
return {
187-
[Symbol.asyncIterator]() {
188-
return this;
189-
},
190-
next: _next,
191-
return: _return,
192-
throw: _throw,
193-
};
194-
}
195-
196173
prepareInitialResultRecord(): InitialResultRecord {
197174
return {
198175
errors: [],
@@ -256,19 +233,38 @@ export class IncrementalPublisher {
256233
incrementalDataRecord.errors.push(error);
257234
}
258235

259-
publishInitial(initialResult: InitialResultRecord) {
260-
for (const child of initialResult.children) {
236+
buildDataResponse(
237+
initialResultRecord: InitialResultRecord,
238+
data: ObjMap<unknown> | null,
239+
): ExecutionResult | ExperimentalIncrementalExecutionResults {
240+
for (const child of initialResultRecord.children) {
261241
if (child.filtered) {
262242
continue;
263243
}
264244
this._publish(child);
265245
}
246+
247+
const errors = initialResultRecord.errors;
248+
const initialResult = errors.length === 0 ? { data } : { errors, data };
249+
if (this._pending.size > 0) {
250+
return {
251+
initialResult: {
252+
...initialResult,
253+
hasNext: true,
254+
},
255+
subsequentResults: this._subscribe(),
256+
};
257+
}
258+
return initialResult;
266259
}
267260

268-
getInitialErrors(
269-
initialResult: InitialResultRecord,
270-
): ReadonlyArray<GraphQLError> {
271-
return initialResult.errors;
261+
buildErrorResponse(
262+
initialResultRecord: InitialResultRecord,
263+
error: GraphQLError,
264+
): ExecutionResult {
265+
const errors = initialResultRecord.errors;
266+
errors.push(error);
267+
return { data: null, errors };
272268
}
273269

274270
filter(nullPath: Path, erroringIncrementalDataRecord: IncrementalDataRecord) {
@@ -301,6 +297,82 @@ export class IncrementalPublisher {
301297
});
302298
}
303299

300+
private _subscribe(): AsyncGenerator<
301+
SubsequentIncrementalExecutionResult,
302+
void,
303+
void
304+
> {
305+
let isDone = false;
306+
307+
const _next = async (): Promise<
308+
IteratorResult<SubsequentIncrementalExecutionResult, void>
309+
> => {
310+
// eslint-disable-next-line no-constant-condition
311+
while (true) {
312+
if (isDone) {
313+
return { value: undefined, done: true };
314+
}
315+
316+
for (const item of this._released) {
317+
this._pending.delete(item);
318+
}
319+
const released = this._released;
320+
this._released = new Set();
321+
322+
const result = this._getIncrementalResult(released);
323+
324+
if (this._pending.size === 0) {
325+
isDone = true;
326+
}
327+
328+
if (result !== undefined) {
329+
return { value: result, done: false };
330+
}
331+
332+
// eslint-disable-next-line no-await-in-loop
333+
await this._signalled;
334+
}
335+
};
336+
337+
const returnStreamIterators = async (): Promise<void> => {
338+
const promises: Array<Promise<IteratorResult<unknown>>> = [];
339+
this._pending.forEach((incrementalDataRecord) => {
340+
if (
341+
isStreamItemsRecord(incrementalDataRecord) &&
342+
incrementalDataRecord.asyncIterator?.return
343+
) {
344+
promises.push(incrementalDataRecord.asyncIterator.return());
345+
}
346+
});
347+
await Promise.all(promises);
348+
};
349+
350+
const _return = async (): Promise<
351+
IteratorResult<SubsequentIncrementalExecutionResult, void>
352+
> => {
353+
isDone = true;
354+
await returnStreamIterators();
355+
return { value: undefined, done: true };
356+
};
357+
358+
const _throw = async (
359+
error?: unknown,
360+
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
361+
isDone = true;
362+
await returnStreamIterators();
363+
return Promise.reject(error);
364+
};
365+
366+
return {
367+
[Symbol.asyncIterator]() {
368+
return this;
369+
},
370+
next: _next,
371+
return: _return,
372+
throw: _throw,
373+
};
374+
}
375+
304376
private _trigger() {
305377
this._resolve();
306378
this._reset();
@@ -368,9 +440,10 @@ export class IncrementalPublisher {
368440
incrementalResults.push(incrementalResult);
369441
}
370442

443+
const hasNext = this._pending.size > 0;
371444
return incrementalResults.length
372-
? { incremental: incrementalResults, hasNext: this.hasNext() }
373-
: encounteredCompletedAsyncIterator && !this.hasNext()
445+
? { incremental: incrementalResults, hasNext }
446+
: encounteredCompletedAsyncIterator && !hasNext
374447
? { hasNext: false }
375448
: undefined;
376449
}

src/execution/__tests__/defer-test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ import {
1616
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
1717
import { GraphQLSchema } from '../../type/schema.js';
1818

19-
import type { InitialIncrementalExecutionResult } from '../execute.js';
2019
import { execute, experimentalExecuteIncrementally } from '../execute.js';
21-
import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js';
20+
import type {
21+
InitialIncrementalExecutionResult,
22+
SubsequentIncrementalExecutionResult,
23+
} from '../IncrementalPublisher.js';
2224

2325
const friendType = new GraphQLObjectType({
2426
fields: {

src/execution/__tests__/lists-test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ import { GraphQLSchema } from '../../type/schema.js';
1818

1919
import { buildSchema } from '../../utilities/buildASTSchema.js';
2020

21-
import type { ExecutionResult } from '../execute.js';
2221
import { execute, executeSync } from '../execute.js';
22+
import type { ExecutionResult } from '../IncrementalPublisher.js';
2323

2424
describe('Execute: Accepts any iterable as list value', () => {
2525
function complete(rootValue: unknown) {

src/execution/__tests__/nonnull-test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import { GraphQLSchema } from '../../type/schema.js';
1313

1414
import { buildSchema } from '../../utilities/buildASTSchema.js';
1515

16-
import type { ExecutionResult } from '../execute.js';
1716
import { execute, executeSync } from '../execute.js';
17+
import type { ExecutionResult } from '../IncrementalPublisher.js';
1818

1919
const syncError = new Error('sync');
2020
const syncNonNullError = new Error('syncNonNull');

src/execution/__tests__/oneof-test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import { parse } from '../../language/parser.js';
66

77
import { buildSchema } from '../../utilities/buildASTSchema.js';
88

9-
import type { ExecutionResult } from '../execute.js';
109
import { execute } from '../execute.js';
10+
import type { ExecutionResult } from '../IncrementalPublisher.js';
1111

1212
const schema = buildSchema(`
1313
type Query {

src/execution/__tests__/stream-test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ import {
1717
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
1818
import { GraphQLSchema } from '../../type/schema.js';
1919

20-
import type { InitialIncrementalExecutionResult } from '../execute.js';
2120
import { experimentalExecuteIncrementally } from '../execute.js';
22-
import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js';
21+
import type {
22+
InitialIncrementalExecutionResult,
23+
SubsequentIncrementalExecutionResult,
24+
} from '../IncrementalPublisher.js';
2325

2426
const friendType = new GraphQLObjectType({
2527
fields: {

src/execution/__tests__/subscribe-test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import {
2020
} from '../../type/scalars.js';
2121
import { GraphQLSchema } from '../../type/schema.js';
2222

23-
import type { ExecutionArgs, ExecutionResult } from '../execute.js';
23+
import type { ExecutionArgs } from '../execute.js';
2424
import { createSourceEventStream, subscribe } from '../execute.js';
25+
import type { ExecutionResult } from '../IncrementalPublisher.js';
2526

2627
import { SimplePubSub } from './simplePubSub.js';
2728

0 commit comments

Comments
 (0)