Skip to content

Commit e2f2e51

Browse files
cartante-davidson
andauthored
fix: returned operator functions from multicast operators share, publish, publishReplay are now referentially transparent. Meaning if you take the result of calling publish() and pass it to N observable pipe methods, it will behave the same in each case, rather than having a cumulative effect, which was a regression introduced sometime in 6.x. If you required this broken behavior, there is a work around posted [here](#6410 (comment)) (#6410)
* fix(publish,publishReplay): resolve sharing Subject change publish operator to use factory change publishReplay operator to not share ReplaySubject fixes issue #5411 * test: rearrange tests * test: add failing ref transparency tests * fix(publishBehavior): make ref transparent * fix(publishLast): make ref transparent * test: add failing ref transparency tests * fix(share): make ref transparent * chore: add a comment * test: change descriptions and add comments * refactor: destructure options outside of op func * chore: use consistent terminology in comments * test: use consistent terminology Co-authored-by: Eli Davidson <edavidson@broadfinancial.com>
1 parent 2fb22bf commit e2f2e51

11 files changed

+261
-94
lines changed

spec/operators/publish-spec.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { expect } from 'chai';
22
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
33
import { publish, zip, mergeMapTo, mergeMap, tap, refCount, retry, repeat } from 'rxjs/operators';
4-
import { ConnectableObservable, of, Subscription, Observable } from 'rxjs';
4+
import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs';
55

66
/** @test {publish} */
77
describe('publish operator', () => {
@@ -337,4 +337,31 @@ describe('publish operator', () => {
337337
expect(subscriptions).to.equal(1);
338338
done();
339339
});
340+
341+
it('should be referentially-transparent', () => {
342+
const source1 = cold('-1-2-3-4-5-|');
343+
const source1Subs = '^ !';
344+
const expected1 = '-1-2-3-4-5-|';
345+
const source2 = cold('-6-7-8-9-0-|');
346+
const source2Subs = '^ !';
347+
const expected2 = '-6-7-8-9-0-|';
348+
349+
// Calls to the _operator_ must be referentially-transparent.
350+
const partialPipeLine = pipe(
351+
publish()
352+
);
353+
354+
// The non-referentially-transparent publishing occurs within the _operator function_
355+
// returned by the _operator_ and that happens when the complete pipeline is composed.
356+
const published1 = source1.pipe(partialPipeLine) as ConnectableObservable<any>;
357+
const published2 = source2.pipe(partialPipeLine) as ConnectableObservable<any>;
358+
359+
expectObservable(published1).toBe(expected1);
360+
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
361+
expectObservable(published2).toBe(expected2);
362+
expectSubscriptions(source2.subscriptions).toBe(source2Subs);
363+
364+
published1.connect();
365+
published2.connect();
366+
});
340367
});

spec/operators/publishBehavior-spec.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { expect } from 'chai';
22
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
33
import { publishBehavior, mergeMapTo, tap, mergeMap, refCount, retry, repeat } from 'rxjs/operators';
4-
import { ConnectableObservable, of, Subscription, Observable } from 'rxjs';
4+
import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs';
55

66
/** @test {publishBehavior} */
77
describe('publishBehavior operator', () => {
@@ -344,4 +344,31 @@ describe('publishBehavior operator', () => {
344344
expect(results).to.deep.equal([]);
345345
done();
346346
});
347+
348+
it('should be referentially-transparent', () => {
349+
const source1 = cold('-1-2-3-4-5-|');
350+
const source1Subs = '^ !';
351+
const expected1 = 'x1-2-3-4-5-|';
352+
const source2 = cold('-6-7-8-9-0-|');
353+
const source2Subs = '^ !';
354+
const expected2 = 'x6-7-8-9-0-|';
355+
356+
// Calls to the _operator_ must be referentially-transparent.
357+
const partialPipeLine = pipe(
358+
publishBehavior('x')
359+
);
360+
361+
// The non-referentially-transparent publishing occurs within the _operator function_
362+
// returned by the _operator_ and that happens when the complete pipeline is composed.
363+
const published1 = source1.pipe(partialPipeLine) as ConnectableObservable<any>;
364+
const published2 = source2.pipe(partialPipeLine) as ConnectableObservable<any>;
365+
366+
expectObservable(published1).toBe(expected1);
367+
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
368+
expectObservable(published2).toBe(expected2);
369+
expectSubscriptions(source2.subscriptions).toBe(source2Subs);
370+
371+
published1.connect();
372+
published2.connect();
373+
});
347374
});

spec/operators/publishLast-spec.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { expect } from 'chai';
22
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
33
import { publishLast, mergeMapTo, tap, mergeMap, refCount, retry } from 'rxjs/operators';
4-
import { ConnectableObservable, of, Subscription, Observable } from 'rxjs';
4+
import { ConnectableObservable, of, Subscription, Observable, pipe } from 'rxjs';
55

66
/** @test {publishLast} */
77
describe('publishLast operator', () => {
@@ -261,4 +261,31 @@ describe('publishLast operator', () => {
261261
expect(subscriptions).to.equal(1);
262262
done();
263263
});
264+
265+
it('should be referentially-transparent', () => {
266+
const source1 = cold('-1-2-3-4-5-|');
267+
const source1Subs = '^ !';
268+
const expected1 = '-----------(5|)';
269+
const source2 = cold('-6-7-8-9-0-|');
270+
const source2Subs = '^ !';
271+
const expected2 = '-----------(0|)';
272+
273+
// Calls to the _operator_ must be referentially-transparent.
274+
const partialPipeLine = pipe(
275+
publishLast()
276+
);
277+
278+
// The non-referentially-transparent publishing occurs within the _operator function_
279+
// returned by the _operator_ and that happens when the complete pipeline is composed.
280+
const published1 = source1.pipe(partialPipeLine) as ConnectableObservable<any>;
281+
const published2 = source2.pipe(partialPipeLine) as ConnectableObservable<any>;
282+
283+
expectObservable(published1).toBe(expected1);
284+
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
285+
expectObservable(published2).toBe(expected2);
286+
expectSubscriptions(source2.subscriptions).toBe(source2Subs);
287+
288+
published1.connect();
289+
published2.connect();
290+
});
264291
});

spec/operators/publishReplay-spec.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { expect } from 'chai';
22
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
3-
import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription } from 'rxjs';
3+
import { throwError, ConnectableObservable, EMPTY, NEVER, of, Observable, Subscription, pipe } from 'rxjs';
44
import { publishReplay, mergeMapTo, tap, mergeMap, refCount, retry, repeat, map } from 'rxjs/operators';
55

