Skip to content

Commit 6f7d88b

Browse files
josepotcartant
andauthored
fix(shareReplay): handle possible memory leaks (#5932)
* fix(shareReplay): handle possible memory leaks * test: add shareReplay leak test * style(shareReplay): fix linting issues * chore: fix typescript@latest CI failure Co-authored-by: Nicholas Jamieson <nicholas@cartant.com>
1 parent 76c84d7 commit 6f7d88b

File tree

4 files changed

+50
-7
lines changed

4 files changed

+50
-7
lines changed

.github/workflows/ci_ts_latest.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@ jobs:
2323
run: |
2424
npm install -g npm@latest
2525
npm ci
26-
npm install --no-save typescript@latest
26+
npm install --no-save typescript@latest tslib@latest @types/node@latest
2727
npm run build_all
2828

spec/operators/shareReplay-spec.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,4 +262,26 @@ describe('shareReplay operator', () => {
262262
expectObservable(result).toBe(expected);
263263
});
264264

265+
const FinalizationRegistry = (global as any).FinalizationRegistry;
266+
if (FinalizationRegistry) {
267+
268+
it('should not leak the subscriber for sync sources', (done) => {
269+
const registry = new FinalizationRegistry((value: any) => {
270+
expect(value).to.equal('callback');
271+
done();
272+
});
273+
let callback: (() => void) | undefined = () => { /* noop */ };
274+
registry.register(callback, 'callback');
275+
276+
const shared = of(42).pipe(shareReplay(1));
277+
shared.subscribe(callback);
278+
279+
callback = undefined;
280+
global.gc();
281+
});
282+
283+
} else {
284+
console.warn(`No support for FinalizationRegistry in Node ${process.version}`);
285+
}
286+
265287
});

spec/support/default.opts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
--reporter dot
88

9+
--expose-gc
910
--check-leaks
1011
--globals WebSocket,FormData,XDomainRequest,ActiveXObject,fetch,AbortController
1112

src/internal/operators/shareReplay.ts

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,14 @@ export interface ShareReplayConfig {
5656
* @method shareReplay
5757
* @owner Observable
5858
*/
59-
export function shareReplay<T>(config: ShareReplayConfig): MonoTypeOperatorFunction<T>;
60-
export function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
59+
export function shareReplay<T>(
60+
config: ShareReplayConfig
61+
): MonoTypeOperatorFunction<T>;
62+
export function shareReplay<T>(
63+
bufferSize?: number,
64+
windowTime?: number,
65+
scheduler?: SchedulerLike
66+
): MonoTypeOperatorFunction<T>;
6167
export function shareReplay<T>(
6268
configOrBufferSize?: ShareReplayConfig | number,
6369
windowTime?: number,
@@ -71,7 +77,7 @@ export function shareReplay<T>(
7177
bufferSize: configOrBufferSize as number | undefined,
7278
windowTime,
7379
refCount: false,
74-
scheduler
80+
scheduler,
7581
};
7682
}
7783
return (source: Observable<T>) => source.lift(shareReplayOperator(config));
@@ -81,23 +87,28 @@ function shareReplayOperator<T>({
8187
bufferSize = Number.POSITIVE_INFINITY,
8288
windowTime = Number.POSITIVE_INFINITY,
8389
refCount: useRefCount,
84-
scheduler
90+
scheduler,
8591
}: ShareReplayConfig) {
8692
let subject: ReplaySubject<T> | undefined;
8793
let refCount = 0;
8894
let subscription: Subscription | undefined;
8995
let hasError = false;
9096
let isComplete = false;
9197

92-
return function shareReplayOperation(this: Subscriber<T>, source: Observable<T>) {
98+
return function shareReplayOperation(
99+
this: Subscriber<T>,
100+
source: Observable<T>
101+
) {
93102
refCount++;
94103
let innerSub: Subscription;
95104
if (!subject || hasError) {
96105
hasError = false;
97106
subject = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
98107
innerSub = subject.subscribe(this);
99108
subscription = source.subscribe({
100-
next(value) { subject.next(value); },
109+
next(value) {
110+
subject.next(value);
111+
},
101112
error(err) {
102113
hasError = true;
103114
subject.error(err);
@@ -108,13 +119,22 @@ function shareReplayOperator<T>({
108119
subject.complete();
109120
},
110121
});
122+
123+
// Here we need to check to see if the source synchronously completed. Although
124+
// we're setting `subscription = undefined` in the completion handler, if the source
125+
// is synchronous, that will happen *before* subscription is set by the return of
126+
// the `subscribe` call.
127+
if (isComplete) {
128+
subscription = undefined;
129+
}
111130
} else {
112131
innerSub = subject.subscribe(this);
113132
}
114133

115134
this.add(() => {
116135
refCount--;
117136
innerSub.unsubscribe();
137+
innerSub = undefined;
118138
if (subscription && !isComplete && useRefCount && refCount === 0) {
119139
subscription.unsubscribe();
120140
subscription = undefined;

0 commit comments

Comments
 (0)