rxjs icon indicating copy to clipboard operation
rxjs copied to clipboard

exhaustMapWithTrailing/debounceMap implementation

Open aaronjensen opened this issue 6 years ago • 13 comments

I'm just opening (and closing) this to add an option to #1777, which I found and spent some time trying to get the examples working but ultimately found them lacking. I'm sorry if this isn't welcome (perhaps adding a stackoverflow Q&A would be better?)

This appears to work for me, hopefully it will be helpful to others:

// Sometimes we want the throttling behavior of exhaustMap without the losing
// the last emission. This is helpful in situations where we want to only issue
// one request at a time, but if we are told to issue another while issuing the
// first, we respect that. Think of it as a nicer switchMap.
//
// input   -1--2--3--4-------->
// project -1xxxxxxx3xxxxxxx4-> (x is async/waiting)
//
// Originally started with this, but did not work:
// https://github.com/ReactiveX/rxjs/issues/1777#issuecomment-466267803
export function exhaustMapWithTrailing<T, R>(
  project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R> {
  return (source): Observable<R> => {
    const release = new Subject()

    return source.pipe(
      throttle(() => release, {
        leading: true,
        trailing: true,
      }),
      // TODO: Upgrade to TypeScript 3.6.3 when available and remove the cast
      // https://github.com/microsoft/TypeScript/issues/33131
      exhaustMap((value, index) =>
        from(project(value, index)).pipe(
          finalize(() => {
            release.next()
          })
        )
      ) as OperatorFunction<T, R>
    )
  }
}

/cc @edusperoni @Dorus

aaronjensen avatar Sep 08 '19 17:09 aaronjensen

Actually, @cartant (or another collaborator) if you could add this example to the thread in #1777 (as long as there's nothing horribly wrong with my implementation) and delete this issue, that would be amazing.

aaronjensen avatar Sep 08 '19 17:09 aaronjensen

Thank you for this @aaronjensen! I needed exactly this operator and took the liberty of writing a test suite for it and publish to npm. Here it is: https://www.npmjs.com/package/rxjs-exhaustmap-with-trailing

bjoerge avatar Mar 24 '21 10:03 bjoerge

Looking at the npm package, i did notice this operator is not fully adhering the following property form #1777

It would be nice to have an operator that plays the latest event, like exhaustMap, but also guarantees that the last event from a stream is processed.

But this test: afbeelding fails to guarantee the last even from the source is processed.

Dorus avatar Mar 27 '21 09:03 Dorus

@Dorus yeah, this is due to how throttle currently won't emit the trailing value after the source has completed. Looks like this behavior will change in rxjs@7 (see this commit: d5fd69c123d2232335563eea95c69c07576d079d).

I'm happy to review a PR and update the test if you have any ideas about how to fix it for rxjs@6

bjoerge avatar Mar 30 '21 18:03 bjoerge

@Dorus: I just released [email protected] which bumps the peer dependency of RxJS to 7.0 and updates the test to reflect the changed behavior of throttle()

bjoerge avatar May 01 '21 10:05 bjoerge

I wonder what's the point of using exhaustMap rather than simply mergeMap? I think the cancelling/debouncing effect is already coming from throttle. What am I missing?

striderhobbit avatar Aug 23 '23 19:08 striderhobbit

@striderhobbit This is a 4 year old issue and I've not touched rxjs in years, so I personally couldn't tell you. I gave an example and rationale in my original issue. Keep in mind that it's very possible things have changed in this library since I opened the issue.

aaronjensen avatar Aug 23 '23 20:08 aaronjensen

@striderhobbit the point is alleviating back pressure in a way that considers the target observable timing, not some fixed timing, or some other observable completely. For example:

someAction$.pipe(exhaustMap((action) => networkRequest(action.param)))

so let's say you're typing some data. You type "hello", the server filters and returns to you. You can use a debounce/throttle with a fixed time, but if your request takes 5s to execute, you might end up just never filtering anything, just piling up requests on the server. On the other hand, with exhaustMap you can ignore inputs until your request finishes.

exhaustMapWithTrailing is the step up, where it also fires the last one. So if you type "hello", it starts filtering for "hello", and then you type "w" "o" "r" "l" "d" while the hello request still hasn't finished, it'll return the filtered results for "hello" and then start the request for "hello world". Essentially it reduces the amount of times the target observable will fire and ensures the last value it emits is always in sync with the source observable.

I don't remember the last time I needed this, but it was a while ago and I'm not sure if any other new operator implementation made this implementation redundant or not.

edusperoni avatar Aug 23 '23 20:08 edusperoni

@edusperoni @aaronjensen I understand all that. It's not about how the operator works, that's fine and in fact exactly what I need. My point is only that I think you could define the operator using mergeMap in favor of exhaustMap with the same result. Why?

throttle emits the source Observable values on the output Observable when its internal timer is disabled, and ignores source values when the timer is enabled

So as long as observable inside mergeMap/exhaustMap isn't completed and thus has called release.next() all incoming source values will be ignored/choked by throttle already. It's really some clever piece of code I must say.

Check out this for an example. It will produce the same output for either mergeMap or exhaustMap.

As I said, I don't know if that's exhaustive or if I'm missing something, that's why I wanted to put it up for discussion.

striderhobbit avatar Aug 23 '23 20:08 striderhobbit

I don't think that's how throttle works. Throttle only cares about its own timer, not its downstream, so I'm not sure how "all incoming source values will be ignored/choked by throttle already" could be true. But again, I could be totally out to lunch here, I haven't had to touch rxjs in a very long time, so I'm out of practice.

aaronjensen avatar Aug 23 '23 20:08 aaronjensen

@striderhobbit I believe I understand the question now. Are you asking why inside the operator itself we're using exhaustMap and not mergeMap?

I guess the answer could be "safety", but sure, mergeMap would probably work.

edusperoni avatar Aug 23 '23 20:08 edusperoni

I'm likely mistaken about throttle, I was thinking of a regular debounce. I think I get it now, and I don't have an answer. If it works for you, great. I couldn't tell you a meaningful difference.

aaronjensen avatar Aug 23 '23 20:08 aaronjensen

@edusperoni Yes that was my question. Just wanted to make I was not missing something.

striderhobbit avatar Aug 24 '23 09:08 striderhobbit