66
/** @test {publishReplay} */
@@ -487,4 +487,31 @@ describe('publishReplay operator', () => {
487487
expectObservable(published).toBe(expected, undefined, error);
488488
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
489489
});
490+
491+
it('should be referentially-transparent', () => {
492+
const source1 = cold('-1-2-3-4-5-|');
493+
const source1Subs = '^ !';
494+
const expected1 = '-1-2-3-4-5-|';
495+
const source2 = cold('-6-7-8-9-0-|');
496+
const source2Subs = '^ !';
497+
const expected2 = '-6-7-8-9-0-|';
498+
499+
// Calls to the _operator_ must be referentially-transparent.
500+
const partialPipeLine = pipe(
501+
publishReplay(1)
502+
);
503+
504+
// The non-referentially-transparent publishing occurs within the _operator function_
505+
// returned by the _operator_ and that happens when the complete pipeline is composed.
506+
const published1 = source1.pipe(partialPipeLine) as ConnectableObservable<any>;
507+
const published2 = source2.pipe(partialPipeLine) as ConnectableObservable<any>;
508+
509+
expectObservable(published1).toBe(expected1);
510+
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
511+
expectObservable(published2).toBe(expected2);
512+
expectSubscriptions(source2.subscriptions).toBe(source2Subs);
513+
514+
published1.connect();
515+
published2.connect();
516+
});
490517
});

spec/operators/share-spec.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/** @prettier */
22
import { expect } from 'chai';
3-
import { asapScheduler, concat, config, defer, EMPTY, NEVER, Observable, of, scheduled, Subject, throwError } from 'rxjs';
3+
import { asapScheduler, concat, config, defer, EMPTY, NEVER, Observable, of, scheduled, Subject, throwError, pipe } from 'rxjs';
44
import {
55
map,
66
mergeMap,
@@ -619,6 +619,30 @@ describe('share', () => {
619619
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
620620
});
621621
});
622+
623+
it('should be referentially-transparent', () => {
624+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
625+
const source1 = cold('-1-2-3-4-5-|');
626+
const source1Subs = ' ^----------!';
627+
const expected1 = ' -1-2-3-4-5-|';
628+
const source2 = cold('-6-7-8-9-0-|');
629+
const source2Subs = ' ^----------!';
630+
const expected2 = ' -6-7-8-9-0-|';
631+
632+
// Calls to the _operator_ must be referentially-transparent.
633+
const partialPipeLine = pipe(share({ resetOnRefCountZero }));
634+
635+
// The non-referentially-transparent sharing occurs within the _operator function_
636+
// returned by the _operator_ and that happens when the complete pipeline is composed.
637+
const shared1 = source1.pipe(partialPipeLine);
638+
const shared2 = source2.pipe(partialPipeLine);
639+
640+
expectObservable(shared1).toBe(expected1);
641+
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
642+
expectObservable(shared2).toBe(expected2);
643+
expectSubscriptions(source2.subscriptions).toBe(source2Subs);
644+
});
645+
});
622646
});
623647
}
624648

