rxjs
rxjs copied to clipboard
bufferTime minBufferSize
Hello. I think it can be also very helpful to have a minBufferSize parameter that will prevent the buffer from emitting if there are not enough source values to emit so that the pipeline will not need to check if there's actually something to process. Currently an empty array is emitted.
To emulate current behaviour the default is 0. A value of X will mean "emit after delay only if buffer's has at least X elements, otherwise emit immediately after one is available".
What do you think?
Running into the same issue. I have an event stream where I want to catch a timed buffer after the first emitted value rather than endless empty buffers being emitted.
Edit: I was able to do this with bufferToggle
@durilka https://github.com/btroncone/learn-rxjs/blob/master/operators/transformation/buffertoggle.md
The tricky part was figuring out to use throttleTime on the openings Observable
this.source.bufferToggle(this.source.throttleTime(200), () => Observable.timer(200))
.subscribe(buffered => {
...
});
This just bit me in the butt. I had expected buffer to only emit when there actually was new values (Like normal observables), and not whenever the timer ticked. But a minBufferSize could definitely fix that.
another simple fix would be to filter empty events:
.pipe(
bufferTime(200),
filter(buffer => buffer.length > 0),
...
)
But I agree that it should be supported by the library in a more efficient way
Posting this answer since it can help other people (like me) that found this issue in their seek for the right operator. In my use case this seems to do the job well:
events$.pipe(buffer(events$.pipe(auditTime(DELAY)))
(You may want to share() your events$ observable before since it's subscribed twice here)
This will wait for events.
When an event arrives, it will start buffering for the next DELAY milliseconds.
When that period is over the batch of events is emitted as an array, and we start over.
You can replace auditTime with an operator that better suits your needs.
another simple fix would be to filter empty events:
.pipe( bufferTime(200), filter(buffer => buffer.length > 0), ... )But I agree that it should be supported by the library in a more efficient way
Filtering empty events is simply 'hiding' the problem in the first place. I noticed CPU utilization of about 10% on a page with 4 controls using bufferTime(250), when it should have been near zero - it's clearly incorrect to continually spit out empty values, when the very documentation states 'Collects values from the past as an array, and emits those arrays periodically in time.' - if there's nothing to collect, then it shouldn't emit anything.
Posting this answer since it can help other people (like me) that found this issue in their seek for the right operator. In my use case this seems to do the job well:
events$.pipe(buffer(events$.pipe(auditTime(DELAY)))(You may want to
share()yourevents$observable before since it's subscribed twice here)This will wait for events. When an event arrives, it will start buffering for the next
DELAYmilliseconds. When that period is over the batch of events is emitted as an array, and we start over.You can replace
auditTimewith an operator that better suits your needs.
How to set the rate limit along with this like using take operator