Skip to content

Commit c62cd9e

Browse files
committed
Implement support for @stream directive
# Conflicts: # src/execution/execute.ts # src/validation/index.d.ts # src/validation/index.ts
1 parent 8981544 commit c62cd9e

File tree

8 files changed

+1234
-18
lines changed

8 files changed

+1234
-18
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 700 additions & 0 deletions
Large diffs are not rendered by default.

src/execution/execute.ts

Lines changed: 243 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import {
4444
TypeMetaFieldDef,
4545
TypeNameMetaFieldDef,
4646
} from '../type/introspection';
47+
import { GraphQLStreamDirective } from '../type/directives';
4748
import {
4849
isObjectType,
4950
isAbstractType,
@@ -54,7 +55,11 @@ import {
5455

5556
import { getOperationRootType } from '../utilities/getOperationRootType';
5657

57-
import { getVariableValues, getArgumentValues } from './values';
58+
import {
59+
getVariableValues,
60+
getArgumentValues,
61+
getDirectiveValues,
62+
} from './values';
5863
import type { FieldsAndPatches, PatchFields } from './collectFields';
5964
import { collectFields } from './collectFields';
6065

@@ -136,7 +141,7 @@ export interface FormattedExecutionResult<
136141
* - `extensions` is reserved for adding non-standard properties.
137142
*/
138143
export interface ExecutionPatchResult<
139-
TData = ObjMap<unknown>,
144+
TData = ObjMap<unknown> | unknown,
140145
TExtensions = ObjMap<unknown>,
141146
> {
142147
errors?: ReadonlyArray<GraphQLError>;
@@ -148,7 +153,7 @@ export interface ExecutionPatchResult<
148153
}
149154

150155
export interface FormattedExecutionPatchResult<
151-
TData = ObjMap<unknown>,
156+
TData = ObjMap<unknown> | unknown,
152157
TExtensions = ObjMap<unknown>,
153158
> {
154159
errors?: ReadonlyArray<GraphQLFormattedError>;
@@ -762,6 +767,44 @@ function completeValue(
762767
);
763768
}
764769

770+
/**
771+
* Returns an object containing the `@stream` arguments if a field should be
772+
* streamed based on the experimental flag, stream directive present and
773+
* not disabled by the "if" argument.
774+
*/
775+
function getStreamValues(
776+
exeContext: ExecutionContext,
777+
fieldNodes: ReadonlyArray<FieldNode>,
778+
):
779+
| undefined
780+
| {
781+
initialCount?: number;
782+
label?: string;
783+
} {
784+
// validation only allows equivalent streams on multiple fields, so it is
785+
// safe to only check the first fieldNode for the stream directive
786+
const stream = getDirectiveValues(
787+
GraphQLStreamDirective,
788+
fieldNodes[0],
789+
exeContext.variableValues,
790+
);
791+
792+
if (!stream) {
793+
return;
794+
}
795+
796+
if (stream.if === false) {
797+
return;
798+
}
799+
800+
return {
801+
initialCount:
802+
// istanbul ignore next (initialCount is required number argument)
803+
typeof stream.initialCount === 'number' ? stream.initialCount : undefined,
804+
label: typeof stream.label === 'string' ? stream.label : undefined,
805+
};
806+
}
807+
765808
/**
766809
* Complete a async iterator value by completing the result and calling
767810
* recursively until all the results are completed.
@@ -776,8 +819,28 @@ function completeAsyncIteratorValue(
776819
errors: Array<GraphQLError>,
777820
): Promise<ReadonlyArray<unknown>> {
778821
let containsPromise = false;
822+
const stream = getStreamValues(exeContext, fieldNodes);
779823
return new Promise<ReadonlyArray<unknown>>((resolve) => {
780824
function next(index: number, completedResults: Array<unknown>) {
825+
if (
826+
stream &&
827+
typeof stream.initialCount === 'number' &&
828+
index >= stream.initialCount
829+
) {
830+
exeContext.dispatcher.addAsyncIteratorValue(
831+
index,
832+
iterator,
833+
exeContext,
834+
fieldNodes,
835+
info,
836+
itemType,
837+
path,
838+
stream.label,
839+
);
840+
resolve(completedResults);
841+
return;
842+
}
843+
781844
const fieldPath = addPath(path, index, undefined);
782845
iterator.next().then(
783846
({ value, done }) => {
@@ -866,15 +929,37 @@ function completeListValue(
866929
);
867930
}
868931

932+
const stream = getStreamValues(exeContext, fieldNodes);
933+
869934
// This is specified as a simple map, however we're optimizing the path
870935
// where the list contains no Promises by avoiding creating another Promise.
871936
let containsPromise = false;
872-
const completedResults = Array.from(result, (item, index) => {
937+
const completedResults = [];
938+
let index = 0;
939+
for (const item of result) {
873940
// No need to modify the info object containing the path,
874941
// since from here on it is not ever accessed by resolver functions.
875942
const itemPath = addPath(path, index, undefined);
876943
try {
877944
let completedItem;
945+
946+
if (
947+
stream &&
948+
typeof stream.initialCount === 'number' &&
949+
index >= stream.initialCount
950+
) {
951+
exeContext.dispatcher.addValue(
952+
itemPath,
953+
item,
954+
exeContext,
955+
fieldNodes,
956+
info,
957+
itemType,
958+
stream.label,
959+
);
960+
index++;
961+
continue;
962+
}
878963
if (isPromise(item)) {
879964
completedItem = item.then((resolved) =>
880965
completeValue(
@@ -903,21 +988,25 @@ function completeListValue(
903988
containsPromise = true;
904989
// Note: we don't rely on a `catch` method, but we do expect "thenable"
905990
// to take a second callback for the error case.
906-
return completedItem.then(undefined, (rawError) => {
907-
const error = locatedError(
908-
rawError,
909-
fieldNodes,
910-
pathToArray(itemPath),
911-
);
912-
return handleFieldError(error, itemType, errors);
913-
});
991+
completedResults.push(
992+
completedItem.then(undefined, (rawError) => {
993+
const error = locatedError(
994+
rawError,
995+
fieldNodes,
996+
pathToArray(itemPath),
997+
);
998+
return handleFieldError(error, itemType, errors);
999+
}),
1000+
);
1001+
} else {
1002+
completedResults.push(completedItem);
9141003
}
915-
return completedItem;
9161004
} catch (rawError) {
9171005
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
918-
return handleFieldError(error, itemType, errors);
1006+
completedResults.push(handleFieldError(error, itemType, errors));
9191007
}
920-
});
1008+
index++;
1009+
}
9211010

9221011
return containsPromise ? Promise.all(completedResults) : completedResults;
9231012
}
@@ -1295,7 +1384,7 @@ export function getFieldDef(
12951384
*/
12961385
interface DispatcherResult {
12971386
errors?: ReadonlyArray<GraphQLError>;
1298-
data?: ObjMap<unknown> | null;
1387+
data?: ObjMap<unknown> | unknown | null;
12991388
path: ReadonlyArray<string | number>;
13001389
label?: string;
13011390
extensions?: ObjMap<unknown>;
@@ -1335,6 +1424,129 @@ export class Dispatcher {
13351424
);
13361425
}
13371426

1427+
addValue(
1428+
path: Path,
1429+
promiseOrData: PromiseOrValue<unknown>,
1430+
exeContext: ExecutionContext,
1431+
fieldNodes: ReadonlyArray<FieldNode>,
1432+
info: GraphQLResolveInfo,
1433+
itemType: GraphQLOutputType,
1434+
label?: string,
1435+
): void {
1436+
const errors: Array<GraphQLError> = [];
1437+
this._subsequentPayloads.push(
1438+
Promise.resolve(promiseOrData)
1439+
.then((resolved) =>
1440+
completeValue(
1441+
exeContext,
1442+
itemType,
1443+
fieldNodes,
1444+
info,
1445+
path,
1446+
resolved,
1447+
errors,
1448+
),
1449+
)
1450+
// Note: we don't rely on a `catch` method, but we do expect "thenable"
1451+
// to take a second callback for the error case.
1452+
.then(undefined, (rawError) => {
1453+
const error = locatedError(rawError, fieldNodes, pathToArray(path));
1454+
return handleFieldError(error, itemType, errors);
1455+
})
1456+
.then((data) => ({
1457+
value: createPatchResult(data, label, path, errors),
1458+
done: false,
1459+
})),
1460+
);
1461+
}
1462+
1463+
addAsyncIteratorValue(
1464+
initialIndex: number,
1465+
iterator: AsyncIterator<unknown>,
1466+
exeContext: ExecutionContext,
1467+
fieldNodes: ReadonlyArray<FieldNode>,
1468+
info: GraphQLResolveInfo,
1469+
itemType: GraphQLOutputType,
1470+
path?: Path,
1471+
label?: string,
1472+
): void {
1473+
const subsequentPayloads = this._subsequentPayloads;
1474+
function next(index: number) {
1475+
const fieldPath = addPath(path, index, undefined);
1476+
const patchErrors: Array<GraphQLError> = [];
1477+
subsequentPayloads.push(
1478+
iterator.next().then(
1479+
({ value: data, done }) => {
1480+
if (done) {
1481+
return { value: undefined, done: true };
1482+
}
1483+
1484+
// eslint-disable-next-line node/callback-return
1485+
next(index + 1);
1486+
1487+
try {
1488+
const completedItem = completeValue(
1489+
exeContext,
1490+
itemType,
1491+
fieldNodes,
1492+
info,
1493+
fieldPath,
1494+
data,
1495+
patchErrors,
1496+
);
1497+
1498+
if (isPromise(completedItem)) {
1499+
return completedItem.then((resolveItem) => ({
1500+
value: createPatchResult(
1501+
resolveItem,
1502+
label,
1503+
fieldPath,
1504+
patchErrors,
1505+
),
1506+
done: false,
1507+
}));
1508+
}
1509+
1510+
return {
1511+
value: createPatchResult(
1512+
completedItem,
1513+
label,
1514+
fieldPath,
1515+
patchErrors,
1516+
),
1517+
done: false,
1518+
};
1519+
} catch (rawError) {
1520+
const error = locatedError(
1521+
rawError,
1522+
fieldNodes,
1523+
pathToArray(fieldPath),
1524+
);
1525+
handleFieldError(error, itemType, patchErrors);
1526+
return {
1527+
value: createPatchResult(null, label, fieldPath, patchErrors),
1528+
done: false,
1529+
};
1530+
}
1531+
},
1532+
(rawError) => {
1533+
const error = locatedError(
1534+
rawError,
1535+
fieldNodes,
1536+
pathToArray(fieldPath),
1537+
);
1538+
handleFieldError(error, itemType, patchErrors);
1539+
return {
1540+
value: createPatchResult(null, label, fieldPath, patchErrors),
1541+
done: false,
1542+
};
1543+
},
1544+
),
1545+
);
1546+
}
1547+
next(initialIndex);
1548+
}
1549+
13381550
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
13391551
return new Promise<{
13401552
promise: Promise<IteratorResult<DispatcherResult, void>>;
@@ -1354,7 +1566,20 @@ export class Dispatcher {
13541566
);
13551567
return promise;
13561568
})
1357-
.then(({ value }) => {
1569+
.then(({ value, done }) => {
1570+
if (done && this._subsequentPayloads.length === 0) {
1571+
// async iterable resolver just finished and no more pending payloads
1572+
return {
1573+
value: {
1574+
hasNext: false,
1575+
},
1576+
done: false,
1577+
};
1578+
} else if (done) {
1579+
// async iterable resolver just finished but there are pending payloads
1580+
// return the next one
1581+
return this._race();
1582+
}
13581583
const returnValue: ExecutionPatchResult = {
13591584
...value,
13601585
hasNext: this._subsequentPayloads.length > 0,
@@ -1396,7 +1621,7 @@ export class Dispatcher {
13961621
}
13971622

13981623
function createPatchResult(
1399-
data: ObjMap<unknown> | null,
1624+
data: ObjMap<unknown> | unknown | null,
14001625
label?: string,
14011626
path?: Path,
14021627
errors?: ReadonlyArray<GraphQLError>,

0 commit comments

Comments
 (0)