spec/operators/shareReplay-spec.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { expect } from 'chai';
33
import * as sinon from 'sinon';
44
import { shareReplay, mergeMapTo, retry, take } from 'rxjs/operators';
55
import { TestScheduler } from 'rxjs/testing';
6-
import { Observable, Operator, Observer, of, from, defer } from 'rxjs';
6+
import { Observable, Operator, Observer, of, from, defer, pipe } from 'rxjs';
77
import { observableMatcher } from '../helpers/observableMatcher';
88

99
/** @test {shareReplay} */
@@ -387,4 +387,28 @@ describe('shareReplay', () => {
387387
} else {
388388
console.warn(`No support for FinalizationRegistry in Node ${process.version}`);
389389
}
390+
391+
it('should be referentially-transparent', () => {
392+
testScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
393+
const source1 = cold('-1-2-3-4-5-|');
394+
const source1Subs = ' ^----------!';
395+
const expected1 = ' -1-2-3-4-5-|';
396+
const source2 = cold('-6-7-8-9-0-|');
397+
const source2Subs = ' ^----------!';
398+
const expected2 = ' -6-7-8-9-0-|';
399+
400+
// Calls to the _operator_ must be referentially-transparent.
401+
const partialPipeLine = pipe(shareReplay({ refCount: false }));
402+
403+
// The non-referentially-transparent sharing occurs within the _operator function_
404+
// returned by the _operator_ and that happens when the complete pipeline is composed.
405+
const shared1 = source1.pipe(partialPipeLine);
406+
const shared2 = source2.pipe(partialPipeLine);
407+
408+
expectObservable(shared1).toBe(expected1);
409+
expectSubscriptions(source1.subscriptions).toBe(source1Subs);
410+
expectObservable(shared2).toBe(expected2);
411+
expectSubscriptions(source2.subscriptions).toBe(source2Subs);
412+
});
413+
});
390414
});

src/internal/operators/publish.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,5 +85,5 @@ export function publish<T, O extends ObservableInput<any>>(selector: (shared: Ob
8585
* Details: https://rxjs.dev/deprecations/multicasting
8686
*/
8787
export function publish<T, R>(selector?: OperatorFunction<T, R>): MonoTypeOperatorFunction<T> | OperatorFunction<T, R> {
88-
return selector ? connect(selector) : multicast(new Subject<T>());
88+
return selector ? (source) => connect(selector)(source) : (source) => multicast(new Subject<T>())(source);
8989
}

src/internal/operators/publishBehavior.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ import { UnaryFunction } from '../types';
1818
* Details: https://rxjs.dev/deprecations/multicasting
1919
*/
2020
export function publishBehavior<T>(initialValue: T): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
21-
const subject = new BehaviorSubject<T>(initialValue);
2221
// Note that this has *never* supported the selector function.
23-
return (source) => new ConnectableObservable(source, () => subject);
22+
return (source) => {
23+
const subject = new BehaviorSubject<T>(initialValue);
24+
return new ConnectableObservable(source, () => subject);
25+
};
2426
}

src/internal/operators/publishLast.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ import { UnaryFunction } from '../types';
6767
* Details: https://rxjs.dev/deprecations/multicasting
6868
*/
6969
export function publishLast<T>(): UnaryFunction<Observable<T>, ConnectableObservable<T>> {
70-
const subject = new AsyncSubject<T>();
7170
// Note that this has *never* supported a selector function like `publish` and `publishReplay`.
72-
return (source) => new ConnectableObservable(source, () => subject);
71+
return (source) => {
72+
const subject = new AsyncSubject<T>();
73+
return new ConnectableObservable(source, () => subject);
74+
};
7375
}

src/internal/operators/publishReplay.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,8 @@ export function publishReplay<T, R>(
8989
if (selectorOrScheduler && !isFunction(selectorOrScheduler)) {
9090
timestampProvider = selectorOrScheduler;
9191
}
92-
9392
const selector = isFunction(selectorOrScheduler) ? selectorOrScheduler : undefined;
94-
const subject = new ReplaySubject<T>(bufferSize, windowTime, timestampProvider);
95-
9693
// Note, we're passing `selector!` here, because at runtime, `undefined` is an acceptable argument
9794
// but it makes our TypeScript signature for `multicast` unhappy (as it should, because it's gross).
98-
return (source: Observable<T>) => multicast(subject, selector!)(source);
95+
return (source: Observable<T>) => multicast(new ReplaySubject<T>(bufferSize, windowTime, timestampProvider), selector!)(source);
9996
}

0 commit comments

Comments
 (0)