Skip to content

Commit d43d769

Browse files
committed
Implement support for @stream directive
1 parent a66d192 commit d43d769

10 files changed

+1162
-3
lines changed

src/execution/__tests__/stream-test.js

Lines changed: 652 additions & 0 deletions
Large diffs are not rendered by default.

src/execution/execute.js

Lines changed: 215 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import {
5353
GraphQLIncludeDirective,
5454
GraphQLSkipDirective,
5555
GraphQLDeferDirective,
56+
GraphQLStreamDirective,
5657
} from '../type/directives';
5758
import {
5859
isObjectType,
@@ -704,6 +705,42 @@ function getDeferValues(
704705
};
705706
}
706707

708+
/**
709+
* Returns an object containing the @stream arguments if a field should be
710+
* streamed based on the experimental flag, stream directive present and
711+
* not disabled by the "if" argument.
712+
*/
713+
function getStreamValues(
714+
exeContext: ExecutionContext,
715+
fieldNodes: $ReadOnlyArray<FieldNode>,
716+
): void | {|
717+
initialCount?: number,
718+
label?: string,
719+
|} {
720+
// validation only allows equivalent streams on multiple fields, so it is
721+
// safe to only check the first fieldNode for the stream directive
722+
const stream = getDirectiveValues(
723+
GraphQLStreamDirective,
724+
fieldNodes[0],
725+
exeContext.variableValues,
726+
);
727+
728+
if (!stream) {
729+
return;
730+
}
731+
732+
if (stream.if === false) {
733+
return;
734+
}
735+
736+
return {
737+
initialCount:
738+
// istanbul ignore next (initialCount is required number argument)
739+
typeof stream.initialCount === 'number' ? stream.initialCount : undefined,
740+
label: typeof stream.label === 'string' ? stream.label : undefined,
741+
};
742+
}
743+
707744
/**
708745
* Determines if a fragment is applicable to the given type.
709746
*/
@@ -996,6 +1033,7 @@ function completeAsyncIteratorValue(
9961033
errors: Array<GraphQLError>,
9971034
): Promise<$ReadOnlyArray<mixed>> {
9981035
let containsPromise = false;
1036+
const stream = getStreamValues(exeContext, fieldNodes);
9991037
return new Promise((resolve) => {
10001038
function next(index, completedResults) {
10011039
const fieldPath = addPath(path, index, undefined);
@@ -1032,7 +1070,26 @@ function completeAsyncIteratorValue(
10321070
return;
10331071
}
10341072

1035-
next(index + 1, completedResults);
1073+
const newIndex = index + 1;
1074+
if (
1075+
stream &&
1076+
typeof stream.initialCount === 'number' &&
1077+
newIndex >= stream.initialCount
1078+
) {
1079+
exeContext.dispatcher.addAsyncIteratorValue(
1080+
stream.label,
1081+
newIndex,
1082+
path,
1083+
iterator,
1084+
exeContext,
1085+
fieldNodes,
1086+
info,
1087+
itemType,
1088+
);
1089+
resolve(completedResults);
1090+
return;
1091+
}
1092+
next(newIndex, completedResults);
10361093
},
10371094
(rawError) => {
10381095
completedResults.push(null);
@@ -1087,6 +1144,8 @@ function completeListValue(
10871144
);
10881145
}
10891146

1147+
const stream = getStreamValues(exeContext, fieldNodes);
1148+
10901149
// This is specified as a simple map, however we're optimizing the path
10911150
// where the list contains no Promises by avoiding creating another Promise.
10921151
let containsPromise = false;
@@ -1096,6 +1155,23 @@ function completeListValue(
10961155
const itemPath = addPath(path, index, undefined);
10971156
try {
10981157
let completedItem;
1158+
1159+
if (
1160+
stream &&
1161+
typeof stream.initialCount === 'number' &&
1162+
index >= stream.initialCount
1163+
) {
1164+
exeContext.dispatcher.addValue(
1165+
stream.label,
1166+
itemPath,
1167+
item,
1168+
exeContext,
1169+
fieldNodes,
1170+
info,
1171+
itemType,
1172+
);
1173+
return;
1174+
}
10991175
if (isPromise(item)) {
11001176
completedItem = item.then((resolved) =>
11011177
completeValue(
@@ -1138,7 +1214,7 @@ function completeListValue(
11381214
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
11391215
return handleFieldError(error, itemType, errors);
11401216
}
1141-
});
1217+
}).filter((val) => val !== undefined);
11421218

11431219
return containsPromise ? Promise.all(completedResults) : completedResults;
11441220
}
@@ -1554,6 +1630,129 @@ export class Dispatcher {
15541630
);
15551631
}
15561632

1633+
addValue(
1634+
label?: string,
1635+
path: Path,
1636+
promiseOrData: PromiseOrValue<ObjMap<mixed> | mixed>,
1637+
exeContext: ExecutionContext,
1638+
fieldNodes: $ReadOnlyArray<FieldNode>,
1639+
info: GraphQLResolveInfo,
1640+
itemType: GraphQLOutputType,
1641+
): void {
1642+
const errors = [];
1643+
this._subsequentPayloads.push(
1644+
Promise.resolve(promiseOrData)
1645+
.then((resolved) =>
1646+
completeValue(
1647+
exeContext,
1648+
itemType,
1649+
fieldNodes,
1650+
info,
1651+
path,
1652+
resolved,
1653+
errors,
1654+
),
1655+
)
1656+
// Note: we don't rely on a `catch` method, but we do expect "thenable"
1657+
// to take a second callback for the error case.
1658+
.then(undefined, (rawError) => {
1659+
const error = locatedError(rawError, fieldNodes, pathToArray(path));
1660+
return handleFieldError(error, itemType, errors);
1661+
})
1662+
.then((data) => ({
1663+
value: createPatchResult(data, label, path, errors),
1664+
done: false,
1665+
})),
1666+
);
1667+
}
1668+
1669+
addAsyncIteratorValue(
1670+
label?: string,
1671+
initialIndex: number,
1672+
path?: Path,
1673+
iterator: AsyncIterator<mixed>,
1674+
exeContext: ExecutionContext,
1675+
fieldNodes: $ReadOnlyArray<FieldNode>,
1676+
info: GraphQLResolveInfo,
1677+
itemType: GraphQLOutputType,
1678+
): void {
1679+
const subsequentPayloads = this._subsequentPayloads;
1680+
function next(index) {
1681+
const fieldPath = addPath(path, index);
1682+
const patchErrors = [];
1683+
subsequentPayloads.push(
1684+
iterator.next().then(
1685+
({ value: data, done }) => {
1686+
if (done) {
1687+
return { value: undefined, done: true };
1688+
}
1689+
1690+
// eslint-disable-next-line node/callback-return
1691+
next(index + 1);
1692+
1693+
try {
1694+
const completedItem = completeValue(
1695+
exeContext,
1696+
itemType,
1697+
fieldNodes,
1698+
info,
1699+
fieldPath,
1700+
data,
1701+
patchErrors,
1702+
);
1703+
1704+
if (isPromise(completedItem)) {
1705+
return completedItem.then((resolveItem) => ({
1706+
value: createPatchResult(
1707+
resolveItem,
1708+
label,
1709+
fieldPath,
1710+
patchErrors,
1711+
),
1712+
done: false,
1713+
}));
1714+
}
1715+
1716+
return {
1717+
value: createPatchResult(
1718+
completedItem,
1719+
label,
1720+
fieldPath,
1721+
patchErrors,
1722+
),
1723+
done: false,
1724+
};
1725+
} catch (rawError) {
1726+
const error = locatedError(
1727+
rawError,
1728+
fieldNodes,
1729+
pathToArray(fieldPath),
1730+
);
1731+
handleFieldError(error, itemType, patchErrors);
1732+
return {
1733+
value: createPatchResult(null, label, fieldPath, patchErrors),
1734+
done: false,
1735+
};
1736+
}
1737+
},
1738+
(rawError) => {
1739+
const error = locatedError(
1740+
rawError,
1741+
fieldNodes,
1742+
pathToArray(fieldPath),
1743+
);
1744+
handleFieldError(error, itemType, patchErrors);
1745+
return {
1746+
value: createPatchResult(null, label, fieldPath, patchErrors),
1747+
done: false,
1748+
};
1749+
},
1750+
),
1751+
);
1752+
}
1753+
next(initialIndex);
1754+
}
1755+
15571756
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
15581757
return new Promise((resolve) => {
15591758
this._subsequentPayloads.forEach((promise) => {
@@ -1570,7 +1769,20 @@ export class Dispatcher {
15701769
);
15711770
return promise;
15721771
})
1573-
.then(({ value }) => {
1772+
.then(({ value, done }) => {
1773+
if (done && this._subsequentPayloads.length === 0) {
1774+
// async iterable resolver just finished and no more pending payloads
1775+
return {
1776+
value: {
1777+
hasNext: false,
1778+
},
1779+
done: false,
1780+
};
1781+
} else if (done) {
1782+
// async iterable resolver just finished but there are pending payloads
1783+
// return the next one
1784+
return this._race();
1785+
}
15741786
const returnValue: ExecutionPatchResult = {
15751787
...value,
15761788
hasNext: this._subsequentPayloads.length > 0,

0 commit comments

Comments
 (0)