Skip to content

Commit 473876d

Browse files
committed
extract Publisher
1 parent d75872f commit 473876d

File tree

3 files changed

+142
-87
lines changed

3 files changed

+142
-87
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1302,6 +1302,9 @@ describe('Execute: stream directive', () => {
13021302
path: ['nestedObject', 'nestedFriendList', 0],
13031303
},
13041304
],
1305+
hasNext: true,
1306+
},
1307+
{
13051308
hasNext: false,
13061309
},
13071310
]);

src/execution/execute.ts

Lines changed: 43 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { addPath, pathToArray } from '../jsutils/Path.js';
1212
import { promiseForObject } from '../jsutils/promiseForObject.js';
1313
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
1414
import { promiseReduce } from '../jsutils/promiseReduce.js';
15+
import { Publisher } from '../jsutils/Publisher.js';
1516

1617
import type { GraphQLFormattedError } from '../error/GraphQLError.js';
1718
import { GraphQLError } from '../error/GraphQLError.js';
@@ -121,7 +122,10 @@ export interface ExecutionContext {
121122
typeResolver: GraphQLTypeResolver<any, any>;
122123
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
123124
errors: Array<GraphQLError>;
124-
subsequentPayloads: Set<IncrementalDataRecord>;
125+
publisher: Publisher<
126+
IncrementalDataRecord,
127+
SubsequentIncrementalExecutionResult
128+
>;
125129
}
126130

