Skip to content

Commit 4c262a2

Browse files
committed
add defer/stream support for subscriptions (#7)
1 parent 2ebfec2 commit 4c262a2

File tree

4 files changed

+339
-14
lines changed

4 files changed

+339
-14
lines changed
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import { expect } from 'chai';
2+
import { describe, it } from 'mocha';
3+
4+
import { flattenAsyncIterator } from '../flattenAsyncIterator';
5+
6+
describe('flattenAsyncIterator', () => {
7+
it('does not modify an already flat async generator', async () => {
8+
async function* source() {
9+
yield 1;
10+
yield 2;
11+
yield 3;
12+
}
13+
14+
const result = flattenAsyncIterator(source());
15+
16+
expect(await result.next()).to.deep.equal({ value: 1, done: false });
17+
expect(await result.next()).to.deep.equal({ value: 2, done: false });
18+
expect(await result.next()).to.deep.equal({ value: 3, done: false });
19+
expect(await result.next()).to.deep.equal({
20+
value: undefined,
21+
done: true,
22+
});
23+
});
24+
25+
it('does not modify an already flat async iterator', async () => {
26+
const items = [1, 2, 3];
27+
28+
const iterator: any = {
29+
[Symbol.asyncIterator]() {
30+
return this;
31+
},
32+
next() {
33+
return Promise.resolve({
34+
done: items.length === 0,
35+
value: items.shift(),
36+
});
37+
},
38+
};
39+
40+
const result = flattenAsyncIterator(iterator);
41+
42+
expect(await result.next()).to.deep.equal({ value: 1, done: false });
43+
expect(await result.next()).to.deep.equal({ value: 2, done: false });
44+
expect(await result.next()).to.deep.equal({ value: 3, done: false });
45+
expect(await result.next()).to.deep.equal({
46+
value: undefined,
47+
done: true,
48+
});
49+
});
50+
51+
it('flatten nested async generators', async () => {
52+
async function* source() {
53+
yield 1;
54+
yield 2;
55+
yield (async function* (): AsyncGenerator<number, void, void> {
56+
yield 2.1;
57+
yield 2.2;
58+
})();
59+
yield 3;
60+
}
61+
62+
const doubles = flattenAsyncIterator(source());
63+
64+
const result = [];
65+
for await (const x of doubles) {
66+
result.push(x);
67+
}
68+
expect(result).to.deep.equal([1, 2, 2.1, 2.2, 3]);
69+
});
70+
71+
it('allows returning early from a nested async generator', async () => {
72+
async function* source() {
73+
yield 1;
74+
yield 2;
75+
yield (async function* (): AsyncGenerator<number, void, void> {
76+
yield 2.1;
77+
// istanbul ignore next (Shouldn't be reached)
78+
yield 2.2;
79+
})();
80+
// istanbul ignore next (Shouldn't be reached)
81+
yield 3;
82+
}
83+
84+
const doubles = flattenAsyncIterator(source());
85+
86+
expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
87+
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
88+
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });
89+
90+
// Early return
91+
expect(await doubles.return()).to.deep.equal({
92+
value: undefined,
93+
done: true,
94+
});
95+
96+
// Subsequent next calls
97+
expect(await doubles.next()).to.deep.equal({
98+
value: undefined,
99+
done: true,
100+
});
101+
expect(await doubles.next()).to.deep.equal({
102+
value: undefined,
103+
done: true,
104+
});
105+
});
106+
107+
it('allows throwing errors from a nested async generator', async () => {
108+
async function* source() {
109+
yield 1;
110+
yield 2;
111+
yield (async function* (): AsyncGenerator<number, void, void> {
112+
yield 2.1;
113+
// istanbul ignore next (Shouldn't be reached)
114+
yield 2.2;
115+
})();
116+
// istanbul ignore next (Shouldn't be reached)
117+
yield 3;
118+
}
119+
120+
const doubles = flattenAsyncIterator(source());
121+
122+
expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
123+
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
124+
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });
125+
126+
// Throw error
127+
let caughtError;
128+
try {
129+
await doubles.throw('ouch');
130+
} catch (e) {
131+
caughtError = e;
132+
}
133+
expect(caughtError).to.equal('ouch');
134+
});
135+
});

