rxjs icon indicating copy to clipboard operation
rxjs copied to clipboard

Subscription leakage with circular dependencies (using `defer`)

Open jeffijoe opened this issue 2 years ago • 2 comments

Describe the bug

This came up as I was building a search component. Basically, what is happening is that subscriptions are not being unsubscribed from when using a combination of defer, delay and share.

So far my testing leads me to believe it has to do with using defer to create a circular dependency. The use case is that you want a search pipeline that needs to refer to the previous search result to get the next page token/cursor, and the cursor observable itself needs to refer to the search pipeline.

Expected behavior

The subscription should be cleaned up accordingly.

Reproduction code

import { Observable, Subject, defer, BehaviorSubject } from "rxjs";
import {
  tap,
  share,
  delay,
  mapTo,
  withLatestFrom,
  startWith
} from "rxjs/operators";

console.clear();

// Imagine an observable that emits search results based on input.
function searchObservable(input: Observable<string>) {
  const searchText$ = input.pipe(
    // Watch the console; this proves that the subscription is never disposed.
    tap((i) => console.log("got search text", i))
  );

  // This references the `searchResults$` observable below via `defer`.
  // The use case would be to grab the next page token/cursor from the
  // response.
  /* eslint-disable @typescript-eslint/no-use-before-define */
  const cursor$: Observable<string | null> = defer(() => searchResults$).pipe(
    startWith(null),
    share()
  );

  // This references the `cursor$` observable above, creating the circular dependency.
  const searchResults$ = searchText$.pipe(
    withLatestFrom(cursor$),
    delay(0),
    mapTo("boo"),
    // Share the search results subscription so we don't create multiple
    // searchers. This is because the results are returned as well as used
    // for the cursor above.
    share()
  );

  return searchResults$;
}

// This works.
function searchObservableWithWorkaround(input: Observable<string>) {
  return new Observable<any>((observer) => {
    const cursor = new BehaviorSubject<string | null>(null);
    const searchText$ = input.pipe(
      tap((i) => console.log("got search text", i))
    );

    const searchResults$ = searchText$.pipe(
      withLatestFrom(cursor),
      delay(0),
      mapTo("wee"),
      share()
    );

    const sub = searchResults$.subscribe(observer);

    return () => {
      sub.unsubscribe();
    };
  });
}

async function run() {
  // Search input
  let inputSource = new Subject<string>();

  // First time.
  let sub = searchObservable(inputSource).subscribe(console.log);
  // < 'got search text once'
  inputSource.next("once");
  await waitABit();
  sub.unsubscribe();
  await waitABit();

  // Second time
  sub = searchObservable(inputSource).subscribe(console.log);
  // < 'got search text twice'
  // < 'got search text twice'
  inputSource.next("twice");
  await waitABit();
  sub.unsubscribe();
  await waitABit();

  // Third time
  sub = searchObservable(inputSource).subscribe(console.log);
  // < 'got search text thrice'
  // < 'got search text thrice'
  // < 'got search text thrice'
  inputSource.next("thrice");
  await waitABit();
  sub.unsubscribe();
  await waitABit();

  // -----

  console.log("--- Using workaround ---");

  inputSource.complete(); // cleanup from previous
  inputSource = new Subject<string>();

  // First time.
  sub = searchObservableWithWorkaround(inputSource).subscribe(console.log);
  // < 'got search text once'
  inputSource.next("once");
  await waitABit();
  sub.unsubscribe();
  await waitABit();

  sub = searchObservableWithWorkaround(inputSource).subscribe(console.log);
  // < 'got search text still once'
  inputSource.next("still once");
  await waitABit();
  sub.unsubscribe();
  await waitABit();
}

run();

function waitABit() {
  return new Promise((resolve) => setTimeout(resolve, 50));
}

Reproduction URL

https://codesandbox.io/s/jolly-allen-py7boq?file=/src/index.ts

Version

6.x, 7.x, 8.x

Environment

No response

Additional context

I have a workaround for my use case, which involves creating a subject as a proxy to create the circular reference, then complete the subject when the returned observable is unsubscribed from.

jeffijoe avatar Jan 12 '23 18:01 jeffijoe

I'm bit unsure if this should be treated as a bug vs. having some note something to be careful, similar to the reentrace cases. Example shared in the issue obviously creates a circular references, which blocks to gc in my opinion.

kwonoj avatar Jan 13 '23 17:01 kwonoj

Yes, I don't think this can actually be fixed. I've stepped many times on this stone, also with withLatestFrom.

const source$ = defer(() => result$).pipe(
 ....
);
const result$ = source$.pipe(..., share());

return result$;

The problem is the operator has no way of knowing where is that subscription coming from.

When you initially set up the observable, no subscription is made. When someone subscribes to result$ (sub#1), this one subscribes to source$. The defer on source$ then creates a new subscription to result$ (sub#2).

The problem now is that if the original subscription (sub#1) closes, result$ still has the subscription from the defer(() => (sub#2). And defer will still "be alive" because result is subscribed to it. This subscription can't be closed, and no operator is at fault here: They are all still alive because they see that there's still an active subscription, sub#2.

The solution is to break down this circular dependency with a subject:

const subject = new Subject();
const source$ = subject.pipe(
 ....
);
const result$ = source$.pipe(..., tap(r => subject.next(r)));

return result$;

This way there's only 1 subscription. And when that subscription closes, everything cleans up nicely.

voliva avatar Jan 16 '23 10:01 voliva