Skip to content

Commit c415b05

Browse files
committed
Implement support for @stream directive
# Conflicts: # src/execution/execute.ts # src/validation/index.d.ts # src/validation/index.ts
1 parent 12b6c74 commit c415b05

File tree

8 files changed

+1234
-18
lines changed

8 files changed

+1234
-18
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 700 additions & 0 deletions
Large diffs are not rendered by default.

src/execution/execute.ts

Lines changed: 243 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import {
4444
TypeMetaFieldDef,
4545
TypeNameMetaFieldDef,
4646
} from '../type/introspection';
47+
import { GraphQLStreamDirective } from '../type/directives';
4748
import {
4849
isObjectType,
4950
isAbstractType,
@@ -54,7 +55,11 @@ import {
5455

5556
import { getOperationRootType } from '../utilities/getOperationRootType';
5657

57-
import { getVariableValues, getArgumentValues } from './values';
58+
import {
59+
getVariableValues,
60+
getArgumentValues,
61+
getDirectiveValues,
62+
} from './values';
5863
import type { FieldsAndPatches, PatchFields } from './collectFields';
5964
import { collectFields } from './collectFields';
6065

@@ -136,7 +141,7 @@ export interface FormattedExecutionResult<
136141
* - `extensions` is reserved for adding non-standard properties.
137142
*/
138143
export interface ExecutionPatchResult<
139-
TData = ObjMap<unknown>,
144+
TData = ObjMap<unknown> | unknown,
140145
TExtensions = ObjMap<unknown>,
141146
> {
142147
errors?: ReadonlyArray<GraphQLError>;
@@ -148,7 +153,7 @@ export interface ExecutionPatchResult<
148153
}
149154

150155
export interface FormattedExecutionPatchResult<
151-
TData = ObjMap<unknown>,
156+
TData = ObjMap<unknown> | unknown,
152157
TExtensions = ObjMap<unknown>,
153158
> {
154159
errors?: ReadonlyArray<GraphQLFormattedError>;
@@ -762,6 +767,44 @@ function completeValue(
762767
);
763768
}
764769

770+
/**
771+
* Returns an object containing the `@stream` arguments if a field should be
772+
* streamed based on the experimental flag, stream directive present and
773+
* not disabled by the "if" argument.
774+
*/
775+
function getStreamValues(
776+
exeContext: ExecutionContext,
777+
fieldNodes: ReadonlyArray<FieldNode>,
778+
):
779+
| undefined
780+
| {
781+
initialCount?: number;
782+
label?: string;
783+
} {
784+
// validation only allows equivalent streams on multiple fields, so it is
785+
// safe to only check the first fieldNode for the stream directive
786+
const stream = getDirectiveValues(
787+
GraphQLStreamDirective,
788+
fieldNodes[0],
789+
exeContext.variableValues,
790+
);
791+
792+
if (!stream) {
793+
return;
794+
}
795+
796+
if (stream.if === false) {
797+
return;
798+
}
799+
800+
return {
801+
initialCount:
802+
// istanbul ignore next (initialCount is required number argument)
803+
typeof stream.initialCount === 'number' ? stream.initialCount : undefined,
804+
label: typeof stream.label === 'string' ? stream.label : undefined,
805+
};
806+
}
807+
765808
/**
766809
* Complete a async iterator value by completing the result and calling
767810
* recursively until all the results are completed.
@@ -776,8 +819,28 @@ function completeAsyncIteratorValue(
776819
errors: Array<GraphQLError>,
777820
): Promise<ReadonlyArray<unknown>> {
778821
let containsPromise = false;
822+
const stream = getStreamValues(exeContext, fieldNodes);
779823
return new Promise<ReadonlyArray<unknown>>((resolve) => {
780824
function next(index: number, completedResults: Array<unknown>) {
825+
if (
826+
stream &&
827+
typeof stream.initialCount === 'number' &&
828+
index >= stream.initialCount
829+
) {
830+
exeContext.dispatcher.addAsyncIteratorValue(
831+
index,
832+
iterator,
833+
exeContext,
834+
fieldNodes,
835+
info,
836+
itemType,
837+
path,
838+
stream.label,
839+
);
840+
resolve(completedResults);
841+
return;
842+
}
843+
781844
const fieldPath = addPath(path, index, undefined);
782845
iterator.next().then(
783846
({ value, done }) => {
@@ -866,15 +929,37 @@ function completeListValue(
866929
);
867930
}
868931

932+
const stream = getStreamValues(exeContext, fieldNodes);
933+
869934
// This is specified as a simple map, however we're optimizing the path
870935
// where the list contains no Promises by avoiding creating another Promise.
871936
let containsPromise = false;
872-
const completedResults = Array.from(result, (item, index) => {
937+
const completedResults = [];
938+
let index = 0;
939+
for (const item of result) {
873940
// No need to modify the info object containing the path,
874941
// since from here on it is not ever accessed by resolver functions.
875942
const itemPath = addPath(path, index, undefined);
876943
try {
877944
let completedItem;
945+
946+
if (
947+
stream &&
948+
typeof stream.initialCount === 'number' &&
949+
index >= stream.initialCount
950+
) {
951+
exeContext.dispatcher.addValue(
952+
itemPath,
953+
item,
954+
exeContext,
955+
fieldNodes,
956+
info,
957+
itemType,
958+
stream.label,
959+
);
960+
index++;
961+
continue;
962+
}
878963
if (isPromise(item)) {
879964
completedItem = item.then((resolved) =>
880965
completeValue(
@@ -903,21 +988,25 @@ function completeListValue(
903988
containsPromise = true;
904989
// Note: we don't rely on a `catch` method, but we do expect "thenable"
905990
// to take a second callback for the error case.
906-
return completedItem.then(undefined, (rawError) => {
907-
const error = locatedError(
908-
rawError,
909-
fieldNodes,
910-
pathToArray(itemPath),
911-
);
912-
return handleFieldError(error, itemType, errors);
913-
});
991+
completedResults.push(
992+
completedItem.then(undefined, (rawError) => {
993+
const error = locatedError(
994+
rawError,
995+
fieldNodes,
996+
pathToArray(itemPath),
997+
);
998+
return handleFieldError(error, itemType, errors);
999+
}),
1000+
);
1001+
} else {
1002+
completedResults.push(completedItem);
9141003
}
915-
return completedItem;
9161004
} catch (rawError) {
9171005
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
918-
return handleFieldError(error, itemType, errors);
1006+
completedResults.push(handleFieldError(error, itemType, errors));
9191007
}
920-
});
1008+
index++;
1009+
}
9211010

9221011
return containsPromise ? Promise.all(completedResults) : completedResults;
9231012
}
@@ -1295,7 +1384,7 @@ export function getFieldDef(
12951384
*/
12961385
interface DispatcherResult {
12971386
errors?: ReadonlyArray<GraphQLError>;
1298-
data?: ObjMap<unknown> | null;
1387+
data?: ObjMap<unknown> | unknown | null;
12991388
path: ReadonlyArray<string | number>;
13001389
label?: string;
13011390
extensions?: ObjMap<unknown>;
@@ -1334,6 +1423,129 @@ export class Dispatcher {
13341423
);
13351424
}
13361425

1426+
addValue(
1427+
path: Path,
1428+
promiseOrData: PromiseOrValue<unknown>,
1429+
exeContext: ExecutionContext,
1430+
fieldNodes: ReadonlyArray<FieldNode>,
1431+
info: GraphQLResolveInfo,
1432+
itemType: GraphQLOutputType,
1433+
label?: string,
1434+
): void {
1435+
const errors: Array<GraphQLError> = [];
1436+
this._subsequentPayloads.push(
1437+
Promise.resolve(promiseOrData)
1438+
.then((resolved) =>
1439+
completeValue(
1440+
exeContext,
1441+
itemType,
1442+
fieldNodes,
1443+
info,
1444+
path,
1445+
resolved,
1446+
errors,
1447+
),
1448+
)
1449+
// Note: we don't rely on a `catch` method, but we do expect "thenable"
1450+
// to take a second callback for the error case.
1451+
.then(undefined, (rawError) => {
1452+
const error = locatedError(rawError, fieldNodes, pathToArray(path));
1453+
return handleFieldError(error, itemType, errors);
1454+
})
1455+
.then((data) => ({
1456+
value: createPatchResult(data, label, path, errors),
1457+
done: false,
1458+
})),
1459+
);
1460+
}
1461+
1462+
addAsyncIteratorValue(
1463+
initialIndex: number,
1464+
iterator: AsyncIterator<unknown>,
1465+
exeContext: ExecutionContext,
1466+
fieldNodes: ReadonlyArray<FieldNode>,
1467+
info: GraphQLResolveInfo,
1468+
itemType: GraphQLOutputType,
1469+
path?: Path,
1470+
label?: string,
1471+
): void {
1472+
const subsequentPayloads = this._subsequentPayloads;
1473+
function next(index: number) {
1474+
const fieldPath = addPath(path, index, undefined);
1475+
const patchErrors: Array<GraphQLError> = [];
1476+
subsequentPayloads.push(
1477+
iterator.next().then(
1478+
({ value: data, done }) => {
1479+
if (done) {
1480+
return { value: undefined, done: true };
1481+
}
1482+
1483+
// eslint-disable-next-line node/callback-return
1484+
next(index + 1);
1485+
1486+
try {
1487+
const completedItem = completeValue(
1488+
exeContext,
1489+
itemType,
1490+
fieldNodes,
1491+
info,
1492+
fieldPath,
1493+
data,
1494+
patchErrors,
1495+
);
1496+
1497+
if (isPromise(completedItem)) {
1498+
return completedItem.then((resolveItem) => ({
1499+
value: createPatchResult(
1500+
resolveItem,
1501+
label,
1502+
fieldPath,
1503+
patchErrors,
1504+
),
1505+
done: false,
1506+
}));
1507+
}
1508+
1509+
return {
1510+
value: createPatchResult(
1511+
completedItem,
1512+
label,
1513+
fieldPath,
1514+
patchErrors,
1515+
),
1516+
done: false,
1517+
};
1518+
} catch (rawError) {
1519+
const error = locatedError(
1520+
rawError,
1521+
fieldNodes,
1522+
pathToArray(fieldPath),
1523+
);
1524+
handleFieldError(error, itemType, patchErrors);
1525+
return {
1526+
value: createPatchResult(null, label, fieldPath, patchErrors),
1527+
done: false,
1528+
};
1529+
}
1530+
},
1531+
(rawError) => {
1532+
const error = locatedError(
1533+
rawError,
1534+
fieldNodes,
1535+
pathToArray(fieldPath),
1536+
);
1537+
handleFieldError(error, itemType, patchErrors);
1538+
return {
1539+
value: createPatchResult(null, label, fieldPath, patchErrors),
1540+
done: false,
1541+
};
1542+
},
1543+
),
1544+
);
1545+
}
1546+
next(initialIndex);
1547+
}
1548+
13371549
_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
13381550
return new Promise<{
13391551
promise: Promise<IteratorResult<DispatcherResult, void>>;
@@ -1353,7 +1565,20 @@ export class Dispatcher {
13531565
);
13541566
return promise;
13551567
})
1356-
.then(({ value }) => {
1568+
.then(({ value, done }) => {
1569+
if (done && this._subsequentPayloads.length === 0) {
1570+
// async iterable resolver just finished and no more pending payloads
1571+
return {
1572+
value: {
1573+
hasNext: false,
1574+
},
1575+
done: false,
1576+
};
1577+
} else if (done) {
1578+
// async iterable resolver just finished but there are pending payloads
1579+
// return the next one
1580+
return this._race();
1581+
}
13571582
const returnValue: ExecutionPatchResult = {
13581583
...value,
13591584
hasNext: this._subsequentPayloads.length > 0,
@@ -1399,7 +1624,7 @@ export class Dispatcher {
13991624
}
14001625

14011626
function createPatchResult(
1402-
data: ObjMap<unknown> | null,
1627+
data: ObjMap<unknown> | unknown | null,
14031628
label?: string,
14041629
path?: Path,
14051630
errors?: ReadonlyArray<GraphQLError>,

0 commit comments

Comments
 (0)