src/subscription/__tests__/subscribe-test.js

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,153 @@ describe('Subscription Publish Phase', () => {
648648
});
649649
});
650650

651+
it('produces additional payloads for subscriptions with @defer', async () => {
652+
const pubsub = new SimplePubSub();
653+
const subscription = await createSubscription(
654+
pubsub,
655+
emailSchema,
656+
parse(`
657+
subscription ($priority: Int = 0) {
658+
importantEmail(priority: $priority) {
659+
email {
660+
from
661+
subject
662+
}
663+
... @defer {
664+
inbox {
665+
unread
666+
total
667+
}
668+
}
669+
}
670+
}
671+
`),
672+
);
673+
invariant(isAsyncIterable(subscription));
674+
// Wait for the next subscription payload.
675+
const payload = subscription.next();
676+
677+
// A new email arrives!
678+
expect(
679+
pubsub.emit({
680+
from: 'yuzhi@graphql.org',
681+
subject: 'Alright',
682+
message: 'Tests are good',
683+
unread: true,
684+
}),
685+
).to.equal(true);
686+
687+
// The previously waited on payload now has a value.
688+
expect(await payload).to.deep.equal({
689+
done: false,
690+
value: {
691+
data: {
692+
importantEmail: {
693+
email: {
694+
from: 'yuzhi@graphql.org',
695+
subject: 'Alright',
696+
},
697+
},
698+
},
699+
hasNext: true,
700+
},
701+
});
702+
703+
// Wait for the next payload from @defer
704+
expect(await subscription.next()).to.deep.equal({
705+
done: false,
706+
value: {
707+
data: {
708+
inbox: {
709+
unread: 1,
710+
total: 2,
711+
},
712+
},
713+
path: ['importantEmail'],
714+
hasNext: false,
715+
},
716+
});
717+
718+
// Another new email arrives, after all incrementally delivered payloads are received.
719+
expect(
720+
pubsub.emit({
721+
from: 'hyo@graphql.org',
722+
subject: 'Tools',
723+
message: 'I <3 making things',
724+
unread: true,
725+
}),
726+
).to.equal(true);
727+
728+
// The next waited on payload will have a value.
729+
expect(await subscription.next()).to.deep.equal({
730+
done: false,
731+
value: {
732+
data: {
733+
importantEmail: {
734+
email: {
735+
from: 'hyo@graphql.org',
736+
subject: 'Tools',
737+
},
738+
},
739+
},
740+
hasNext: true,
741+
},
742+
});
743+
744+
// Another new email arrives, before the incrementally delivered payloads from the last email was received.
745+
expect(
746+
pubsub.emit({
747+
from: 'adam@graphql.org',
748+
subject: 'Important',
749+
message: 'Read me please',
750+
unread: true,
751+
}),
752+
).to.equal(true);
753+
754+
// Deferred payload from previous event is received.
755+
expect(await subscription.next()).to.deep.equal({
756+
done: false,
757+
value: {
758+
data: {
759+
inbox: {
760+
unread: 2,
761+
total: 3,
762+
},
763+
},
764+
path: ['importantEmail'],
765+
hasNext: false,
766+
},
767+
});
768+
769+
// Next payload from last event
770+
expect(await subscription.next()).to.deep.equal({
771+
done: false,
772+
value: {
773+
data: {
774+
importantEmail: {
775+
email: {
776+
from: 'adam@graphql.org',
777+
subject: 'Important',
778+
},
779+
},
780+
},
781+
hasNext: true,
782+
},
783+
});
784+
785+
// The client disconnects before the deferred payload is consumed.
786+
expect(await subscription.return()).to.deep.equal({
787+
done: true,
788+
value: undefined,
789+
});
790+
791+
// Awaiting a subscription after closing it results in completed results.
792+
expect(await subscription.next()).to.deep.equal({
793+
done: true,
794+
value: undefined,
795+
});
796+
});
797+
651798
it('produces a payload when there are multiple events', async () => {
652799
const pubsub = new SimplePubSub();
653800
const subscription = await createSubscription(pubsub);
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { isAsyncIterable } from '../jsutils/isAsyncIterable';
2+
3+
/**
4+
* Given an AsyncIterable that could potentially yield other async iterators,
5+
* flatten all yielded results into a single AsyncIterable
6+
*/
7+
export function flattenAsyncIterator<T>(
8+
iterable: AsyncGenerator<AsyncGenerator<T, void, void> | T, void, void>,
9+
): AsyncGenerator<T, void, void> {
10+
// $FlowFixMe[prop-missing]
11+
const iteratorMethod = iterable[Symbol.asyncIterator];
12+
const iterator: any = iteratorMethod.call(iterable);
13+
let iteratorStack: Array<AsyncGenerator<T, void, void>> = [iterator];
14+
15+
function next(): Promise<IteratorResult<T, void>> {
16+
const currentIterator = iteratorStack[0];
17+
if (!currentIterator) {
18+
return Promise.resolve({ value: undefined, done: true });
19+
}
20+
return currentIterator.next().then((result) => {
21+
if (result.done) {
22+
iteratorStack.shift();
23+
return next();
24+
} else if (isAsyncIterable(result.value)) {
25+
const childIteratorMethod = result.value[Symbol.asyncIterator];
26+
const childIterator: any = childIteratorMethod.call(result.value);
27+
iteratorStack.unshift(childIterator);
28+
return next();
29+
}
30+
return result;
31+
});
32+
}
33+
return ({
34+
next,
35+
return() {
36+
iteratorStack = [];
37+
return iterator.return();
38+
},
39+
throw(error?: mixed): Promise<IteratorResult<T, void>> {
40+
iteratorStack = [];
41+
return iterator.throw(error);
42+
},
43+
[Symbol.asyncIterator]() {
44+
return this;
45+
},
46+
}: $FlowFixMe);
47+
}

src/subscription/subscribe.js

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import type { GraphQLFieldResolver } from '../type/definition';
2424
import { getOperationRootType } from '../utilities/getOperationRootType';
2525

2626
import { mapAsyncIterator } from './mapAsyncIterator';
27+
import { flattenAsyncIterator } from './flattenAsyncIterator';
2728

2829
export type SubscriptionArgs = {|
2930
schema: GraphQLSchema,
@@ -87,8 +88,8 @@ export function subscribe(
8788
// the GraphQL specification. The `execute` function provides the
8889
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
8990
// "ExecuteQuery" algorithm, for which `execute` is also used.
90-
const mapSourceToResponse = (payload) => {
91-
const executionResult = execute({
91+
const mapSourceToResponse = (payload) =>
92+
execute({
9293
schema,
9394
document,
9495
rootValue: payload,
@@ -97,24 +98,19 @@ export function subscribe(
9798
operationName,
9899
fieldResolver,
99100
});
100-
/* istanbul ignore if - TODO: implement support for defer/stream in subscriptions */
101-
if (isAsyncIterable(executionResult)) {
102-
throw new Error(
103-
'TODO: implement support for defer/stream in subscriptions',
104-
);
105-
}
106-
return executionResult;
107-
};
108101

109102
// Resolve the Source Stream, then map every source value to a
110103
// ExecutionResult value as described above.
111104
return sourcePromise.then((resultOrStream) =>
112105
// Note: Flow can't refine isAsyncIterable, so explicit casts are used.
113106
isAsyncIterable(resultOrStream)
114-
? mapAsyncIterator(
115-
resultOrStream,
116-
mapSourceToResponse,
117-
reportGraphQLError,
107+
? flattenAsyncIterator(
108+
mapAsyncIterator(
109+
resultOrStream,
110+
// $FlowFixMe[incompatible-call]
111+
mapSourceToResponse,
112+
reportGraphQLError,
113+
),
118114
)
119115
: ((resultOrStream: any): ExecutionResult),
120116
);

0 commit comments

Comments
 (0)