fs2

`groupWithin` prematurely closes resources

Open · Jasper-M opened this issue 1 year ago · 5 comments

Code:

import scala.concurrent.duration._
import cats.effect.{IO, Resource}
import fs2.Stream
import cats.effect.unsafe.implicits.global

Stream.resource(
  Resource.make(IO.println("start"))(_ => IO.println("stop"))
)
.flatMap(_ => Stream(1,2,3,4).covary[IO].metered(100.millis))
.take(2)
.evalTap(IO.println)
.groupWithin(2, 1.second)
.evalTap(IO.println)
.compile
.drain
.unsafeRunSync()

Result:

start
1
2
stop
Chunk(1, 2)

The resource is already closed before the stage after `groupWithin` gets to process the chunk, so a downstream stage can end up using a resource that has already been closed.

Jasper-M · Sep 20 '24 14:09

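Some of the fs2 combinators don't respect resources. The cause is that the combinator compiles the foreground stream separately, so that stream's scopes, and the resources they own, are closed when it finishes, regardless of what the downstream stages are still doing.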

If possible, structure your stream like this, so that `groupWithin` and the stages after it run inside the resource's scope:

Stream
  .resource(
    Resource.make(IO.println("start"))(_ => IO.println("stop"))
  )
  .flatMap { _ =>
    Stream(1, 2, 3, 4)
      .covary[IO]
      .metered(100.millis)
      .take(2)
      .evalTap(IO.println)
      .groupWithin(2, 1.second)
      .evalTap(IO.println)
  }
  .compile
  .drain
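To check the ordering of the restructured version without reading console output, the sketch below records each event in a `Ref` instead of printing it. `WorkaroundOrder` and `record` are illustrative names, not part of fs2; the pipeline itself is the one above.

```scala
import scala.concurrent.duration._
import cats.effect.{IO, Ref, Resource}
import cats.effect.unsafe.implicits.global
import fs2.Stream

object WorkaroundOrder {
  // Runs the restructured pipeline and returns the events in the order they happened.
  def run: IO[List[String]] =
    Ref[IO].of(Vector.empty[String]).flatMap { log =>
      def record(s: String): IO[Unit] = log.update(_ :+ s)

      Stream
        .resource(Resource.make(record("start"))(_ => record("stop")))
        .flatMap { _ =>
          // Everything, including groupWithin and the stage after it,
          // runs inside the resource's scope.
          Stream(1, 2, 3, 4)
            .covary[IO]
            .metered(100.millis)
            .take(2)
            .evalTap(i => record(i.toString))
            .groupWithin(2, 1.second)
            .evalTap(c => record(s"chunk ${c.toList}"))
        }
        .compile
        .drain >> log.get.map(_.toList)
    }
}
```

`WorkaroundOrder.run.unsafeRunSync()` returns the recorded events. Because the resource is released only after the inner stream completes, `stop` should be recorded after the chunk.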

I think a resource-safe version of most combinators could be implemented via a more expressive Pull API, something like the following:

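import cats.effect.Resource
import fs2.Pull

// Hypothetical API proposal: `Pull2` and `lease` do not exist in fs2.
object Pull2 {
  // Option:   None if there is nothing to lease right now
  // Resource: acquiring it tries to take a lease on the current Scope's resources
  // Boolean:  whether the lease is valid, i.e. the Scope's resources were not
  //           closed between evaluating the Pull and acquiring the Resource
  def lease[F[_]]: Pull[F, Nothing, Option[Resource[F, Boolean]]] = ???
}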

This would allow resource ownership to be transferred across separately compiled streams.

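import cats.effect.Async
import cats.effect.std.Supervisor
import cats.syntax.all._
import fs2.{Chunk, Pull, Stream}
import fs2.concurrent.Channel

// Very contrived example which shows that ownership can cross through a channel.
// Each emitted chunk is paired with an `F[Unit]` that releases its lease.
def leaseChunks[F[_], A](implicit
  F: Async[F]
): fs2.Pipe[F, A, (Chunk[A], F[Unit])] = stream =>
  Stream.resource(Supervisor[F]).flatMap { sup =>
    Stream
      .eval {
        Channel.bounded[F, (Chunk[A], F[Unit])](1)
      }
      .flatMap { chan =>
        def send(c: Chunk[A]): Pull[F, Nothing, Unit] = for {
          r <- Pull2.lease[F]
          _ <- Pull.eval {
            r match {
              // Nothing to lease: this sketch does not forward the chunk
              case None => F.unit
              case Some(r) =>
                F.uncancelable { poll =>
                  poll(r.allocated).flatMap {
                    // The Scope closed in the meantime: still release the acquired lease
                    case (false, release) => release
                    case (true, release) =>
                      // Hold the lease in a supervised fiber; cancelling the fiber runs `release`
                      sup.supervise(F.onCancel(F.never[Unit], release)).flatMap { fib =>
                        poll(chan.send((c, fib.cancel))).void
                      }
                  }
                }
            }
          }
        } yield ()

        val bg = stream.chunks.flatMap(c => send(c).stream).onFinalize(chan.close.void)

        chan.stream.concurrently(bg)
      }
  }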

I have an issue open here regarding this.

ValdemarGr · Sep 23 '24 04:09

In a simple example that I adapted a little, using `Pull.extendScopeTo` does seem to keep the resource open until the resulting stream stops. But when I try to apply the same transformation to less trivial code such as `groupWithin`, I always get the Scope lookup failure error, no matter whether I use `F.start` or `stream.concurrently`, or which stream I compile where.
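A minimal sketch of the `Pull.extendScopeTo` approach, assuming the goal is to hand a stream to a separately compiled consumer while keeping the upstream resource open. `Pull.extendScopeTo` is an fs2 3.x combinator that leases the resources of the current scope; `ExtendScopeSketch`, `record` and `work` are illustrative names. Events go into a `Ref` so the leased and unleased variants can be compared.

```scala
import cats.effect.{IO, Ref, Resource}
import cats.effect.unsafe.implicits.global
import fs2.{Pull, Stream}

object ExtendScopeSketch {
  // `extend` toggles whether the resource's scope is leased to `work`.
  def run(extend: Boolean): IO[List[String]] =
    Ref[IO].of(Vector.empty[String]).flatMap { log =>
      def record(s: String): IO[Unit] = log.update(_ :+ s)

      // Work that is compiled later, outside the stream that owns the resource.
      val work: Stream[IO, Nothing] = Stream.exec(record("work"))

      val outer: Stream[IO, Stream[IO, Nothing]] =
        Stream
          .resource(Resource.make(record("start"))(_ => record("stop")))
          .flatMap { _ =>
            if (extend)
              // Lease the current scope: its finalizers are deferred until
              // the returned stream finalizes.
              Pull.extendScopeTo(work).flatMap(s => Pull.output1(s)).stream
            else
              Stream.emit(work).covary[IO]
          }

      // Finish the outer stream first, then compile the handed-off stream separately.
      outer.compile.lastOrError.flatMap(_.compile.drain) >> log.get.map(_.toList)
    }
}
```

With `extend = false` the `stop` finalizer runs as soon as the outer stream completes, before `work`; with `extend = true` it should be deferred until the separately compiled stream finishes and cancels its lease. This only covers a handed-off stream that never looks up scopes of its origin stream.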

Jasper-M · Sep 23 '24 10:09

Actually no, it does seem to work on `groupWithin` as well! It's just that if your input stream uses `metered` (or probably anything else that uses a zip-like operation), you run into the scope lookup errors.

See, I only added `metered` to my previous example, and then it blows up.

I've actually run into this issue before: https://github.com/typelevel/fs2/issues/3081#issuecomment-1387620624

Jasper-M · Sep 23 '24 14:09

Resources and interruption are implemented via `Scope`, which acts as a tree of resources.

`concurrently` compiles the supplied stream of your program separately: https://github.com/typelevel/fs2/blob/eea0c2542efc0d6e6587fd90f6079fc627104cee/core/shared/src/main/scala/fs2/Stream.scala#L554. This introduces a new Scope tree: https://github.com/typelevel/fs2/blob/eea0c2542efc0d6e6587fd90f6079fc627104cee/core/shared/src/main/scala/fs2/Compiler.scala#L156-L158

When the lookup function, which traverses the scope tree, is evaluated, the sub-stream's scopes are no longer linked to their previous parent scopes, so the lookup fails: https://github.com/typelevel/fs2/blob/eea0c2542efc0d6e6587fd90f6079fc627104cee/core/shared/src/main/scala/fs2/internal/Scope.scala#L357

There might be an implementation of `concurrently` that shares the root scope, but I am unsure of the implications.

All in all, the rule is: if a stream originates as a substream, it may hold weak references (through scope IDs) to state in the stream it came from, so you must not compile that stream separately.
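To make the "new Scope tree" point concrete, here is a toy model in plain Scala (no fs2): scopes are linked by parent pointers, and a scope ID is resolved by searching the tree from its root. `ToyScope`, `open`, `newRoot` and `lookup` are hypothetical names; fs2's internal `Scope` is considerably more involved. A scope that hangs off a separately created root has no path back to the IDs of the original tree, which is the shape of the lookup failure described above.

```scala
// Toy model of a scope tree; NOT fs2's Scope, only an illustration.
final class ToyScope(val id: Int, val parent: Option[ToyScope]) {
  private var children: List[ToyScope] = Nil

  // Open a child scope and link it into this tree.
  def open(childId: Int): ToyScope = {
    val child = new ToyScope(childId, Some(this))
    children = child :: children
    child
  }

  // Follow parent links up to the root of this tree.
  def root: ToyScope = parent.fold(this)(_.root)

  // Depth-first search for `target` among this scope and its descendants.
  private def findDescendant(target: Int): Option[ToyScope] =
    if (id == target) Some(this)
    else children.iterator.map(_.findDescendant(target)).collectFirst { case Some(s) => s }

  // Resolve a scope ID by searching the whole tree this scope belongs to.
  def lookup(target: Int): Option[ToyScope] = root.findDescendant(target)
}

object ToyScope {
  // Each separate compilation starts from its own, unlinked root.
  def newRoot(id: Int): ToyScope = new ToyScope(id, None)
}
```

For example, after `val root = ToyScope.newRoot(0); root.open(1).open(2)`, looking up ID 2 from any scope under `root` succeeds, while `ToyScope.newRoot(10).open(11).lookup(2)` returns `None`.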

ValdemarGr · Sep 24 '24 02:09

Well, a dummy-proof and easy API for transferring resource ownership between streams would be great.

Jasper-M · Sep 24 '24 13:09