bufferTime minBufferSize

Open dooreelko opened this issue 8 years ago • 6 comments

Hello. I think it would also be very helpful to have a minBufferSize parameter that prevents the buffer from emitting when there are not enough source values, so that the pipeline does not need to check whether there is actually something to process. Currently an empty array is emitted.

To emulate the current behaviour, the default would be 0. A value of X would mean "emit after the delay only if the buffer has at least X elements; otherwise emit immediately after one is available".

What do you think?

dooreelko, May 11 '17 09:05

Running into the same issue. I have an event stream where I want to catch a timed buffer after the first emitted value rather than endless empty buffers being emitted.

Edit: I was able to do this with bufferToggle, @durilka: https://github.com/btroncone/learn-rxjs/blob/master/operators/transformation/buffertoggle.md The tricky part was figuring out that the openings Observable needs throttleTime:

      this.source.bufferToggle(this.source.throttleTime(200), () => Observable.timer(200))
         .subscribe(buffered => {
            ...
         });

msc117, May 27 '17 17:05

This just bit me in the butt. I had expected buffer to emit only when there actually were new values (like normal observables), and not whenever the timer ticked. A minBufferSize could definitely fix that.

zlepper, Aug 28 '17 17:08

Another simple fix would be to filter out the empty buffers:

.pipe(
    bufferTime(200),
    filter(buffer => buffer.length > 0),
    ...
)

But I agree that the library should support this in a more efficient way.
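
To make the effect of that filter concrete, here is a dependency-free TypeScript model of the timing. It is not RxJS itself: the helper names `bufferTimeModel` and `dropEmpty` and the sample timestamps are made up for illustration, and the model assumes windows aligned to multiples of the span while ignoring scheduler details.

```typescript
// Model of bufferTime(span): one array per elapsed window, even when
// nothing arrived in that window. Timestamps are in milliseconds.
function bufferTimeModel(timestamps: number[], span: number, until: number): number[][] {
  const windows: number[][] = [];
  for (let start = 0; start + span <= until; start += span) {
    // Collect the events that fall inside [start, start + span).
    windows.push(timestamps.filter(t => t >= start && t < start + span));
  }
  return windows;
}

// Model of filter(buffer => buffer.length > 0).
function dropEmpty(windows: number[][]): number[][] {
  return windows.filter(w => w.length > 0);
}

const raw = bufferTimeModel([50, 120, 900], 200, 1000);
const kept = dropEmpty(raw);
console.log(JSON.stringify(raw));  // [[50,120],[],[],[],[900]]
console.log(JSON.stringify(kept)); // [[50,120],[900]]
```

In this model three of the five windows are empty, and the filter only discards them after they have already been produced.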

NathanBraslavski, Mar 12 '19 00:03

Posting this answer since it can help other people who (like me) found this issue while searching for the right operator. In my use case this seems to do the job well:

events$.pipe(buffer(events$.pipe(auditTime(DELAY))))

(You may want to share() your events$ observable beforehand, since it is subscribed to twice here.)

This will wait for events. When an event arrives, it will start buffering for the next DELAY milliseconds. When that period is over the batch of events is emitted as an array, and we start over.

You can replace auditTime with an operator that better suits your needs.
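
The batching described above can be sketched as a dependency-free TypeScript model. This is only an illustration of the timing, not RxJS code: `batchAfterFirstEvent` and the sample timestamps are hypothetical, and the model assumes that an event arriving exactly when a window closes belongs to the next batch.

```typescript
// Model of events$.pipe(buffer(events$.pipe(auditTime(delay)))).
// Timestamps are in milliseconds.
function batchAfterFirstEvent(timestamps: number[], delay: number): number[][] {
  const batches: number[][] = [];
  let current: number[] = [];
  let closesAt = 0; // only meaningful while `current` is non-empty
  for (const t of [...timestamps].sort((a, b) => a - b)) {
    if (current.length > 0 && t >= closesAt) {
      // The timer fired before this event arrived: emit the finished batch.
      batches.push(current);
      current = [];
    }
    if (current.length === 0) {
      // The first event after an idle period starts a new timer.
      closesAt = t + delay;
    }
    current.push(t);
  }
  // In this finite model the last timer is assumed to fire after the final event.
  if (current.length > 0) batches.push(current);
  return batches;
}

const batches = batchAfterFirstEvent([0, 50, 120, 900, 950], 200);
console.log(JSON.stringify(batches)); // [[0,50,120],[900,950]]
```

Nothing is emitted during the quiet period between 200 ms and 900 ms, because a timer only starts once an event arrives.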

toverux, Jun 06 '22 21:06

Filtering empty events simply hides the problem. I noticed CPU utilization of about 10% on a page with 4 controls using bufferTime(250), when it should have been near zero. It's clearly incorrect to continually spit out empty values when the documentation itself states 'Collects values from the past as an array, and emits those arrays periodically in time.' If there's nothing to collect, then it shouldn't emit anything.

nick-bailey-uk, Aug 09 '22 08:08

How can I set a rate limit along with this buffer/auditTime approach, for example using the take operator?

ArunKumarBharathi, Nov 01 '22 15:11