Scope lookup failure when using concurrently/merge/parJoinUnbounded

Open • mpilquist opened this issue 6 years ago • 0 comments

Brought up by @augustjune on Gitter today:

@ def holdLater[F[_]: Concurrent, A](stream: Stream[F, A]): Stream[F, Signal[F, A]] = {
    def uncons1(stream: Stream[F, A]): Stream[F, (A, Stream[F, A])] =
      stream.pull.uncons1.flatMap {
        case Some((a, rest)) => Pull.output1(a -> rest)
        case None            => Pull.done
      }.stream

    uncons1(stream).flatMap { case (h, t) =>
      Stream.eval(SignallingRef[F, A](h)).flatMap { sig =>
        Stream(sig).concurrently(t.evalMap(sig.set))
      }
    }
  }
defined function holdLater

@ holdLater(Stream(1, 2, 3).covary[IO].metered(1.second)).flatMap(_.discrete).take(3).compile.toList.unsafeToFuture
res19: scala.concurrent.Future[List[Int]] = Future(<not completed>)

@ res19
res20: scala.concurrent.Future[List[Int]] = Future(Failure(java.lang.Throwable: Fail to find scope for next step: current: Token(7e32dc50), step: Step(FreeC.Bind(FreeC.Bind(Step(FreeC.Bind(Eval(IO$1895120358)),None))),Some(Token(74cab720)))))

Replacing concurrently with merge or parJoinUnbounded results in the same error.

Note that metered is implemented with zipRight, which uses stepLeg. Replacing the metered stream with a plain Stream(1, 2, 3).zipRight(Stream(1, 2, 3)) reproduces the problem, so the timing is not the cause. I suspect the issue is that uncons1 returns the tail as a stream element, which breaks scoping.
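For reference, the zipRight variant can be sketched as a self-contained snippet. This is only a sketch under assumptions: it targets the fs2 2.x / cats-effect 2 APIs current when the issue was filed (hence the explicit ContextShift), and the names ZipRightRepro, input and repro are hypothetical, introduced here for illustration. holdLater is copied unchanged from the session above.

```scala
import cats.effect.{Concurrent, ContextShift, IO}
import fs2.{Pull, Stream}
import fs2.concurrent.{Signal, SignallingRef}
import scala.concurrent.ExecutionContext

object ZipRightRepro {
  // Assumption: cats-effect 2.x, where Concurrent[IO] needs a ContextShift.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Same definition as in the report: emits the head and the tail as an element.
  def holdLater[F[_]: Concurrent, A](stream: Stream[F, A]): Stream[F, Signal[F, A]] = {
    def uncons1(stream: Stream[F, A]): Stream[F, (A, Stream[F, A])] =
      stream.pull.uncons1.flatMap {
        case Some((a, rest)) => Pull.output1(a -> rest)
        case None            => Pull.done
      }.stream

    uncons1(stream).flatMap { case (h, t) =>
      Stream.eval(SignallingRef[F, A](h)).flatMap { sig =>
        Stream(sig).concurrently(t.evalMap(sig.set))
      }
    }
  }

  // No metered and no timer: a pure zipRight, which also goes through stepLeg.
  val input = Stream(1, 2, 3).zipRight(Stream(1, 2, 3))

  // Describes the failing program; nothing runs until it is executed,
  // e.g. with repro.unsafeToFuture() as in the session above.
  val repro: IO[List[Int]] =
    holdLater[IO, Int](input).flatMap(_.discrete).take(3).compile.toList
}
```

According to the report, running repro on the affected version fails with the same "Fail to find scope for next step" error; the snippet makes no claim about other versions.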

mpilquist, Nov 05 '19 01:11