Skip to content

Commit 4fedf70

Browse files
committed
add defer/stream support for subscriptions (#7)
# Conflicts: # src/subscription/subscribe.ts
1 parent 53ee7a4 commit 4fedf70

File tree

4 files changed

+353
-20
lines changed

4 files changed

+353
-20
lines changed
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import { expect } from 'chai';
2+
import { describe, it } from 'mocha';
3+
4+
import { flattenAsyncIterator } from '../flattenAsyncIterator';
5+
6+
describe('flattenAsyncIterator', () => {
7+
it('does not modify an already flat async generator', async () => {
8+
async function* source() {
9+
yield await Promise.resolve(1);
10+
yield await Promise.resolve(2);
11+
yield await Promise.resolve(3);
12+
}
13+
14+
const result = flattenAsyncIterator(source());
15+
16+
expect(await result.next()).to.deep.equal({ value: 1, done: false });
17+
expect(await result.next()).to.deep.equal({ value: 2, done: false });
18+
expect(await result.next()).to.deep.equal({ value: 3, done: false });
19+
expect(await result.next()).to.deep.equal({
20+
value: undefined,
21+
done: true,
22+
});
23+
});
24+
25+
it('does not modify an already flat async iterator', async () => {
26+
const items = [1, 2, 3];
27+
28+
const iterator: any = {
29+
[Symbol.asyncIterator]() {
30+
return this;
31+
},
32+
next() {
33+
return Promise.resolve({
34+
done: items.length === 0,
35+
value: items.shift(),
36+
});
37+
},
38+
};
39+
40+
const result = flattenAsyncIterator(iterator);
41+
42+
expect(await result.next()).to.deep.equal({ value: 1, done: false });
43+
expect(await result.next()).to.deep.equal({ value: 2, done: false });
44+
expect(await result.next()).to.deep.equal({ value: 3, done: false });
45+
expect(await result.next()).to.deep.equal({
46+
value: undefined,
47+
done: true,
48+
});
49+
});
50+
51+
it('flatten nested async generators', async () => {
52+
async function* source() {
53+
yield await Promise.resolve(1);
54+
yield await Promise.resolve(2);
55+
yield await Promise.resolve(
56+
(async function* nested(): AsyncGenerator<number, void, void> {
57+
yield await Promise.resolve(2.1);
58+
yield await Promise.resolve(2.2);
59+
})(),
60+
);
61+
yield await Promise.resolve(3);
62+
}
63+
64+
const doubles = flattenAsyncIterator(source());
65+
66+
const result = [];
67+
for await (const x of doubles) {
68+
result.push(x);
69+
}
70+
expect(result).to.deep.equal([1, 2, 2.1, 2.2, 3]);
71+
});
72+
73+
it('allows returning early from a nested async generator', async () => {
74+
async function* source() {
75+
yield await Promise.resolve(1);
76+
yield await Promise.resolve(2);
77+
yield await Promise.resolve(
78+
(async function* nested(): AsyncGenerator<number, void, void> {
79+
yield await Promise.resolve(2.1); /* c8 ignore start */
80+
// Not reachable, early return
81+
yield await Promise.resolve(2.2);
82+
})(),
83+
);
84+
// Not reachable, early return
85+
yield await Promise.resolve(3);
86+
}
87+
/* c8 ignore stop */
88+
89+
const doubles = flattenAsyncIterator(source());
90+
91+
expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
92+
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
93+
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });
94+
95+
// Early return
96+
expect(await doubles.return()).to.deep.equal({
97+
value: undefined,
98+
done: true,
99+
});
100+
101+
// Subsequent next calls
102+
expect(await doubles.next()).to.deep.equal({
103+
value: undefined,
104+
done: true,
105+
});
106+
expect(await doubles.next()).to.deep.equal({
107+
value: undefined,
108+
done: true,
109+
});
110+
});
111+
112+
it('allows throwing errors from a nested async generator', async () => {
113+
async function* source() {
114+
yield await Promise.resolve(1);
115+
yield await Promise.resolve(2);
116+
yield await Promise.resolve(
117+
(async function* nested(): AsyncGenerator<number, void, void> {
118+
yield await Promise.resolve(2.1); /* c8 ignore start */
119+
// Not reachable, early return
120+
yield await Promise.resolve(2.2);
121+
})(),
122+
);
123+
// Not reachable, early return
124+
yield await Promise.resolve(3);
125+
}
126+
/* c8 ignore stop */
127+
128+
const doubles = flattenAsyncIterator(source());
129+
130+
expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
131+
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
132+
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });
133+
134+
// Throw error
135+
let caughtError;
136+
try {
137+
await doubles.throw('ouch'); /* c8 ignore start */
138+
// Not reachable, always throws
139+
/* c8 ignore stop */
140+
} catch (e) {
141+
caughtError = e;
142+
}
143+
expect(caughtError).to.equal('ouch');
144+
});
145+
});

