New `timeout` operator no longer supports configuring a timeout regarding source completion

Open danielwiehl opened this issue 3 years ago • 4 comments

Describe the bug

In version 6.x, the timeout operator behaved differently depending on whether the passed argument was a duration or a date. Passing a date made the subscription time out if the source had not completed by that date, regardless of any source emissions.

This feature was documented as follows:

> If provided argument was Date, returned Observable behaves differently. It throws if Observable did not complete before provided Date. This means that periods between emission of particular values do not matter in this case. If Observable did not complete before provided Date, source Observable will be unsubscribed. Other than that, resulting stream behaves just as source Observable.

Version 7.x does not seem to support this feature anymore. I also cannot find any notice in the migration guide that this feature has been removed. Did you remove this feature on purpose? I mean, this was a pretty handy feature that can't be easily re-implemented by piping operators.

Expected behavior

Similar to first or each, the TimeoutConfig should provide a way to configure a date or duration by which the source must complete before throwing a timeout error.
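For reference, here is a minimal sketch of what the 7.x config does cover. As far as I can tell, `first` only bounds the wait for the first value and `each` only bounds the gap between values, so neither bounds the total time until completion. The source, the durations, and the use of `VirtualTimeScheduler` (only there to make the run deterministic without real timers) are assumptions for illustration.

```typescript
import { interval, take, timeout, VirtualTimeScheduler } from 'rxjs';

// Virtual time keeps the example deterministic; any SchedulerLike would do.
const scheduler = new VirtualTimeScheduler();

const result = { values: [] as number[], error: undefined as unknown, completed: false };

// Emits every 100 ms (virtual time) and completes after 5 values, i.e. at 500 ms.
interval(100, scheduler)
  .pipe(
    take(5),
    // first: max wait for the first value; each: max gap between two values.
    timeout({ first: 150, each: 150, scheduler }),
  )
  .subscribe({
    next: (value) => result.values.push(value),
    error: (err) => (result.error = err),
    complete: () => (result.completed = true),
  });

// Run all scheduled virtual-time work synchronously.
scheduler.flush();
```

The source needs 500 ms to complete, well past both 150 ms limits, yet no timeout should fire because every individual gap stays below 150 ms. That is the gap this issue is about: there is no option that says "complete by this time or error".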

Reproduction code

No response

Reproduction URL

No response

Version

7.5.4

Environment

No response

Additional context

No response

danielwiehl • Mar 07 '22 19:03

> I mean, this was a pretty handy feature that can't be easily re-implemented by piping operators.

The following should do the trick:

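import { switchMap, takeUntil, throwError, timer, TimeoutError } from 'rxjs';

// `source`, `due` (number or Date) and `scheduler` are supplied by the caller.
source.pipe(
  // takeUntil never completes the result here, because the notifier throws on timeout;
  // its job is to cancel the timer once source completes.
  takeUntil(
    timer(due, scheduler).pipe(
      switchMap(() => throwError(() => new TimeoutError())), // throw any error you want
    ),
  ),
);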

The only thing to be aware of is that it will throw an error if timer emits synchronously (which depends on the provided scheduler), even if source would complete synchronously. To fix that, you could pipe the timer through subscribeOn(asyncScheduler).
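Putting the workaround and the `subscribeOn(asyncScheduler)` fix together, a reusable operator could look roughly like the sketch below. The name `completeBy` is hypothetical and not part of RxJS; the sample sources and the 50 ms deadline are also just illustration.

```typescript
import {
  asyncScheduler, MonoTypeOperatorFunction, NEVER, of, SchedulerLike,
  subscribeOn, switchMap, takeUntil, throwError, timer, TimeoutError,
} from 'rxjs';

// Hypothetical helper (not an RxJS API): errors with TimeoutError unless
// the source completes before `due` (a duration in ms or a Date).
function completeBy<T>(
  due: number | Date,
  scheduler: SchedulerLike = asyncScheduler,
): MonoTypeOperatorFunction<T> {
  const deadline$ = timer(due, scheduler).pipe(
    switchMap(() => throwError(() => new TimeoutError())),
    // Subscribe to the timer asynchronously, so that a source completing
    // synchronously wins even if `scheduler` runs its work synchronously.
    subscribeOn(asyncScheduler),
  );
  return (source) => source.pipe(takeUntil(deadline$));
}

// A source that completes synchronously is mirrored unchanged.
const outcome = { values: [] as number[], error: undefined as unknown, completed: false };
of(1, 2, 3)
  .pipe(completeBy(0))
  .subscribe({
    next: (value) => outcome.values.push(value),
    error: (err) => (outcome.error = err),
    complete: () => (outcome.completed = true),
  });

// A source that never completes should error once the deadline passes.
NEVER.pipe(completeBy(new Date(Date.now() + 50))).subscribe({
  error: (err) => console.log('timed out:', err instanceof TimeoutError),
});
```

Because the deadline is wired in through `takeUntil`, the timer is cancelled as soon as the source completes or errors, so no stray timeout fires afterwards.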

ajafff • Mar 08 '22 20:03

Clever, you use the takeUntil operator, which immediately subscribes and mirrors the source. You have to come up with that idea first, great! But do you think the takeUntil operator is designed for that? I mean, it seems to work, but the API doesn't explicitly state what should happen when the notifier errors.

I still think it would be great to have that functionality back in the API of the timeout operator. Don't you think so?

Finally, one last question regarding your comment about synchronous schedulers. Could you please explain what you mean by a synchronous scheduler? My understanding is that a scheduler is asynchronous per se, to control when notifications should be delivered from the source. For example, the asyncScheduler creates a macrotask, the asapScheduler creates a microtask, etc. Could you please give me an example of a synchronous scheduler and when to use it?

danielwiehl • Mar 09 '22 06:03

> I mean, it seems to work, but the API doesn't explicitly state what should happen when the notifier errors.

Errors are always forwarded to the target Observable. So if anything in the chain throws or emits an error notification (one of the source Observables, a callback or a notifier), the resulting Observable will error. Error propagation only stops at operators that are made for error handling, e.g. catchError or onErrorResumeNext. This applies to all built-in operators. Maybe the docs could mention this?
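A small sketch of that rule, using a notifier that errors immediately. The sources, the error message, and the `'recovered'` fallback value are made up for the example.

```typescript
import { catchError, of, takeUntil, throwError } from 'rxjs';

// A notifier that errors as soon as it is subscribed.
const failingNotifier$ = throwError(() => new Error('notifier failed'));

// Without error handling, the notifier's error ends up in the result.
const forwarded = { values: [] as number[], error: undefined as unknown };
of(1, 2, 3)
  .pipe(takeUntil(failingNotifier$))
  .subscribe({
    next: (value) => forwarded.values.push(value),
    error: (err) => (forwarded.error = err),
  });

// catchError is an error-handling operator, so propagation stops there.
const handled = { values: [] as (number | string)[], completed: false };
of(1, 2, 3)
  .pipe(
    takeUntil(failingNotifier$),
    catchError(() => of('recovered')),
  )
  .subscribe({
    next: (value) => handled.values.push(value),
    complete: () => (handled.completed = true),
  });
```

In the first subscription the observer's `error` callback receives the notifier's error and no values arrive, since `takeUntil` subscribes to the notifier before the source. In the second, `catchError` swaps the error for the fallback Observable and the stream completes normally.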

> I still think it would be great to have that functionality back in the API of the timeout operator. Don't you think so?

I don't have a strong opinion on this. Probably because I don't have a use-case for it. I just wanted to share a workaround in case you need one for migration.

> Could you please explain what you mean by a synchronous scheduler?

Users can write their own schedulers. AFAICT there is nothing to prevent actions from being executed synchronously.

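import { SchedulerAction, SchedulerLike, Subscription } from 'rxjs';

// An action that runs its work immediately instead of deferring it.
class SyncSchedulerAction<T> extends Subscription implements SchedulerAction<T> {
    constructor(private work: (this: SchedulerAction<T>, state?: T) => void) {
        super();
    }

    // The delay is ignored: the work runs before schedule() returns.
    schedule(state?: T, _delay?: number): Subscription {
        this.work(state);
        return this;
    }
}

export const syncScheduler: SchedulerLike = {
    // Fixed clock; good enough for a demo, since delays are ignored anyway.
    now: () => 0,
    schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, _delay?: number, state?: T): Subscription {
        return new SyncSchedulerAction(work).schedule(state);
    }
};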
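To tie this back to the timeout workaround, here is a sketch of what goes wrong with such a scheduler and how `subscribeOn(asyncScheduler)` avoids it. The scheduler is repeated (as `SyncAction`) only to keep the block self-contained; the 1000 ms delay and the `of(1)` source are arbitrary.

```typescript
import {
  asyncScheduler, of, SchedulerAction, SchedulerLike, subscribeOn,
  Subscription, switchMap, takeUntil, throwError, timer, TimeoutError,
} from 'rxjs';

// Same idea as the synchronous scheduler above: work runs immediately.
class SyncAction<T> extends Subscription implements SchedulerAction<T> {
  constructor(private work: (this: SchedulerAction<T>, state?: T) => void) {
    super();
  }
  schedule(state?: T, _delay?: number): Subscription {
    this.work(state);
    return this;
  }
}
const syncScheduler: SchedulerLike = {
  now: () => 0,
  schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, _delay?: number, state?: T): Subscription {
    return new SyncAction(work).schedule(state);
  },
};

// 1) The timer fires before subscribe() returns, despite the 1000 ms delay.
const order: string[] = [];
timer(1000, syncScheduler).subscribe(() => order.push('timer fired'));
order.push('subscribe returned');

const deadline$ = timer(1000, syncScheduler).pipe(
  switchMap(() => throwError(() => new TimeoutError())),
);

// 2) The plain workaround errors before the source is even subscribed.
const naive = { values: [] as number[], error: undefined as unknown };
of(1)
  .pipe(takeUntil(deadline$))
  .subscribe({
    next: (value) => naive.values.push(value),
    error: (err) => (naive.error = err),
  });

// 3) Deferring the timer subscription lets the synchronous source finish first.
const deferred = { values: [] as number[], completed: false };
of(1)
  .pipe(takeUntil(deadline$.pipe(subscribeOn(asyncScheduler))))
  .subscribe({
    next: (value) => deferred.values.push(value),
    complete: () => (deferred.completed = true),
  });
```

In case 2 the `TimeoutError` arrives without any value, because `takeUntil` subscribes to the notifier first. In case 3 the source emits and completes, which also cancels the pending timer subscription.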

One use case that comes to mind is a variant of queueScheduler that executes the next task synchronously after the previous task finished. That might be useful to change subscription order for operators like takeUntil. But that's rather tough to explain without a lot of hard-to-read code.

ajafff • Mar 09 '22 08:03

Wow, great detailed explanation, thank you. It's awesome that RxJS has such a supportive community.

danielwiehl • Mar 09 '22 09:03