stream_transform icon indicating copy to clipboard operation
stream_transform copied to clipboard

[FR] exhaustMap

Open purplenoodlesoop opened this issue 3 years ago • 9 comments

Hello folks.

Seeing that this package could potentially be merged into package:async, got me thinking about what features it lacks for my personal usage to completely replace the RxDart package.

The only one is basically the exhaustMap transformer. From the most used higher-order transformers triad asyncExpand, switchMap and exhaustMap, the last one is lacking, so it would be great to have it natively.

Thanks!

purplenoodlesoop avatar Apr 10 '22 17:04 purplenoodlesoop

From https://rxjs.dev/api/operators/exhaustMap it looks like exhaustMap matches the semantics of asyncExpand.

Can you clarify the difference you expect from asyncExpand?

natebosch avatar May 03 '22 22:05 natebosch

Hi @natebosch!

Thanks for considering my issue. Sure!

Yes, semantically asyncExpand and exhaustMap both represent higher-order stream mappers, but they have a key difference – while asyncExpand would sequentially flatten the streams, exhaustMap would drop all events from incoming streams if one is being processed, "flattened", at the moment.

In terms of RxJS, asyncExpand is concatMap and exhaustMap is, well, exhaustMap.

RxDart also has exhaustMap, and its description reads as follows:

Converts items from the source stream into a Stream using a given mapper. It ignores all items from the source stream until the new stream completes.

purplenoodlesoop avatar May 18 '22 18:05 purplenoodlesoop

Ah interesting so exhaustMap -> asyncExpand the way that asyncMapSample -> asyncMap. Thanks for clarifying, I had misread the RxJS docs when I looked at this.

I think the feature makes sense. I'm tempted to name it asyncExpandSample, parity with names from Rx was never a priority, and it may be easier to understand with that name. At the same time, now that I understand the behavior exhaustMap does make sense to me. @jakemac53 @munificent - what is your opinion on the naming?

natebosch avatar May 18 '22 18:05 natebosch

asyncExpandSample makes more sense to me

jakemac53 avatar May 18 '22 18:05 jakemac53

Agree on the naming, asyncExpandSample sounds right and consistent with asyncMapSample. Thanks again, thrilled to see this feature as part of the stream_transform!

purplenoodlesoop avatar May 18 '22 19:05 purplenoodlesoop

I discovered there is an edge of the behavior that differs from asyncMapSample.

In asyncMapSample we only drop events if more than one comes in during the waiting period. If only one event comes in while work is ongoing, it will be held and delivered after the work.

With the exhaustMap reference all events that come in during the waiting period are dropped.

|--A--B--C--------| # Events
|--AAAAABBBBBCCCCC| #Ongoing work

vs

|--A--B--C----| #Events
|--AAAAA-CCCCC| #Ongoing Work

(I even found an implementation called exhaustMap which is asyncMapSample with the behavior of dropping all events during ongoing work, so that's clearly a useful behavior too)

So maybe we should go with exhastMap, or think of some other name were we could have a similar implementation for asyncMap. We could easily deliver asyncMapSample with the same "sampling" semantics as well, but it might be better to hold off for a use case.

@purplenoodlesoop - to confirm - are you interested in the behavior were all events during an ongoing stream are ignored, or where we drop all but the most recent?

natebosch avatar May 19 '22 17:05 natebosch

Hmm, interesting, there is indeed an edge. @natebosch I'm interested in dropping all events during ongoing work, so exhaustMap's behavior it is. To be honest, I'm not really sure where asyncMapSample-like behavior would benefit; I don't see a common use case for it.

purplenoodlesoop avatar May 20 '22 11:05 purplenoodlesoop

To be honest, I'm not really sure where asyncMapSample-like behavior would benefit; I don't see a common use case for it.

It still limits how often you would actually process events, but ensures that if you had any events while processing the last one you will still process the most recent one. So lets say for change detection or something, you were still processing the last change detection cycle and don't want to interrupt that with new events, but you do need to schedule the next round of change detection to run immediately after processing the current round, if events came in during it.

jakemac53 avatar May 20 '22 14:05 jakemac53

I hope I'm not confusing myself here...

Another analogy to consider is our throttle method.

Compared to ReactiveX, our throttle is like throttleFirst except we also support the trailing argument like throttleTime from RxJs.

Within stream_transform throttle(trailing: true) has a similar behavior to asyncMapSample. Throttle takes a single Duration, while sample uses the duration of the Future returned by the callback. If we add an option for asyncMapSample(trailing: false) and also in our supposed asyncExpandSample, where trailing has the same meaning as it does in throttle, then I think exhaustMap would correspond to asyncExpandSample(trailing: false).

One slight downside is that the default value of the leading: argument would be different between "sample" and "throttle" variants.

One potential upside is that the generality, and especially using the generality as the way to frame discussion of behavior comparisons and contrasts between the methods, could still make this all easier to understand than having more unique names like exhuast vs *Sample.

We'd be adding at least one use case pre-emptively (no one has asked for asyncExpandSample(trailing: true) as far as I know) but we'd also be satisfying another use case for which at least one internal team has their own implementation (asyncMapSample(trailing: false)).

natebosch avatar May 20 '22 18:05 natebosch