WIP: fs2 2.0.0
Update to fs2 version 2.0.0
Two tests are currently failing:
- TakeThroughDrain.early-terminated.drain
- TakeThroughDrain.normal-termination.dont-drain
I would be grateful for any hints as to why this happens.
@AdamChlupacek Can you help me figure out why these tests are failing?
As far as I understand, the tests fail because queue.dequeue completely empties the queue. The subsequent Stream.eval(queue.dequeue1) then has nothing to take, and nobody enqueues anything anymore, so it never yields an element: dequeue1 just waits until the timed run gives up, and the result is None. So the problem could be in the test itself.
You can see this by running the following example in the REPL (I tried it with fs2 2.0.1):
import fs2._
import cats.effect.IO
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val cs = IO.contextShift(ExecutionContext.global)
implicit val ce = IO.ioConcurrentEffect

val source = Stream[IO, Int](1, 2, 3, 4, 5).covary[IO]

val program =
  fs2.concurrent.Queue
    .unbounded[IO, Int]
    .flatMap { queue =>
      // 1. enqueue all elements of the source
      (source.through(queue.enqueue).drain ++
        // 2. dequeue, stopping after the first element that is not < 3
        queue.dequeue
          .evalTap(el => IO(println(s"$el was in the queue")))
          .takeThrough(_ < 3)
          .evalTap(el => IO(println(s"$el was taken from the stream")))
          .drain ++
        // 3. try to take one more element from the queue
        Stream
          .eval(queue.dequeue1)
          .evalTap(el => IO(println(s"The queue still contains $el")))).compile.last
    }

// unsafeRunTimed returns None if the program does not finish within the limit
program.unsafeRunTimed(10.second).flatten
That will output something like
1 was in the queue
1 was taken from the stream
2 was in the queue
2 was taken from the stream
3 was in the queue
3 was taken from the stream
res6: Option[Int] = None
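The last step can also be looked at in isolation. This is only a minimal sketch, assuming fs2 2.x on cats-effect 2 as in the example above; the names emptyDequeue and result are made up for illustration. It takes dequeue1 from a queue that nothing ever enqueues into and runs it with a short timeout:

```scala
import cats.effect.{ContextShift, IO}
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

// Needed to derive Concurrent[IO], which Queue.unbounded requires
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// Hypothetical name: dequeue1 on a fresh queue that never receives an element
val emptyDequeue: IO[Int] =
  Queue.unbounded[IO, Int].flatMap(_.dequeue1)

// dequeue1 waits for an element instead of failing, so the timed run
// is expected to give up after one second and report None
val result: Option[Int] = emptyDequeue.unsafeRunTimed(1.second)
```

If that holds, the None in the output above comes from the 10 second limit of unsafeRunTimed rather than from dequeue1 itself returning an empty value.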
Hope this helps!
The main problem with the early-terminated.drain test on fs2 2.x.x is that the .onFinalize in takeThroughDrain is executed after the last Stream.eval(queue.dequeue1). I've found a workaround, though.