fs2-mail

WIP: fs2 2.0.0

Open sebastianvoss opened this issue 6 years ago • 3 comments

Update to fs2 version 2.0.0

Two tests are currently failing:

  • TakeThroughDrain.early-terminated.drain
  • TakeThroughDrain.normal-termination.dont-drain

I would be grateful for any hints as to why this happens.

sebastianvoss avatar Sep 19 '19 23:09 sebastianvoss

@AdamChlupacek Can you help me figure out why these tests are failing?

sebastianvoss avatar Oct 02 '19 10:10 sebastianvoss

why these tests are failing?

As far as I understand, the tests fail because queue.dequeue completely empties the queue. The subsequent Stream.eval(queue.dequeue1) then has nothing to read: the queue is empty and nothing enqueues into it any more, so the run can only end in None. The problem could therefore be in the test itself.

You can see this by running the following example in the REPL (I tried with fs2 2.0.1):

import fs2._
import cats.effect.IO
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val cs = IO.contextShift(ExecutionContext.global)
implicit val ce = IO.ioConcurrentEffect

val source = Stream[IO, Int](1, 2, 3, 4, 5).covary[IO]

// Enqueue 1..5, read from the queue through the first element >= 3,
// then try to take one more element from the queue.
val program =
  fs2.concurrent.Queue
    .unbounded[IO, Int]
    .flatMap { queue =>
      (source.through(queue.enqueue).drain ++
        queue.dequeue
          .evalTap(el => IO(println(s"$el was in the queue")))
          .takeThrough(_ < 3)
          .evalTap(el => IO(println(s"$el was taken from the stream")))
          .drain ++
        Stream
          .eval(queue.dequeue1)
          .evalTap(el => IO(println(s"The queue still contains $el")))).compile.last
    }

// unsafeRunTimed returns None if the program does not finish within the timeout.
program.unsafeRunTimed(10.seconds).flatten

That will output something like

1 was in the queue
1 was taken from the stream
2 was in the queue
2 was taken from the stream
3 was in the queue
3 was taken from the stream
res6: Option[Int] = None
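
To check the leftover state of the queue without waiting for a timeout, a non-blocking read can be used instead of dequeue1. This is only a minimal sketch: it assumes tryDequeue1 is available on the queue in your fs2 2.x version, and the names QueueLeftoverCheck and leftover are made up for illustration.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

// Hypothetical object name, not part of fs2-mail.
object QueueLeftoverCheck {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Fills a queue with 1..5, reads through the first element >= 3,
  // then asks (without blocking) whether anything is still in the queue.
  val leftover: IO[Option[Int]] =
    for {
      queue <- Queue.unbounded[IO, Int]
      _     <- Stream(1, 2, 3, 4, 5).covary[IO].through(queue.enqueue).compile.drain
      _     <- queue.dequeue.takeThrough(_ < 3).compile.drain
      left  <- queue.tryDequeue1 // assumption: present in your fs2 2.x version
    } yield left

  def main(args: Array[String]): Unit =
    println(leftover.unsafeRunSync())
}
```

If queue.dequeue really takes everything out of the queue as described above, this should report that nothing is left, which would match the None seen in the REPL run.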

Hope this helps!

poliez avatar Nov 03 '19 18:11 poliez

The main problem with the early-terminated.drain test under fs2 2.x.x is that the .onFinalize in takeThroughDrain is executed after the last Stream.eval(queue.dequeue1). But I've found a workaround.

declspec-cdecl avatar Nov 18 '19 10:11 declspec-cdecl