Synchronous execution of disposer

Open ronag opened this issue 1 year ago • 6 comments

Describe the bug

We implement an operator called combineMap which is a more efficient variant of the following pattern:

values$.pipe(rxjs.switchMap(values => values.length > 0 ? rxjs.combineLatest(values.map(selector)) : rxjs.of([])))

Instead, we can do the following which will re-use the results and subscriptions for unchanged values:

values$.pipe(combineMap(selector))

This works fine most of the time, but sometimes in production we get the following error:

    TypeError: Cannot read properties of null (reading 'unsubscribe')
        at /home/jesper/nxtedition/nxt/asset/node_modules/@nxtedition/lib/rxjs/combineMap.js:136:28
        at execFinalizer (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscription.js:172:9)
        at Subscription.unsubscribe (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscription.js:89:29)
        at Subscriber.unsubscribe (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscriber.js:75:42)
        at OperatorSubscriber.unsubscribe (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/operators/OperatorSubscriber.js:72:42)
        at execFinalizer (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscription.js:175:19)
        at Subscription.unsubscribe (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscription.js:89:29)
        at Subscriber.unsubscribe (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscriber.js:75:42)
        at OperatorSubscriber.unsubscribe (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/operators/OperatorSubscriber.js:72:42)
        at execFinalizer (/home/jesper/nxtedition/nxt/asset/node_modules/rxjs/dist/cjs/internal/Subscription.js:175:19)

Given the following code:

const EMPTY = Object.freeze([])

function combineMap(project, equals = (a, b) => a === b) {
  const self = this
  return new rxjs.Observable((o) => {
    let curr = EMPTY
    let scheduled = false
    let dirty = false
    let active = 0
    let empty = 0

    const _error = (err) => o.error(err)

    function _update() {
      scheduled = false

      if (empty) {
        return
      }

      if (dirty) {
        dirty = false
        o.next(curr.map((context) => context.value))
      }

      if (!active) {
        o.complete()
      }
    }

    function update() {
      if (!scheduled) {
        scheduled = true
        queueMicrotask(_update)
      }
    }

    active += 1
    const subscription = self.subscribe({
      next(keys) {
        if (!Array.isArray(keys)) {
          keys = EMPTY
        }
        // TODO (perf): Avoid array allocation & copy if nothing has updated.

        const prev = curr
        curr = new Array(keys.length)

        const prevLen = prev.length
        const currLen = curr.length

        if (currLen !== prevLen || prev === EMPTY) {
          dirty = true
          update()
        }

        for (let n = 0; n < currLen; ++n) {
          const key = keys[n]

          if (n < prevLen && prev[n] && equals(prev[n].key, key)) {
            curr[n] = prev[n]
            prev[n] = null
            continue
          }

          dirty = true
          update()

          // TODO (perf): Guess start index based on n, e.g. n - 1 and n + 1 to check if
          // a key has simply been added or removed.
          const i = prev.findIndex((entry) => entry && equals(entry.key, key))

          if (i !== -1) {
            curr[n] = prev[i]
            prev[i] = null
          } else {
            const entry = (curr[n] = {
              key,
              value: EMPTY,
              subscription: null,
            })

            let observable
            try {
              observable = rxjs.from(project(keys[n]))
            } catch (err) {
              observable = rxjs.throwError(() => err)
            }

            empty += 1
            active += 1

            const subscription = observable.subscribe({
              next(value) {
                if (entry.value === EMPTY) {
                  empty -= 1
                }

                entry.value = value

                dirty = true
                update()
              },
              error: _error,
            })
            // ** outer disposer runs before this assignment is done
            entry.subscription = subscription
            entry.subscription.add(() => {
              if (entry.value === EMPTY) {
                empty -= 1
              }

              active -= 1

              dirty = true
              update()
            })
          }
        }

        // TODO (perf): start from index where prev[n] is not null.
        for (let n = 0; n < prevLen; n++) {
          // ** this will crash since subscription is null
          prev[n]?.subscription.unsubscribe()
        }
      },
      error: _error,
      complete() {
        active -= 1
        if (!active) {
          update()
        }
      },
    })

    return () => {
      for (const entry of curr) {
        entry?.subscription.unsubscribe()
      }
      subscription.unsubscribe()
    }
  })
}

For some reason the disposer:

    return () => {
      for (const entry of curr) {
        entry?.subscription.unsubscribe()
      }
      subscription.unsubscribe()
    }

runs before the subscription has been assigned to the entry above:

entry.subscription = subscription

I have no idea how this can occur and have been unable to reproduce it.

Expected behavior

The outer disposer does not run concurrently with the inner subscription's next invocation.

Reproduction code

Unable to reproduce outside of production code.

Reproduction URL

No response

Version

7.8.0

Environment

No response

Additional context

No response

ronag avatar Mar 23 '23 09:03 ronag

My suspicion is that this happens after an inner observable throws an error synchronously, or after the project function itself throws an error.

All of this code happens synchronously:

            const entry = (curr[n] = {
              key,
              value: EMPTY,
              subscription: null,
            })

            let observable
            try {
              observable = rxjs.from(project(keys[n]))
            } catch (err) {
              observable = rxjs.throwError(() => err)
            }

            empty += 1
            active += 1

            const subscription = observable.subscribe({
              next(value) {
                if (entry.value === EMPTY) {
                  empty -= 1
                }

                entry.value = value

                dirty = true
                update()
              },
              error: _error,
            })
            // ** outer disposer runs before this assignment is done
            entry.subscription = subscription

The only way the outer disposer can run before the assignment is done is if the consumer of this observable unsubscribes from it, also synchronously.

Initially I thought it was due to update(), with the consumer maybe doing a take() and unsubscribing after N updates, but I see that's not possible because update() schedules a microtask.

But if the inner observable, observable = rxjs.from(project(keys[n])), throws synchronously (or, likewise, the throwError() you use in the catch clause), then when subscribing to that observable you will synchronously receive a call on the error handler, which calls _error, which calls o.error.

This o.error causes the consumer of the observable to immediately unsubscribe, which runs the cleanup function before you can assign the subscription to the entry.
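
The sequence described above can be reproduced without RxJS. The following is only a minimal sketch under stated assumptions: subscribeToFailingSource and disposer are hypothetical stand-ins (not RxJS APIs) for an inner source that errors synchronously and for the operator's cleanup function. It also shows two possible guards: optional chaining on the subscription itself, and a check after subscribe() returns. In the real operator, o.closed could presumably play the role of the closed flag.

```javascript
// Hypothetical stand-in for an inner source that errors synchronously on
// subscribe, the way a synchronous throwError would. This is not RxJS code.
function subscribeToFailingSource(observer) {
  observer.error(new Error('boom')) // delivered before subscribe() returns
  return { unsubscribe() {} }
}

const log = []
const entry = { subscription: null }
let closed = false

// Stand-in for the outer disposer, which runs when o.error() tears down the consumer.
function disposer() {
  closed = true
  log.push(
    entry.subscription === null
      ? 'disposer: subscription is null'
      : 'disposer: subscription is set'
  )
  // Guard 1: tolerate the window where the subscription is not assigned yet.
  entry.subscription?.unsubscribe()
}

const subscription = subscribeToFailingSource({ error: disposer })
log.push('subscribe returned')

// Guard 2: if teardown already happened during subscribe(), release the
// late subscription instead of storing it on the entry.
if (closed) {
  subscription.unsubscribe()
  log.push('late subscription released')
} else {
  entry.subscription = subscription
}

console.log(log)
```

The disposer entry is logged before 'subscribe returned', which is the ordering that leaves entry.subscription null in the original operator.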

voliva avatar Mar 23 '23 14:03 voliva

> But if the inner observable, observable = rxjs.from(project(keys[n])), throws synchronously (or, likewise, the throwError() you use in the catch clause), then when subscribing to that observable you will synchronously receive a call on the error handler, which calls _error, which calls o.error.
>
> This o.error causes the consumer of the observable to immediately unsubscribe, which runs the cleanup function before you can assign the subscription to the entry.

I suspected this as well. However, when trying to make a test case the disposer seems to run afterwards.

ronag avatar Mar 23 '23 14:03 ronag

new rxjs.Observable(o => {
  process.nextTick(() => {
    console.log('tick')
  })
  queueMicrotask(() => {
    console.log('task')
  })
  console.log('pre subscribe')
  rxjs
    .throwError(() => new Error('asd'))
    .subscribe({
      error: (err) => o.error(err)
    })
  console.log('post subscribe')
  return () => {
    console.log('disposer')
  }
}).subscribe(() => {})

Will print:

pre subscribe
post subscribe
disposer
task
tick

ronag avatar Mar 23 '23 14:03 ronag

@ronag this is also expected.

If you think about it, there's no way RxJS can call the disposer function at that point, because you haven't even returned it yet; it's all happening synchronously.

If the inner subscription happens asynchronously though (such as your original case) then you get exactly the behaviour I explained: https://stackblitz.com/edit/rxjs-kwustg?file=index.ts

new rxjs.Observable((o) => {
  queueMicrotask(() => {
    console.log('task');
  });
  console.log('pre subscribe');
  queueMicrotask(() => {
    rxjs
      .throwError(() => new Error('asd'))
      .subscribe({
        error: (err) => o.error(err),
      });
    console.log('post subscribe');
  });
  return () => {
    console.log('disposer');
  };
}).subscribe(() => {});

/* Logs
pre subscribe
task
disposer
post subscribe
*/

voliva avatar Mar 23 '23 15:03 voliva

Would it make sense to ensure that, e.g., throwError emits its error asynchronously?
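
To illustrate what asynchronous delivery would change, here is a minimal sketch that again avoids RxJS. subscribeToDeferredFailure is a hypothetical stand-in (not an RxJS API) for a source that defers its error to a microtask. In RxJS itself, a scheduler-based operator such as observeOn is the kind of tool that could give a similar effect, though whether that is the right fix here is an open question.

```javascript
// Hypothetical stand-in for a source whose error is deferred to a microtask
// instead of being delivered during subscribe(). This is not RxJS code.
function subscribeToDeferredFailure(observer) {
  let cancelled = false
  queueMicrotask(() => {
    if (!cancelled) observer.error(new Error('boom'))
  })
  return {
    unsubscribe() {
      cancelled = true
    },
  }
}

const events = []
const entry = { subscription: null }

entry.subscription = subscribeToDeferredFailure({
  error() {
    // By the time this runs, the assignment to entry.subscription has completed.
    events.push(
      entry.subscription === null
        ? 'error: subscription is null'
        : 'error: subscription is set'
    )
    entry.subscription.unsubscribe()
  },
})
events.push('subscribe returned')
```

Because the error arrives only after the current synchronous code has finished, the handler always sees an assigned subscription. The trade-off is that every error is delayed by at least one microtask.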

ronag avatar Mar 23 '23 17:03 ronag

In case no one else is working on this issue, is it fine if I work on it? Can this be assigned to me?

mounilKshah avatar Apr 06 '24 04:04 mounilKshah