Skip to content

Commit fae5da5

Browse files
authored
incremental: subsequent result records should not store parent references (#3929)
as memory then cannot be freed This affects both the existing branching executor on main as well as the non-branching, deduplicated version in #3886 We want to ensure that after an incremental result is sent to the client, no subsequent results reference this result so that garbage collection can free the memory associated with the result. To effect this, two changes are required: 1. Prior to this change, we performed filtering by modifying the children stored on a parent; now we add a flag on the filtered item itself, so that we no longer need a backreference from child to parent. 2. We no longer store a permanent reference on the ExecutionContext to the children of the initial result. Rather, we have the IncrementalPublisher provide a new InitialResultRecord that carries its own errors and children properties.
1 parent 8cfa3de commit fae5da5

File tree

2 files changed

+104
-114
lines changed

2 files changed

+104
-114
lines changed

src/execution/IncrementalPublisher.ts

Lines changed: 62 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -97,34 +97,17 @@ export type FormattedIncrementalResult<
9797
* parents have completed so that they can no longer be filtered. This includes all Incremental
9898
* Data records in `released`, as well as Incremental Data records that have not yet completed.
9999
*
100-
* `_initialResult`: a record containing the state of the initial result, as follows:
101-
* `isCompleted`: indicates whether the initial result has completed.
102-
* `children`: the set of Incremental Data records that can be be published when the initial
103-
* result is completed.
104-
*
105-
* Each Incremental Data record also contains similar metadata, i.e. these records also contain
106-
* similar `isCompleted` and `children` properties.
107-
*
108100
* @internal
109101
*/
110102
export class IncrementalPublisher {
111-
private _initialResult: {
112-
children: Set<IncrementalDataRecord>;
113-
isCompleted: boolean;
114-
};
115-
116-
private _released: Set<IncrementalDataRecord>;
117-
private _pending: Set<IncrementalDataRecord>;
103+
private _released: Set<SubsequentDataRecord>;
104+
private _pending: Set<SubsequentDataRecord>;
118105

119106
// these are assigned within the Promise executor called synchronously within the constructor
120107
private _signalled!: Promise<unknown>;
121108
private _resolve!: () => void;
122109

123110
constructor() {
124-
this._initialResult = {
125-
children: new Set(),
126-
isCompleted: false,
127-
};
128111
this._released = new Set();
129112
this._pending = new Set();
130113
this._reset();
@@ -210,19 +193,22 @@ export class IncrementalPublisher {
210193
};
211194
}
212195

196+
prepareInitialResultRecord(): InitialResultRecord {
197+
return {
198+
errors: [],
199+
children: new Set(),
200+
};
201+
}
202+
213203
prepareNewDeferredFragmentRecord(opts: {
214204
label: string | undefined;
215205
path: Path | undefined;
216-
parentContext: IncrementalDataRecord | undefined;
206+
parentContext: IncrementalDataRecord;
217207
}): DeferredFragmentRecord {
218208
const deferredFragmentRecord = new DeferredFragmentRecord(opts);
219209

220210
const parentContext = opts.parentContext;
221-
if (parentContext) {
222-
parentContext.children.add(deferredFragmentRecord);
223-
} else {
224-
this._initialResult.children.add(deferredFragmentRecord);
225-
}
211+
parentContext.children.add(deferredFragmentRecord);
226212

227213
return deferredFragmentRecord;
228214
}
@@ -231,16 +217,12 @@ export class IncrementalPublisher {
231217
label: string | undefined;
232218
path: Path | undefined;
233219
asyncIterator?: AsyncIterator<unknown>;
234-
parentContext: IncrementalDataRecord | undefined;
220+
parentContext: IncrementalDataRecord;
235221
}): StreamItemsRecord {
236222
const streamItemsRecord = new StreamItemsRecord(opts);
237223

238224
const parentContext = opts.parentContext;
239-
if (parentContext) {
240-
parentContext.children.add(streamItemsRecord);
241-
} else {
242-
this._initialResult.children.add(streamItemsRecord);
243-
}
225+
parentContext.children.add(streamItemsRecord);
244226

245227
return streamItemsRecord;
246228
}
@@ -274,36 +256,36 @@ export class IncrementalPublisher {
274256
incrementalDataRecord.errors.push(error);
275257
}
276258

277-
publishInitial() {
278-
for (const child of this._initialResult.children) {
259+
publishInitial(initialResult: InitialResultRecord) {
260+
for (const child of initialResult.children) {
261+
if (child.filtered) {
262+
continue;
263+
}
279264
this._publish(child);
280265
}
281266
}
282267

283-
filter(
284-
nullPath: Path,
285-
erroringIncrementalDataRecord: IncrementalDataRecord | undefined,
286-
) {
268+
getInitialErrors(
269+
initialResult: InitialResultRecord,
270+
): ReadonlyArray<GraphQLError> {
271+
return initialResult.errors;
272+
}
273+
274+
filter(nullPath: Path, erroringIncrementalDataRecord: IncrementalDataRecord) {
287275
const nullPathArray = pathToArray(nullPath);
288276

289277
const asyncIterators = new Set<AsyncIterator<unknown>>();
290278

291-
const children =
292-
erroringIncrementalDataRecord === undefined
293-
? this._initialResult.children
294-
: erroringIncrementalDataRecord.children;
279+
const descendants = this._getDescendants(
280+
erroringIncrementalDataRecord.children,
281+
);
295282

296-
for (const child of this._getDescendants(children)) {
283+
for (const child of descendants) {
297284
if (!this._matchesPath(child.path, nullPathArray)) {
298285
continue;
299286
}
300287

301-
this._delete(child);
302-
const parent =
303-
child.parentContext === undefined
304-
? this._initialResult
305-
: child.parentContext;
306-
parent.children.delete(child);
288+
child.filtered = true;
307289

308290
if (isStreamItemsRecord(child)) {
309291
if (child.asyncIterator !== undefined) {
@@ -333,37 +315,34 @@ export class IncrementalPublisher {
333315
this._signalled = signalled;
334316
}
335317

336-
private _introduce(item: IncrementalDataRecord) {
318+
private _introduce(item: SubsequentDataRecord) {
337319
this._pending.add(item);
338320
}
339321

340-
private _release(item: IncrementalDataRecord): void {
322+
private _release(item: SubsequentDataRecord): void {
341323
if (this._pending.has(item)) {
342324
this._released.add(item);
343325
this._trigger();
344326
}
345327
}
346328

347-
private _push(item: IncrementalDataRecord): void {
329+
private _push(item: SubsequentDataRecord): void {
348330
this._released.add(item);
349331
this._pending.add(item);
350332
this._trigger();
351333
}
352334

353-
private _delete(item: IncrementalDataRecord) {
354-
this._released.delete(item);
355-
this._pending.delete(item);
356-
this._trigger();
357-
}
358-
359335
private _getIncrementalResult(
360-
completedRecords: ReadonlySet<IncrementalDataRecord>,
336+
completedRecords: ReadonlySet<SubsequentDataRecord>,
361337
): SubsequentIncrementalExecutionResult | undefined {
362338
const incrementalResults: Array<IncrementalResult> = [];
363339
let encounteredCompletedAsyncIterator = false;
364340
for (const incrementalDataRecord of completedRecords) {
365341
const incrementalResult: IncrementalResult = {};
366342
for (const child of incrementalDataRecord.children) {
343+
if (child.filtered) {
344+
continue;
345+
}
367346
this._publish(child);
368347
}
369348
if (isStreamItemsRecord(incrementalDataRecord)) {
@@ -396,18 +375,18 @@ export class IncrementalPublisher {
396375
: undefined;
397376
}
398377

399-
private _publish(incrementalDataRecord: IncrementalDataRecord) {
400-
if (incrementalDataRecord.isCompleted) {
401-
this._push(incrementalDataRecord);
378+
private _publish(subsequentResultRecord: SubsequentDataRecord) {
379+
if (subsequentResultRecord.isCompleted) {
380+
this._push(subsequentResultRecord);
402381
} else {
403-
this._introduce(incrementalDataRecord);
382+
this._introduce(subsequentResultRecord);
404383
}
405384
}
406385

407386
private _getDescendants(
408-
children: ReadonlySet<IncrementalDataRecord>,
409-
descendants = new Set<IncrementalDataRecord>(),
410-
): ReadonlySet<IncrementalDataRecord> {
387+
children: ReadonlySet<SubsequentDataRecord>,
388+
descendants = new Set<SubsequentDataRecord>(),
389+
): ReadonlySet<SubsequentDataRecord> {
411390
for (const child of children) {
412391
descendants.add(child);
413392
this._getDescendants(child.children, descendants);
@@ -429,26 +408,27 @@ export class IncrementalPublisher {
429408
}
430409
}
431410

411+
export interface InitialResultRecord {
412+
errors: Array<GraphQLError>;
413+
children: Set<SubsequentDataRecord>;
414+
}
415+
432416
/** @internal */
433417
export class DeferredFragmentRecord {
434418
errors: Array<GraphQLError>;
435419
label: string | undefined;
436420
path: Array<string | number>;
437421
data: ObjMap<unknown> | null;
438-
parentContext: IncrementalDataRecord | undefined;
439-
children: Set<IncrementalDataRecord>;
422+
children: Set<SubsequentDataRecord>;
440423
isCompleted: boolean;
441-
constructor(opts: {
442-
label: string | undefined;
443-
path: Path | undefined;
444-
parentContext: IncrementalDataRecord | undefined;
445-
}) {
424+
filtered: boolean;
425+
constructor(opts: { label: string | undefined; path: Path | undefined }) {
446426
this.label = opts.label;
447427
this.path = pathToArray(opts.path);
448-
this.parentContext = opts.parentContext;
449428
this.errors = [];
450429
this.children = new Set();
451430
this.isCompleted = false;
431+
this.filtered = false;
452432
this.data = null;
453433
}
454434
}
@@ -459,33 +439,34 @@ export class StreamItemsRecord {
459439
label: string | undefined;
460440
path: Array<string | number>;
461441
items: Array<unknown> | null;
462-
parentContext: IncrementalDataRecord | undefined;
463-
children: Set<IncrementalDataRecord>;
442+
children: Set<SubsequentDataRecord>;
464443
asyncIterator: AsyncIterator<unknown> | undefined;
465444
isCompletedAsyncIterator?: boolean;
466445
isCompleted: boolean;
446+
filtered: boolean;
467447
constructor(opts: {
468448
label: string | undefined;
469449
path: Path | undefined;
470450
asyncIterator?: AsyncIterator<unknown>;
471-
parentContext: IncrementalDataRecord | undefined;
472451
}) {
473452
this.items = null;
474453
this.label = opts.label;
475454
this.path = pathToArray(opts.path);
476-
this.parentContext = opts.parentContext;
477455
this.asyncIterator = opts.asyncIterator;
478456
this.errors = [];
479457
this.children = new Set();
480458
this.isCompleted = false;
459+
this.filtered = false;
481460
this.items = null;
482461
}
483462
}
484463

485-
export type IncrementalDataRecord = DeferredFragmentRecord | StreamItemsRecord;
464+
export type SubsequentDataRecord = DeferredFragmentRecord | StreamItemsRecord;
465+
466+
export type IncrementalDataRecord = InitialResultRecord | SubsequentDataRecord;
486467

487468
function isStreamItemsRecord(
488-
incrementalDataRecord: IncrementalDataRecord,
489-
): incrementalDataRecord is StreamItemsRecord {
490-
return incrementalDataRecord instanceof StreamItemsRecord;
469+
subsequentResultRecord: SubsequentDataRecord,
470+
): subsequentResultRecord is StreamItemsRecord {
471+
return subsequentResultRecord instanceof StreamItemsRecord;
491472
}

0 commit comments

Comments
 (0)