fs2 icon indicating copy to clipboard operation
fs2 copied to clipboard

Add groupedWeightedWithin

Open RobertoUa opened this issue 5 years ago • 5 comments

Similar to Akka Streams https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/groupedWeightedWithin.html Zio seems to have it as well https://github.com/zio/zio/pull/1137 For Monix I've found this (not included in Monix) https://gist.github.com/rcano/7d709ff76ca2416999864a7da0a50d42

RobertoUa avatar Oct 01 '20 09:10 RobertoUa

I think this is the signature we want:

def groupWeightedWithin[F[_], A, B](
    maxWeight: Long,
    maxTime: FiniteDuration
)(
    elemWeight: A => Long
): Pipe[F, A, Chunk[B]]

And it would have a semantic like groupWithin(max, dur) = groupWeightedWithin(max, dur)(_ => 1) (that's roughly how akka-streams defines it, for example)

Daenyth avatar Oct 01 '20 17:10 Daenyth

I'm working on a general solution to this problem. Can't promise it's going to work yet, but let's see where it goes.

SystemFw avatar Oct 02 '20 09:10 SystemFw

Also this probably will be best done after @SystemFw's rework of groupWithin, to minimize wasted/duplicated effort

Daenyth avatar Oct 02 '20 15:10 Daenyth

Are there any plans to progress this in near future?

rehanone avatar Aug 06 '21 01:08 rehanone

I've implemented groupWeighedWithin based on Timed Pull https://gist.github.com/RobertoUa/189d6ccd3aa82021ed9f22af145c1bca

One question though. Is it possible to push buffered items downstream on stream interruption/cats-effect cancelation? And make that downstream uncancellable?

RobertoUa avatar Mar 14 '22 10:03 RobertoUa