fs2
Scope lookup failure when using concurrently/merge/parJoinUnbounded
Brought up by @augustjune on Gitter today:
@ def holdLater[F[_]: Concurrent, A](stream: Stream[F, A]): Stream[F, Signal[F, A]] = {
    def uncons1(stream: Stream[F, A]): Stream[F, (A, Stream[F, A])] =
      stream.pull.uncons1.flatMap {
        case Some((a, rest)) => Pull.output1(a -> rest)
        case None => Pull.done
      }.stream
    uncons1(stream).flatMap { case (h, t) =>
      Stream.eval(SignallingRef[F, A](h)).flatMap { sig =>
        Stream(sig).concurrently(t.evalMap(sig.set))
      }
    }
  }
defined function holdLater
@ holdLater(Stream(1, 2, 3).covary[IO].metered(1.second)).flatMap(_.discrete).take(3).compile.toList.unsafeToFuture
res19: scala.concurrent.Future[List[Int]] = Future(<not completed>)
@ res19
res20: scala.concurrent.Future[List[Int]] = Future(Failure(java.lang.Throwable: Fail to find scope for next step: current: Token(7e32dc50), step: Step(FreeC.Bind(FreeC.Bind(Step(FreeC.Bind(Eval(IO$1895120358)),None))),Some(Token(74cab720)))))
Replacing concurrently with merge or parJoinUnbounded results in the same error.
Note that metered is implemented with zipRight, which uses stepLeg. Replacing metered with a plain Stream(1, 2, 3).zipRight(Stream(1, 2, 3)) reproduces the problem. I suspect the cause is that uncons1 emits the tail as a stream element, which breaks scoping.
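For anyone who wants to try the reduced case outside the REPL, here is a rough standalone sketch of it, with zipRight standing in for metered. It assumes the fs2 1.0.x / cats-effect 1.x API that the session above appears to use. The object name ScopeLookupRepro, the implicit ContextShift setup, and the 10-second timeout are my additions and not part of the original report. I have not pinned it to a specific fs2 version, so treat the outcome as something to observe rather than a guaranteed result.

```scala
import cats.effect.{Concurrent, ContextShift, IO}
import fs2.{Pull, Stream}
import fs2.concurrent.{Signal, SignallingRef}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

// Hypothetical wrapper object; only holdLater comes from the report above.
object ScopeLookupRepro {
  // Assumption: cats-effect 1.x style ContextShift, needed for Concurrent[IO].
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Same definition as in the REPL session: the tail of the stream is
  // emitted as an element and then run inside concurrently.
  def holdLater[F[_]: Concurrent, A](stream: Stream[F, A]): Stream[F, Signal[F, A]] = {
    def uncons1(stream: Stream[F, A]): Stream[F, (A, Stream[F, A])] =
      stream.pull.uncons1.flatMap {
        case Some((a, rest)) => Pull.output1(a -> rest)
        case None            => Pull.done
      }.stream

    uncons1(stream).flatMap { case (h, t) =>
      Stream.eval(SignallingRef[F, A](h)).flatMap { sig =>
        Stream(sig).concurrently(t.evalMap(sig.set))
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // zipRight goes through stepLeg, like metered does.
    val source = Stream(1, 2, 3).covary[IO].zipRight(Stream(1, 2, 3))

    val result = holdLater(source)
      .flatMap(_.discrete)
      .take(3)
      .compile
      .toList
      .attempt                    // capture the failure as a Left instead of throwing
      .unsafeRunTimed(10.seconds) // None if nothing completes within the timeout

    println(result)
  }
}
```

The attempt call is there so that a scope lookup failure shows up as a printed Left rather than an uncaught exception, and the timeout guards against discrete never emitting three values when the updates are not spaced out.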