127131
/**
@@ -353,17 +357,18 @@ function executeImpl(
353357
// in this case is the entire response.
354358
try {
355359
const result = executeOperation(exeContext);
360+
const publisher = exeContext.publisher;
356361
if (isPromise(result)) {
357362
return result.then(
358363
(data) => {
359364
const initialResult = buildResponse(data, exeContext.errors);
360-
if (exeContext.subsequentPayloads.size > 0) {
365+
if (publisher.getPending().size > 0) {
361366
return {
362367
initialResult: {
363368
...initialResult,
364369
hasNext: true,
365370
},
366-
subsequentResults: yieldSubsequentPayloads(exeContext),
371+
subsequentResults: publisher.subscribe(),
367372
};
368373
}
369374
return initialResult;
@@ -375,13 +380,13 @@ function executeImpl(
375380
);
376381
}
377382
const initialResult = buildResponse(result, exeContext.errors);
378-
if (exeContext.subsequentPayloads.size > 0) {
383+
if (publisher.getPending().size > 0) {
379384
return {
380385
initialResult: {
381386
...initialResult,
382387
hasNext: true,
383388
},
384-
subsequentResults: yieldSubsequentPayloads(exeContext),
389+
subsequentResults: publisher.subscribe(),
385390
};
386391
}
387392
return initialResult;
@@ -503,7 +508,7 @@ export function buildExecutionContext(
503508
fieldResolver: fieldResolver ?? defaultFieldResolver,
504509
typeResolver: typeResolver ?? defaultTypeResolver,
505510
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
506-
subsequentPayloads: new Set(),
511+
publisher: new Publisher(getIncrementalResult, returnStreamIterators),
507512
errors: [],
508513
};
509514
}
@@ -515,7 +520,7 @@ function buildPerEventExecutionContext(
515520
return {
516521
...exeContext,
517522
rootValue: payload,
518-
subsequentPayloads: new Set(),
523+
// no need to update publisher, incremental delivery is not supported for subscriptions
519524
errors: [],
520525
};
521526
}
@@ -2098,7 +2103,8 @@ function filterSubsequentPayloads(
20982103
currentIncrementalDataRecord: IncrementalDataRecord | undefined,
20992104
): void {
21002105
const nullPathArray = pathToArray(nullPath);
2101-
exeContext.subsequentPayloads.forEach((incrementalDataRecord) => {
2106+
const publisher = exeContext.publisher;
2107+
publisher.getPending().forEach((incrementalDataRecord) => {
21022108
if (incrementalDataRecord === currentIncrementalDataRecord) {
21032109
// don't remove payload from where error originates
21042110
return;
@@ -2118,24 +2124,26 @@ function filterSubsequentPayloads(
21182124
// ignore error
21192125
});
21202126
}
2121-
exeContext.subsequentPayloads.delete(incrementalDataRecord);
2127+
publisher.delete(incrementalDataRecord);
21222128
});
21232129
}
21242130

2125-
function getCompletedIncrementalResults(
2126-
exeContext: ExecutionContext,
2127-
): Array<IncrementalResult> {
2131+
function getIncrementalResult(
2132+
subsequentPayloads: Set<IncrementalDataRecord>,
2133+
): SubsequentIncrementalExecutionResult | undefined {
21282134
const incrementalResults: Array<IncrementalResult> = [];
2129-
for (const incrementalDataRecord of exeContext.subsequentPayloads) {
2135+
let encounteredCompletedAsyncIterator = false;
2136+
for (const incrementalDataRecord of subsequentPayloads) {
21302137
const incrementalResult: IncrementalResult = {};
21312138
if (!incrementalDataRecord.isCompleted) {
21322139
continue;
21332140
}
2134-
exeContext.subsequentPayloads.delete(incrementalDataRecord);
2141+
subsequentPayloads.delete(incrementalDataRecord);
21352142
if (isStreamItemsRecord(incrementalDataRecord)) {
21362143
const items = incrementalDataRecord.items;
21372144
if (incrementalDataRecord.isCompletedAsyncIterator) {
21382145
// async iterable resolver just finished but there may be pending payloads
2146+
encounteredCompletedAsyncIterator = true;
21392147
continue;
21402148
}
21412149
(incrementalResult as IncrementalStreamResult).items = items;
@@ -2153,80 +2161,26 @@ function getCompletedIncrementalResults(
21532161
}
21542162
incrementalResults.push(incrementalResult);
21552163
}
2156-
return incrementalResults;
2164+
return incrementalResults.length
2165+
? { incremental: incrementalResults, hasNext: subsequentPayloads.size > 0 }
2166+
: encounteredCompletedAsyncIterator && subsequentPayloads.size === 0
2167+
? { hasNext: false }
2168+
: undefined;
21572169
}
21582170

2159-
function yieldSubsequentPayloads(
2160-
exeContext: ExecutionContext,
2161-
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
2162-
let isDone = false;
2163-
2164-
async function next(): Promise<
2165-
IteratorResult<SubsequentIncrementalExecutionResult, void>
2166-
> {
2167-
if (isDone) {
2168-
return { value: undefined, done: true };
2169-
}
2170-
2171-
await Promise.race(
2172-
Array.from(exeContext.subsequentPayloads).map((p) => p.promise),
2173-
);
2174-
2175-
if (isDone) {
2176-
// a different call to next has exhausted all payloads
2177-
return { value: undefined, done: true };
2178-
}
2179-
2180-
const incremental = getCompletedIncrementalResults(exeContext);
2181-
const hasNext = exeContext.subsequentPayloads.size > 0;
2182-
2183-
if (!incremental.length && hasNext) {
2184-
return next();
2185-
}
2186-
2187-
if (!hasNext) {
2188-
isDone = true;
2171+
async function returnStreamIterators(
2172+
subsequentPayloads: ReadonlySet<IncrementalDataRecord>,
2173+
): Promise<void> {
2174+
const promises: Array<Promise<IteratorResult<unknown>>> = [];
2175+
subsequentPayloads.forEach((incrementalDataRecord) => {
2176+
if (
2177+
isStreamItemsRecord(incrementalDataRecord) &&
2178+
incrementalDataRecord.asyncIterator?.return
2179+
) {
2180+
promises.push(incrementalDataRecord.asyncIterator.return());
21892181
}
2190-
2191-
return {
2192-
value: incremental.length ? { incremental, hasNext } : { hasNext },
2193-
done: false,
2194-
};
2195-
}
2196-
2197-
function returnStreamIterators() {
2198-
const promises: Array<Promise<IteratorResult<unknown>>> = [];
2199-
exeContext.subsequentPayloads.forEach((incrementalDataRecord) => {
2200-
if (
2201-
isStreamItemsRecord(incrementalDataRecord) &&
2202-
incrementalDataRecord.asyncIterator?.return
2203-
) {
2204-
promises.push(incrementalDataRecord.asyncIterator.return());
2205-
}
2206-
});
2207-
return Promise.all(promises);
2208-
}
2209-
2210-
return {
2211-
[Symbol.asyncIterator]() {
2212-
return this;
2213-
},
2214-
next,
2215-
async return(): Promise<
2216-
IteratorResult<SubsequentIncrementalExecutionResult, void>
2217-
> {
2218-
await returnStreamIterators();
2219-
isDone = true;
2220-
return { value: undefined, done: true };
2221-
},
2222-
async throw(
2223-
error?: unknown,
2224-
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
2225-
await returnStreamIterators();
2226-
isDone = true;
2227-
return Promise.reject(error);
2228-
},
2229-
};
2182+
});
2183+
await Promise.all(promises);
22302184
}
22312185

22322186
class DeferredFragmentRecord {
@@ -2252,7 +2206,7 @@ class DeferredFragmentRecord {
22522206
this.parentContext = opts.parentContext;
22532207
this.errors = [];
22542208
this._exeContext = opts.exeContext;
2255-
this._exeContext.subsequentPayloads.add(this);
2209+
this._exeContext.publisher.add(this);
22562210
this.isCompleted = false;
22572211
this.data = null;
22582212
this.promise = new Promise<ObjMap<unknown> | null>((resolve) => {
@@ -2262,6 +2216,7 @@ class DeferredFragmentRecord {
22622216
}).then((data) => {
22632217
this.data = data;
22642218
this.isCompleted = true;
2219+
this._exeContext.publisher.complete(this);
22652220
});
22662221
}
22672222

@@ -2303,7 +2258,7 @@ class StreamItemsRecord {
23032258
this.asyncIterator = opts.asyncIterator;
23042259
this.errors = [];
23052260
this._exeContext = opts.exeContext;
2306-
this._exeContext.subsequentPayloads.add(this);
2261+
this._exeContext.publisher.add(this);
23072262
this.isCompleted = false;
23082263
this.items = null;
23092264
this.promise = new Promise<Array<unknown> | null>((resolve) => {
@@ -2313,6 +2268,7 @@ class StreamItemsRecord {
23132268
}).then((items) => {
23142269
this.items = items;
23152270
this.isCompleted = true;
2271+
this._exeContext.publisher.complete(this);
23162272
});
23172273
}
23182274

src/jsutils/Publisher.ts

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/** @internal */
2+
export class Publisher<P, R> {
3+
_pending: Set<P>;
4+
_update: (pending: Set<P>) => R | undefined;
5+
_onAbruptClose: (pending: Set<P>) => Promise<void>;
6+
// these are assigned within the Promise executor called synchronously within the constructor
7+
_signalled!: Promise<void>;
8+
_resolve!: () => void;
9+
10+
constructor(
11+
update: (pending: Set<P>) => R | undefined,
12+
onAbruptClose: (pending: ReadonlySet<P>) => Promise<void>,
13+
) {
14+
this._pending = new Set();
15+
this._update = update;
16+
this._onAbruptClose = onAbruptClose;
17+
this._reset();
18+
}
19+
20+
_trigger() {
21+
this._resolve();
22+
this._reset();
23+
}
24+
25+
_reset() {
26+
this._signalled = new Promise<void>((resolve) => (this._resolve = resolve));
27+
}
28+
29+
getPending(): ReadonlySet<P> {
30+
return this._pending;
31+
}
32+
33+
add(item: P) {
34+
this._pending.add(item);
35+
}
36+
37+
complete(item: P): void {
38+
if (this._pending.has(item)) {
39+
this._trigger();
40+
}
41+
}
42+
43+
delete(item: P) {
44+
this._pending.delete(item);
45+
this._trigger();
46+
}
47+
48+
subscribe(): AsyncGenerator<R, void, void> {
49+
let isDone = false;
50+
51+
const _next = async (): Promise<IteratorResult<R, void>> => {
52+
// eslint-disable-next-line no-constant-condition
53+
while (true) {
54+
if (isDone) {
55+
return { value: undefined, done: true };
56+
}
57+
58+
const result = this._update(this._pending);
59+
60+
if (this._pending.size === 0) {
61+
isDone = true;
62+
}
63+
64+
if (result !== undefined) {
65+
return { value: result, done: false };
66+
}
67+
68+
// eslint-disable-next-line no-await-in-loop
69+
await this._signalled;
70+
}
71+
};
72+
73+
const _return = async (): Promise<IteratorResult<R, void>> => {
74+
isDone = true;
75+
await this._onAbruptClose(this._pending);
76+
return { value: undefined, done: true };
77+
};
78+
79+
const _throw = async (
80+
error?: unknown,
81+
): Promise<IteratorResult<R, void>> => {
82+
isDone = true;
83+
await this._onAbruptClose(this._pending);
84+
return Promise.reject(error);
85+
};
86+
87+
return {
88+
[Symbol.asyncIterator]() {
89+
return this;
90+
},
91+
next: _next,
92+
return: _return,
93+
throw: _throw,
94+
};
95+
}
96+
}

0 commit comments

Comments
 (0)