Add a buffer Enumeratee
It would be nice to have a buffer Enumeratee in order to speed up latency-dependent iteratees.
Would a re-chunking enumeratee be sufficient for the cases you have in mind? Right now the enumerator determines the chunks and they're not exposed in the public API after that point, but I've considered adding an Enumeratee.rechunk(newChunkSize: Int).
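To make the idea concrete, here is a minimal sketch of what re-chunking means, written against plain Scala iterators rather than the library's types. `RechunkSketch` and its `rechunk` helper are hypothetical names for illustration only; this is not how an `Enumeratee.rechunk` would be implemented in io.iteratee, just the intended effect on chunk boundaries.

```scala
object RechunkSketch {
  // Hypothetical helper, not part of io.iteratee: takes a stream of
  // arbitrarily sized chunks and regroups the elements into chunks of
  // `newChunkSize` (the last chunk may be smaller).
  def rechunk[A](chunks: Iterator[Vector[A]], newChunkSize: Int): Iterator[Vector[A]] = {
    require(newChunkSize > 0, "newChunkSize must be positive")
    // Flatten the incoming chunks, then regroup lazily.
    chunks.flatMap(c => c).grouped(newChunkSize).map(_.toVector)
  }

  def main(args: Array[String]): Unit = {
    val original = Iterator(Vector(1, 2, 3), Vector(4), Vector(5, 6, 7, 8))
    rechunk(original, 2).foreach(println)
  }
}
```

Note that this only changes how elements are grouped; on its own it says nothing about whether producing and consuming chunks overlap in time.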
But the re-chunking and the consuming iteratee don't run in parallel, do they?
@Crafter6432 That depends on the execution model of the underlying context. Quick example with Twitter futures:
scala> import com.twitter.util.{ Future, FuturePool }
import com.twitter.util.{Future, FuturePool}
scala> import io.catbird.util._, io.iteratee._
import io.catbird.util._
import io.iteratee._
scala> val enum = Enumerator.enumList[Future, Int]((0 to 10).toList)
enum: io.iteratee.Enumerator[com.twitter.util.Future,Int] = io.iteratee.Enumerator$$anon$11@53f1cd04
scala> val f = Enumeratee.mapK[Future, Int, Int] { i =>
     |   FuturePool.unboundedPool {
     |     if (i == 3) Thread.sleep(1000)
     |     println(s"mapping: $i")
     |     i
     |   }
     | }
f: io.iteratee.Enumeratee[com.twitter.util.Future,Int,Int] = io.iteratee.Enumeratee$$anon$13@73cb9f4b
scala> import cats.std.unit._
import cats.std.unit._
scala> val it = Iteratee.foldMap[Future, Int, Unit] { i =>
     |   println(s"consuming: $i")
     | }
it: io.iteratee.Iteratee[com.twitter.util.Future,Int,Unit] = io.iteratee.Iteratee@4597fb54
scala> enum.mapE(f).run(it)
mapping: 0
mapping: 1
mapping: 4
mapping: 2
mapping: 6
mapping: 7
mapping: 5
mapping: 9
mapping: 10
mapping: 8
consuming: 0
consuming: 1
consuming: 2
mapping: 3
consuming: 3
consuming: 4
consuming: 5
consuming: 6
consuming: 7
consuming: 8
consuming: 9
consuming: 10
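A similar interleaving can be reproduced without io.iteratee or Twitter's util, using only the standard library's `scala.concurrent.Future`. This is a rough sketch under the assumption that the effect comes from futures that start running as soon as they are created while their results are consumed in order; it is not a description of how `Enumeratee.mapK` works internally. `EagerMapSketch` and `run` are hypothetical names.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object EagerMapSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  def run(): Vector[Int] = {
    // Every mapping step is started up front; a standard-library Future
    // begins running on the execution context as soon as it is created.
    val mapped: List[Future[Int]] = (0 to 10).toList.map { i =>
      Future {
        if (i == 3) Thread.sleep(1000) // simulate one slow element
        println(s"mapping: $i")
        i
      }
    }

    // Results are consumed strictly in order, each step waiting only for
    // the previous consumption and for its own mapped value.
    val consumed: Future[Vector[Int]] =
      mapped.foldLeft(Future.successful(Vector.empty[Int])) { (acc, next) =>
        for {
          seen <- acc
          i <- next
        } yield {
          println(s"consuming: $i")
          seen :+ i
        }
      }

    Await.result(consumed, 10.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The order of the "mapping" lines depends on thread scheduling and will vary between runs, while the "consuming" lines always appear in order from 0 to 10.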
So Enumeratee.mapK does the trick?