rxjs
shareReplay with refCount stops working
Describe the bug
When subscribing to a shared observable multiple times, the internal connection to the source observable gets broken. It stops emitting values on subscribe.
Expected behavior
Expected output:
first: 7.5.1
second: 7.5.1
Actual output
first: 7.5.1
Reproduction code
import { BehaviorSubject, merge, of, Subject } from 'rxjs';
import {
  shareReplay,
  take,
  takeUntil,
  materialize,
  filter,
} from 'rxjs/operators';

const shared$ = new BehaviorSubject('7.5.1').pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const isCompleted$ = shared$.pipe(
  materialize(),
  filter((n) => n.kind === 'C'),
  take(1)
);

const work = new Subject().pipe(takeUntil(isCompleted$));

merge(work, shared$)
  .pipe(take(1))
  .subscribe((value) => console.log('first: ', value));

shared$.subscribe((value) => console.log('second:', value));
Reproduction URL
Version
7.5.1
Environment
No response
Additional context
No response
Removing .pipe(take(1)) makes the 7.5.1 snippet work, so it looks like take is somehow affecting the shareReplay operator.
I ran into something similar in my own project, so I ended up writing a simple operator that works like take but doesn't complete.
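import { filter, map, scan } from 'rxjs/operators';

/**
 * Similar to the take operator, as in, it lets the first few values through.
 * However, unlike 'take' this operator doesn't complete.
 *
 * @param {number} few The number of emissions to let through
 * @returns {function(Observable): Observable}
 */
export default function first(few = 1) {
  return (source) => source.pipe(
    // NOTE: the body of this scan was garbled in the original post. The
    // accumulator below is a reconstruction from the surviving fragments
    // (acc.count, lastValue), not necessarily the author's exact code.
    scan(
      (acc, value) => {
        if (acc.count < few) {
          return { count: acc.count + 1, lastValue: value };
        }
        // Past the limit: keep counting state, but emit nothing useful.
        return { count: acc.count, lastValue: undefined };
      },
      { count: 0, lastValue: undefined }
    ),
    map(({ lastValue }) => lastValue),
    // Drops the placeholder emissions (and, as in the original, any falsy value).
    filter((lastValue) => !!lastValue),
  );
}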
Replace take with first.
Confirmed I am also seeing behavior where shareReplay({ bufferSize: 1, refCount: true }) stops emitting. My theory is also that it is causing the source observable to somehow complete when it should not, and it is not reconnecting.
Using first() did not solve my issue. Only setting refCount: false gets things working again (which of course introduces risk for a memory leak).
I feel like I am missing something basic here, but something definitely changed in the functionality between 6.x and 7.current.
For me there seems to be a commonality of using combineLatest([x$, y$]) in combination with shareReplay() that isn't working. Still tracking it down.
I think this might be related: https://github.com/ReactiveX/rxjs/pull/5634/files#diff-44fa2a928f593fdd8b981dac7737a65f6b0ddc844300af18a539c59b683b5467R130
Looks like shareReplay now calls complete() on the subscribed subject/connector when the source completes. I don't think this was in the original shareReplay behavior?
Am I missing something?
I'm experiencing this as well. It happens when you subscribe multiple times, then unsubscribe and resubscribe again (is this called a reentrant subscription?). The code in the original post is a good example since multiple things subscribe to shared$.
Specifically, it happens if the code emits and unsubscribes immediately. If it emits after the first subscription, things seem to work as expected. It looks like it's a race condition in the setup code and the reset at refcount 0 logic.
Excuse my atrocious console logging, but this is what I see when a subscription is made and the source emits after a delay:

Whereas this is what happens if the source emits immediately:

As you can see, if a value emits immediately and both subscriptions unsubscribe (as you might get with take(1) or firstValueFrom()), the reset happens before the logic for the second subscription finishes. The shareReplay then gets into a bad state: it maintains a subscription to the source, and doesn't resubscribe when new results come in.
This started happening in 7 because of this new code, which prevents excess subscriptions to the source observable. There was also some rearranging of the code which may have prevented it in 6, I'm not sure.
https://github.com/ReactiveX/rxjs/blob/47fa8d555754b18887baf15e22eb3dd91bf8bfea/src/internal/operators/share.ts#L214-L220
Flipping the order of the code so the source subscription check runs before the refCount decrement subscription seems to fix the issue for me, but there are some comments explicitly wanting to do it in the other order, so I don't know if that solution will work.
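To make the ordering problem described above easier to follow, here is a minimal toy model in plain TypeScript. It is only a sketch and not the code in share.ts: SyncSource, Buffer1, toyShare and the connectFirst flag are all made-up names, and the model assumes the failure comes from a reset at refCount 0 running synchronously in the middle of a later subscription's setup. With connectFirst set to false the source-connection check runs last, as in the order described above; with true it runs first, as in the flipped order.

```typescript
type Unsubscribe = () => void;
type Listener = (value: string, self: Unsubscribe) => void;

// Hypothetical stand-in for a BehaviorSubject: emits synchronously on subscribe.
class SyncSource {
  active = 0; // number of live subscriptions to the source
  value: string;
  constructor(value: string) { this.value = value; }
  subscribe(next: (value: string) => void): Unsubscribe {
    this.active++;
    next(this.value);
    return () => { this.active--; };
  }
}

// Hypothetical stand-in for ReplaySubject(1): replays synchronously on subscribe.
class Buffer1 {
  listeners: Array<(value: string) => void> = [];
  last: { value: string } | null = null;
  next(value: string): void {
    this.last = { value };
    this.listeners.slice().forEach((l) => l(value));
  }
  subscribe(l: (value: string) => void): Unsubscribe {
    this.listeners.push(l);
    if (this.last) l(this.last.value);
    return () => {
      const i = this.listeners.indexOf(l);
      if (i >= 0) this.listeners.splice(i, 1);
    };
  }
}

function toyShare(source: SyncSource, connectFirst: boolean) {
  let refCount = 0;
  let buffer: Buffer1 | null = null;
  let disconnect: Unsubscribe | null = null;
  const connect = (dest: Buffer1): void => {
    if (!disconnect) disconnect = source.subscribe((v) => dest.next(v));
  };
  return (listener: Listener): Unsubscribe => {
    refCount++;
    const dest = (buffer = buffer ?? new Buffer1());
    let closed = false;
    let detach: Unsubscribe = () => {};
    const unsubscribe = (): void => {
      if (closed) return;
      closed = true;
      detach();
      if (--refCount === 0) {
        // Reset at refCount 0: drop the source connection and the buffer.
        disconnect?.();
        disconnect = null;
        buffer = null;
      }
    };
    if (connectFirst) connect(dest);
    detach = dest.subscribe((v) => { if (!closed) listener(v, unsubscribe); });
    if (closed) detach(); // the listener unsubscribed during the replay
    if (!connectFirst) connect(dest); // may run right after a synchronous reset
    return unsubscribe;
  };
}

function runToyShare(connectFirst: boolean) {
  const source = new SyncSource('7.5.1');
  const subscribe = toyShare(source, connectFirst);
  const a = subscribe(() => {});
  // Mimics combineLatest([shared$, shared$]).pipe(take(1)): both subscriptions
  // are torn down synchronously while the second one is still being set up.
  subscribe((_value, self) => { a(); self(); });
  const late: string[] = [];
  subscribe((value) => late.push(value));
  return { late, sourceSubscriptions: source.active };
}

console.log(runToyShare(false)); // late is empty: the late subscriber gets nothing
console.log(runToyShare(true)); // late contains '7.5.1'
```

In the first run the toy reconnects to the source while refCount is already 0, feeding a buffer that has been discarded, so the late subscriber sees a live connection and never triggers a new one. Whether flipping the order is safe for the real operator is a separate question, as noted above.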
@henry-alakazhang Do you have a code reproduction you can share that demonstrates the issue? It would probably help the devs make sure it gets tested and fixed! (I haven't been able to reproduce this outside of my app - sporadically.)
I haven't tried to reproduce it outside my app either, but the reproduction in the issue itself should demonstrate the issue fine.
- shareReplay on an observable that emits immediately (eg. a BehaviorSubject)
- Multiple simultaneous subscription/unsubscriptions
- Another later subscription
I can simplify the reproduction a little more:
import { BehaviorSubject, combineLatest } from 'rxjs';
import { shareReplay, take } from 'rxjs/operators';

// Apply `shareReplay` to an observable which emits immediately once
const shared$ = new BehaviorSubject('6.4.0').pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

// Subscribe to it multiple times simultaneously, then unsubscribe simultaneously
combineLatest([shared$, shared$]).pipe(
  take(1)
).toPromise();

// It doesn't emit again.
const lateSub = shared$.subscribe((value) => console.log('late:', value));
This example specifically is minimal to the point of silliness (combineLatest on the same observable) but if you imagine doing a .pipe() and some transformations and merging it back in, it can be a real use case.
Note that also:
// If we reset it (via refCount = 0), the state gets fixed.
lateSub.unsubscribe();
shared$.subscribe((value) => console.log('later:', value)); // emits on both 7.5.1 and 6.4.0
Stackblitz 6.4.0: https://stackblitz.com/edit/q7u9jg-xxev1z?file=index.ts (logs "late:" and "later:")
Stackblitz 7.5.1: https://stackblitz.com/edit/q7u9jg-p16qzd?file=index.ts (logs only "later:")
Hey guys,
Do you have any update on this?
@benlesh it seems like part of the issue (or a tangential issue) is that, with the refactor of share, we lost some of the granularity previously provided.
For example, publishReplay + refCount used to unsubscribe from source when the ref count dropped to 0, while still preserving the underlying ReplaySubject. With the newly configurable share, those two things are coupled with resetOnRefCountZero. So you can have the behavior of unsubscribe from source + generate new ReplaySubject, or neither. But not the aforementioned combination.
tl;dr resetOnRefCountZero has become a proxy for shareReplay's behavior with { refCount: true }. We have lost the functionality previously provided by publishReplay + refCount.
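The two behaviours being contrasted here can be sketched with a small toy model in plain TypeScript. This is an illustration only, not RxJS code: ManualSource, toyReplayRefCount and the keepBufferOnZero flag are hypothetical names. With the flag set, the toy unsubscribes from the source at refCount 0 but keeps the replayed value (the publishReplay + refCount combination described above); without it, both are reset together.

```typescript
type Unsubscribe = () => void;
type Listener = (value: string) => void;

// Hypothetical manually driven source that counts how often it is subscribed to.
class ManualSource {
  listeners: Listener[] = [];
  subscribeCount = 0;
  subscribe(l: Listener): Unsubscribe {
    this.subscribeCount++;
    this.listeners.push(l);
    return () => {
      const i = this.listeners.indexOf(l);
      if (i >= 0) this.listeners.splice(i, 1);
    };
  }
  emit(value: string): void {
    this.listeners.slice().forEach((l) => l(value));
  }
}

// keepBufferOnZero is a made-up option for this sketch, not an RxJS setting.
function toyReplayRefCount(source: ManualSource, keepBufferOnZero: boolean) {
  let refCount = 0;
  let last: { value: string } | null = null;
  let disconnect: Unsubscribe | null = null;
  const listeners: Listener[] = [];

  return (listener: Listener): Unsubscribe => {
    refCount++;
    listeners.push(listener);
    if (last) listener(last.value); // replay the buffered value, if any
    if (!disconnect) {
      disconnect = source.subscribe((value) => {
        last = { value };
        listeners.slice().forEach((l) => l(value));
      });
    }
    let closed = false;
    return () => {
      if (closed) return;
      closed = true;
      const i = listeners.indexOf(listener);
      if (i >= 0) listeners.splice(i, 1);
      if (--refCount === 0) {
        // Always unsubscribe from the source at refCount 0 ...
        disconnect?.();
        disconnect = null;
        // ... but only drop the replayed value when the two are coupled.
        if (!keepBufferOnZero) last = null;
      }
    };
  };
}

function runToyReplay(keepBufferOnZero: boolean) {
  const source = new ManualSource();
  const subscribe = toyReplayRefCount(source, keepBufferOnZero);
  const first = subscribe(() => {});
  source.emit('a');
  first(); // refCount drops to 0
  const listenersAfterZero = source.listeners.length;
  const seen: string[] = [];
  subscribe((value) => seen.push(value));
  return { seen, listenersAfterZero, subscribeCount: source.subscribeCount };
}

console.log(runToyReplay(true)); // seen contains 'a': the buffer survived the disconnect
console.log(runToyReplay(false)); // seen is empty: buffer and connection were reset together
```

In both runs the source is unsubscribed at refCount 0 and subscribed again by the later subscriber; the only difference is whether the later subscriber still receives the previously buffered value.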
@benlesh bumping this ^