src/execution/__tests__/subscribe-test.ts

Lines changed: 147 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,19 +78,25 @@ const emailSchema = new GraphQLSchema({
7878
},
7979
},
8080
}),
81+
enableDeferStream: true,
8182
});
8283

83-
function createSubscription(pubsub: SimplePubSub<Email>) {
84+
function createSubscription(
85+
pubsub: SimplePubSub<Email>,
86+
variableValues?: { readonly [variable: string]: unknown },
87+
) {
8488
const document = parse(`
85-
subscription ($priority: Int = 0) {
89+
subscription ($priority: Int = 0, $shouldDefer: Boolean = false) {
8690
importantEmail(priority: $priority) {
8791
email {
8892
from
8993
subject
9094
}
91-
inbox {
92-
unread
93-
total
95+
... @defer(if: $shouldDefer) {
96+
inbox {
97+
unread
98+
total
99+
}
94100
}
95101
}
96102
}
@@ -120,7 +126,12 @@ function createSubscription(pubsub: SimplePubSub<Email>) {
120126
}),
121127
};
122128

123-
return subscribe({ schema: emailSchema, document, rootValue: data });
129+
return subscribe({
130+
schema: emailSchema,
131+
document,
132+
rootValue: data,
133+
variableValues,
134+
});
124135
}
125136

126137
async function expectPromise(promise: Promise<unknown>) {
@@ -675,6 +686,136 @@ describe('Subscription Publish Phase', () => {
675686
});
676687
});
677688

689+
it('produces additional payloads for subscriptions with @defer', async () => {
690+
const pubsub = new SimplePubSub<Email>();
691+
const subscription = await createSubscription(pubsub, {
692+
shouldDefer: true,
693+
});
694+
invariant(isAsyncIterable(subscription));
695+
// Wait for the next subscription payload.
696+
const payload = subscription.next();
697+
698+
// A new email arrives!
699+
expect(
700+
pubsub.emit({
701+
from: 'yuzhi@graphql.org',
702+
subject: 'Alright',
703+
message: 'Tests are good',
704+
unread: true,
705+
}),
706+
).to.equal(true);
707+
708+
// The previously waited on payload now has a value.
709+
expect(await payload).to.deep.equal({
710+
done: false,
711+
value: {
712+
data: {
713+
importantEmail: {
714+
email: {
715+
from: 'yuzhi@graphql.org',
716+
subject: 'Alright',
717+
},
718+
},
719+
},
720+
hasNext: true,
721+
},
722+
});
723+
724+
// Wait for the next payload from @defer
725+
expect(await subscription.next()).to.deep.equal({
726+
done: false,
727+
value: {
728+
data: {
729+
inbox: {
730+
unread: 1,
731+
total: 2,
732+
},
733+
},
734+
path: ['importantEmail'],
735+
hasNext: false,
736+
},
737+
});
738+
739+
// Another new email arrives, after all incrementally delivered payloads are received.
740+
expect(
741+
pubsub.emit({
742+
from: 'hyo@graphql.org',
743+
subject: 'Tools',
744+
message: 'I <3 making things',
745+
unread: true,
746+
}),
747+
).to.equal(true);
748+
749+
// The next waited on payload will have a value.
750+
expect(await subscription.next()).to.deep.equal({
751+
done: false,
752+
value: {
753+
data: {
754+
importantEmail: {
755+
email: {
756+
from: 'hyo@graphql.org',
757+
subject: 'Tools',
758+
},
759+
},
760+
},
761+
hasNext: true,
762+
},
763+
});
764+
765+
// Another new email arrives, before the incrementally delivered payloads from the last email was received.
766+
expect(
767+
pubsub.emit({
768+
from: 'adam@graphql.org',
769+
subject: 'Important',
770+
message: 'Read me please',
771+
unread: true,
772+
}),
773+
).to.equal(true);
774+
775+
// Deferred payload from previous event is received.
776+
expect(await subscription.next()).to.deep.equal({
777+
done: false,
778+
value: {
779+
data: {
780+
inbox: {
781+
unread: 2,
782+
total: 3,
783+
},
784+
},
785+
path: ['importantEmail'],
786+
hasNext: false,
787+
},
788+
});
789+
790+
// Next payload from last event
791+
expect(await subscription.next()).to.deep.equal({
792+
done: false,
793+
value: {
794+
data: {
795+
importantEmail: {
796+
email: {
797+
from: 'adam@graphql.org',
798+
subject: 'Important',
799+
},
800+
},
801+
},
802+
hasNext: true,
803+
},
804+
});
805+
806+
// The client disconnects before the deferred payload is consumed.
807+
expect(await subscription.return()).to.deep.equal({
808+
done: true,
809+
value: undefined,
810+
});
811+
812+
// Awaiting a subscription after closing it results in completed results.
813+
expect(await subscription.next()).to.deep.equal({
814+
done: true,
815+
value: undefined,
816+
});
817+
});
818+
678819
it('produces a payload when there are multiple events', async () => {
679820
const pubsub = new SimplePubSub<Email>();
680821
const subscription = await createSubscription(pubsub);

src/execution/flattenAsyncIterator.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import { isAsyncIterable } from '../jsutils/isAsyncIterable';
2+
3+
type AsyncIterableOrGenerator<T> =
4+
| AsyncGenerator<T, void, void>
5+
| AsyncIterable<T>;
6+
7+
/**
8+
* Given an AsyncIterable that could potentially yield other async iterators,
9+
* flatten all yielded results into a single AsyncIterable
10+
*/
11+
export function flattenAsyncIterator<T, AT>(
12+
iterable: AsyncIterableOrGenerator<T | AsyncIterableOrGenerator<AT>>,
13+
): AsyncGenerator<T | AT, void, void> {
14+
const iteratorMethod = iterable[Symbol.asyncIterator];
15+
const iterator: any = iteratorMethod.call(iterable);
16+
let iteratorStack: Array<AsyncIterator<T>> = [iterator];
17+
18+
async function next(): Promise<IteratorResult<T | AT, void>> {
19+
const currentIterator = iteratorStack[0];
20+
if (!currentIterator) {
21+
return { value: undefined, done: true };
22+
}
23+
const result = await currentIterator.next();
24+
if (result.done) {
25+
iteratorStack.shift();
26+
return next();
27+
} else if (isAsyncIterable(result.value)) {
28+
const childIterator = result.value[
29+
Symbol.asyncIterator
30+
]() as AsyncIterator<T>;
31+
iteratorStack.unshift(childIterator);
32+
return next();
33+
}
34+
return result;
35+
}
36+
return {
37+
next,
38+
return() {
39+
iteratorStack = [];
40+
return iterator.return();
41+
},
42+
throw(error?: unknown): Promise<IteratorResult<T | AT>> {
43+
iteratorStack = [];
44+
return iterator.throw(error);
45+
},
46+
[Symbol.asyncIterator]() {
47+
return this;
48+
},
49+
};
50+
}

0 commit comments

Comments
 (0)