rxjs
rxjs copied to clipboard
Subscription leakage with circular dependencies (using `defer`)
Describe the bug
This came up as I was building a search component. Basically, what is happening is that subscriptions are not being unsubscribed from when using a combination of defer, delay and share.
So far my testing leads me to believe it has to do with using defer to create a circular dependency. The use case is that you want a search pipeline that needs to refer to the previous search result to get the next page token/cursor, and the cursor observable itself needs to refer to the search pipeline.
Expected behavior
The subscription should be cleaned up accordingly.
Reproduction code
import { Observable, Subject, defer, BehaviorSubject } from "rxjs";
import {
tap,
share,
delay,
mapTo,
withLatestFrom,
startWith
} from "rxjs/operators";
console.clear();
// Imagine an observable that emits search results based on input.
function searchObservable(input: Observable<string>) {
const searchText$ = input.pipe(
// Watch the console; this proves that the subscription is never disposed.
tap((i) => console.log("got search text", i))
);
// This references the `searchResults$` observable below via `defer`.
// The use case would be to grab the next page token/cursor from the
// response.
/* eslint-disable @typescript-eslint/no-use-before-define */
const cursor$: Observable<string | null> = defer(() => searchResults$).pipe(
startWith(null),
share()
);
// This references the `cursor$` observable above, creating the circular dependency.
const searchResults$ = searchText$.pipe(
withLatestFrom(cursor$),
delay(0),
mapTo("boo"),
// Share the search results subscription so we don't create multiple
// searchers. This is because the results are returned as well as used
// for the cursor above.
share()
);
return searchResults$;
}
// This works.
function searchObservableWithWorkaround(input: Observable<string>) {
return new Observable<any>((observer) => {
const cursor = new BehaviorSubject<string | null>(null);
const searchText$ = input.pipe(
tap((i) => console.log("got search text", i))
);
const searchResults$ = searchText$.pipe(
withLatestFrom(cursor),
delay(0),
mapTo("wee"),
share()
);
const sub = searchResults$.subscribe(observer);
return () => {
sub.unsubscribe();
};
});
}
async function run() {
// Search input
let inputSource = new Subject<string>();
// First time.
let sub = searchObservable(inputSource).subscribe(console.log);
// < 'got search text once'
inputSource.next("once");
await waitABit();
sub.unsubscribe();
await waitABit();
// Second time
sub = searchObservable(inputSource).subscribe(console.log);
// < 'got search text twice'
// < 'got search text twice'
inputSource.next("twice");
await waitABit();
sub.unsubscribe();
await waitABit();
// Third time
sub = searchObservable(inputSource).subscribe(console.log);
// < 'got search text thrice'
// < 'got search text thrice'
// < 'got search text thrice'
inputSource.next("thrice");
await waitABit();
sub.unsubscribe();
await waitABit();
// -----
console.log("--- Using workaround ---");
inputSource.complete(); // cleanup from previous
inputSource = new Subject<string>();
// First time.
sub = searchObservableWithWorkaround(inputSource).subscribe(console.log);
// < 'got search text once'
inputSource.next("once");
await waitABit();
sub.unsubscribe();
await waitABit();
sub = searchObservableWithWorkaround(inputSource).subscribe(console.log);
// < 'got search text still once'
inputSource.next("still once");
await waitABit();
sub.unsubscribe();
await waitABit();
}
run();
function waitABit() {
return new Promise((resolve) => setTimeout(resolve, 50));
}
Reproduction URL
https://codesandbox.io/s/jolly-allen-py7boq?file=/src/index.ts
Version
6.x, 7.x, 8.x
Environment
No response
Additional context
I have a workaround for my use case, which involves creating a subject as a proxy to create the circular reference, then complete the subject when the returned observable is unsubscribed from.
I'm bit unsure if this should be treated as a bug vs. having some note something to be careful, similar to the reentrace cases. Example shared in the issue obviously creates a circular references, which blocks to gc in my opinion.
Yes, I don't think this can actually be fixed. I've stepped many times on this stone, also with withLatestFrom.
const source$ = defer(() => result$).pipe(
....
);
const result$ = source$.pipe(..., share());
return result$;
The problem is the operator has no way of knowing where is that subscription coming from.
When you initially set up the observable, no subscription is made. When someone subscribes to result$ (sub#1), this one subscribes to source$. The defer on source$ then creates a new subscription to result$ (sub#2).
The problem now is that if the original subscription (sub#1) closes, result$ still has the subscription from the defer(() => (sub#2). And defer will still "be alive" because result is subscribed to it. This subscription can't be closed, and no operator is at fault here: They are all still alive because they see that there's still an active subscription, sub#2.
The solution is to break down this circular dependency with a subject:
const subject = new Subject();
const source$ = subject.pipe(
....
);
const result$ = source$.pipe(..., tap(r => subject.next(r)));
return result$;
This way there's only 1 subscription. And when that subscription closes, everything cleans up nicely.