fs2
fs2 copied to clipboard
Add groupedWeightedWithin
Similar to Akka Streams https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/groupedWeightedWithin.html Zio seems to have it as well https://github.com/zio/zio/pull/1137 For Monix I've found this (not included in Monix) https://gist.github.com/rcano/7d709ff76ca2416999864a7da0a50d42
I think this is the signature we want:
def groupWeightedWithin[F[_], A, B](
maxWeight: Long,
maxTime: FiniteDuration
)(
elemWeight: A => Long
): Pipe[F, A, Chunk[B]]
And it would have a semantic like groupWithin(max, dur) = groupWeightedWithin(max, dur)(_ => 1) (that's roughly how akka-streams defines it, for example)
I'm working on a general solution to this problem. Can't promise it's going to work yet, but let's see where it goes.
Also this probably will be best done after @SystemFw's rework of groupWithin, to minimize wasted/duplicated effort
Are there any plans to progress this in near future?
I've implemented groupWeighedWithin based on Timed Pull
https://gist.github.com/RobertoUa/189d6ccd3aa82021ed9f22af145c1bca
One question though. Is it possible to push buffered items downstream on stream interruption/cats-effect cancelation? And make that downstream uncancellable?