Skip to content

Commit 018a570

Browse files
committed
Implement support for @stream directive
# Conflicts: # src/execution/execute.ts # src/validation/index.d.ts # src/validation/index.ts
1 parent 0100e57 commit 018a570

File tree

8 files changed

+1234
-18
lines changed

8 files changed

+1234
-18
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 700 additions & 0 deletions
Large diffs are not rendered by default.

src/execution/execute.ts

Lines changed: 243 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import {
4545
TypeMetaFieldDef,
4646
TypeNameMetaFieldDef,
4747
} from '../type/introspection';
48+
import { GraphQLStreamDirective } from '../type/directives';
4849
import {
4950
isObjectType,
5051
isAbstractType,
@@ -53,7 +54,11 @@ import {
5354
isNonNullType,
5455
} from '../type/definition';
5556

56-
import { getVariableValues, getArgumentValues } from './values';
57+
import {
58+
getVariableValues,
59+
getArgumentValues,
60+
getDirectiveValues,
61+
} from './values';
5762
import {
5863
collectFields,
5964
collectSubfields as _collectSubfields,
@@ -158,7 +163,7 @@ export interface FormattedExecutionResult<
158163
* - `extensions` is reserved for adding non-standard properties.
159164
*/
160165
export interface ExecutionPatchResult<
161-
TData = ObjMap<unknown>,
166+
TData = ObjMap<unknown> | unknown,
162167
TExtensions = ObjMap<unknown>,
163168
> {
164169
errors?: ReadonlyArray<GraphQLError>;
@@ -170,7 +175,7 @@ export interface ExecutionPatchResult<
170175
}
171176

172177
export interface FormattedExecutionPatchResult<
173-
TData = ObjMap<unknown>,
178+
TData = ObjMap<unknown> | unknown,
174179
TExtensions = ObjMap<unknown>,
175180
> {
176181
errors?: ReadonlyArray<GraphQLFormattedError>;
@@ -804,6 +809,44 @@ function completeValue(
804809
);
805810
}
806811

812+
/**
813+
* Returns an object containing the `@stream` arguments if a field should be
814+
* streamed based on the experimental flag, stream directive present and
815+
* not disabled by the "if" argument.
816+
*/
817+
function getStreamValues(
818+
exeContext: ExecutionContext,
819+
fieldNodes: ReadonlyArray<FieldNode>,
820+
):
821+
| undefined
822+
| {
823+
initialCount?: number;
824+
label?: string;
825+
} {
826+
// validation only allows equivalent streams on multiple fields, so it is
827+
// safe to only check the first fieldNode for the stream directive
828+
const stream = getDirectiveValues(
829+
GraphQLStreamDirective,
830+
fieldNodes[0],
831+
exeContext.variableValues,
832+
);
833+
834+
if (!stream) {
835+
return;
836+
}
837+
838+
if (stream.if === false) {
839+
return;
840+
}
841+
842+
return {
843+
initialCount:
844+
// istanbul ignore next (initialCount is required number argument)
845+
typeof stream.initialCount === 'number' ? stream.initialCount : undefined,
846+
label: typeof stream.label === 'string' ? stream.label : undefined,
847+
};
848+
}
849+
807850
/**
808851
* Complete a async iterator value by completing the result and calling
809852
* recursively until all the results are completed.
@@ -818,8 +861,28 @@ function completeAsyncIteratorValue(
818861
errors: Array<GraphQLError>,
819862
): Promise<ReadonlyArray<unknown>> {
820863
let containsPromise = false;
864+
const stream = getStreamValues(exeContext, fieldNodes);
821865
return new Promise<ReadonlyArray<unknown>>((resolve) => {
822866
function next(index: number, completedResults: Array<unknown>) {
867+
if (
868+
stream &&
869+
typeof stream.initialCount === 'number' &&
870+
index >= stream.initialCount
871+
) {
872+
exeContext.dispatcher.addAsyncIteratorValue(
873+
index,
874+
iterator,
875+
exeContext,
876+
fieldNodes,
877+
info,
878+
itemType,
879+
path,
880+
stream.label,
881+
);
882+
resolve(completedResults);
883+
return;
884+
}
885+
823886
const fieldPath = addPath(path, index, undefined);
824887
iterator.next().then(
825888
({ value, done }) => {
@@ -908,15 +971,37 @@ function completeListValue(
908971
);
909972
}
910973

974+
const stream = getStreamValues(exeContext, fieldNodes);
975+
911976
// This is specified as a simple map, however we're optimizing the path
912977
// where the list contains no Promises by avoiding creating another Promise.
913978
let containsPromise = false;
914-
const completedResults = Array.from(result, (item, index) => {
979+
const completedResults = [];
980+
let index = 0;
981+
for (const item of result) {
915982
// No need to modify the info object containing the path,
916983
// since from here on it is not ever accessed by resolver functions.
917984
const itemPath = addPath(path, index, undefined);
918985
try {
919986
let completedItem;
987+
988+
if (
989+
stream &&
990+
typeof stream.initialCount === 'number' &&
991+
index >= stream.initialCount
992+
) {
993+
exeContext.dispatcher.addValue(
994+
itemPath,
995+
item,
996+
exeContext,
997+
fieldNodes,
998+
info,
999+
itemType,
1000+
stream.label,
1001+
);
1002+
index++;
1003+
continue;
1004+
}
9201005
if (isPromise(item)) {
9211006
completedItem = item.then((resolved) =>
9221007
completeValue(
@@ -945,21 +1030,25 @@ function completeListValue(
9451030
containsPromise = true;
9461031
// Note: we don't rely on a `catch` method, but we do expect "thenable"
9471032
// to take a second callback for the error case.
948-
return completedItem.then(undefined, (rawError) => {
949-
const error = locatedError(
950-
rawError,
951-
fieldNodes,
952-
pathToArray(itemPath),
953-
);
954-
return handleFieldError(error, itemType, errors);
955-
});
1033+
completedResults.push(
1034+
completedItem.then(undefined, (rawError) => {
1035+
const error = locatedError(
1036+
rawError,
1037+
fieldNodes,
1038+
pathToArray(itemPath),
1039+
);
1040+
return handleFieldError(error, itemType, errors);
1041+
}),
1042+
);
1043+
} else {
1044+
completedResults.push(completedItem);
9561045
}
957-
return completedItem;
9581046
} catch (rawError) {
9591047
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
960-
return handleFieldError(error, itemType, errors);
1048+
completedResults.push(handleFieldError(error, itemType, errors));
9611049
}
962-
});
1050+
index++;
1051+
}
9631052

9641053
return containsPromise ? Promise.all(completedResults) : completedResults;
9651054
}
@@ -1301,7 +1390,7 @@ export function getFieldDef(
13011390
*/
13021391
interface DispatcherResult {
13031392
errors?: ReadonlyArray<GraphQLError>;
1304-
data?: ObjMap<unknown> | null;
1393+
data?: ObjMap<unknown> | unknown | null;
13051394
path: ReadonlyArray<string | number>;
13061395
label?: string;
13071396
extensions?: ObjMap<unknown>;
@@ -1341,6 +1430,129 @@ export class Dispatcher {
13411430
);
13421431
}
13431432

1433+
addValue(
1434+
path: Path,
1435+
promiseOrData: PromiseOrValue<unknown>,
1436+
exeContext: ExecutionContext,
1437+
fieldNodes: ReadonlyArray<FieldNode>,
1438+
info: GraphQLResolveInfo,
1439+
itemType: GraphQLOutputType,
1440+
label?: string,
1441+
): void {
1442+
const errors: Array<GraphQLError> = [];
1443+
this._subsequentPayloads.push(
1444+
Promise.resolve(promiseOrData)
1445+
.then((resolved) =>
1446+
completeValue(
1447+
exeContext,
1448+
itemType,
1449+
fieldNodes,
1450+
info,
1451+
path,
1452+
resolved,
1453+
errors,
1454+
),
1455+
)
1456+
// Note: we don't rely on a `catch` method, but we do expect "thenable"
1457+
// to take a second callback for the error case.
1458+
.then(undefined, (rawError) => {
1459+
const error = locatedError(rawError, fieldNodes, pathToArray(path));
1460+
return handleFieldError(error, itemType, errors);
1461+
})
1462+
.then((data) => ({
1463+
value: createPatchResult(data, label, path, errors),
1464+
done: false,
1465+
})),
1466+
);
1467+
}
1468+
1469+
addAsyncIteratorValue(
1470+
initialIndex: number,
1471+
iterator: AsyncIterator<unknown>,
1472+
exeContext: ExecutionContext,
1473+
fieldNodes: ReadonlyArray<FieldNode>,
1474+
info: GraphQLResolveInfo,
1475+
itemType: GraphQLOutputType,
1476+
path?: Path,
1477+
label?: string,
1478+
): void {
1479+
const subsequentPayloads = this._subsequentPayloads;
1480+
function next(index: number) {
1481+
const fieldPath = addPath(path, index, undefined);
1482+
const patchErrors: Array<GraphQLError> = [];
1483+
subsequentPayloads.push(
1484+
iterator.next().then(
1485+
({ value: data, done }) => {
1486+
if (done) {
1487+
return { value: undefined, done: true };
1488+
}
1489+
1490+
// eslint-disable-next-line node/callback-return
1491+
next(index + 1);
1492+
1493+
try {
1494+
const completedItem = completeValue(
1495+
exeContext,
1496+
itemType,
1497+
fieldNodes,
1498+
info,
1499+
fieldPath,
1500+
data,
1501+
patchErrors,
1502+
);
1503+
1504+
if (isPromise(completedItem)) {
1505+
return completedItem.then((resolveItem) => ({
1506+
value: createPatchResult(
1507+
resolveItem,
1508+
label,
1509+
fieldPath,
1510+
patchErrors,
1511+
),
1512+
done: false,
1513+
}));
1514+
}
1515+
1516+
return {
1517+
value: createPatchResult(
1518+
completedItem,
1519+
label,
1520+
fieldPath,
1521+
patchErrors,
1522+
),
1523+
done: false,
1524+
};
1525+
} catch (rawError) {
1526+
const error = locatedError(
1527+
rawError,
1528+
fieldNodes,
1529+
pathToArray(fieldPath),
1530+
);
1531+
handleFieldError(error, itemType, patchErrors);
1532+
return {
1533+
value: createPatchResult(null, label, fieldPath, patchErrors),
1534+
done: false,
1535+
};
1536+
}
1537+
},
1538+
(rawError) => {
1539+
const error = locatedError(
1540+
rawError,
1541+
fieldNodes,
1542+
pathToArray(fieldPath),
1543+
);
1544+
handleFieldError(error, itemType, patchErrors);
1545+
return {
1546+
value: createPatchResult(null, label, fieldPath, patchErrors),
1547+
done: false,
1548+
};
1549+
},
1550+
),
1551+
);
1552+
}
1553+
next(initialIndex);
1554+
}
1555+
13441556
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
13451557
return new Promise<{
13461558
promise: Promise<IteratorResult<DispatcherResult, void>>;
@@ -1360,7 +1572,20 @@ export class Dispatcher {
13601572
);
13611573
return promise;
13621574
})
1363-
.then(({ value }) => {
1575+
.then(({ value, done }) => {
1576+
if (done && this._subsequentPayloads.length === 0) {
1577+
// async iterable resolver just finished and no more pending payloads
1578+
return {
1579+
value: {
1580+
hasNext: false,
1581+
},
1582+
done: false,
1583+
};
1584+
} else if (done) {
1585+
// async iterable resolver just finished but there are pending payloads
1586+
// return the next one
1587+
return this._race();
1588+
}
13641589
const returnValue: ExecutionPatchResult = {
13651590
...value,
13661591
hasNext: this._subsequentPayloads.length > 0,
@@ -1402,7 +1627,7 @@ export class Dispatcher {
14021627
}
14031628

14041629
function createPatchResult(
1405-
data: ObjMap<unknown> | null,
1630+
data: ObjMap<unknown> | unknown | null,
14061631
label?: string,
14071632
path?: Path,
14081633
errors?: ReadonlyArray<GraphQLError>,

0 commit comments

Comments
 (0)