fs2 icon indicating copy to clipboard operation
fs2 copied to clipboard

parEvalMap deadlocks when Stream's `F` has extra error channels

Open Daenyth opened this issue 2 years ago • 3 comments

Tested and reproduced on version 3.2.7 and 3.6.1

This happens because in parEvalMapUnordered, the results of the evalMap action are passed back to the controlling stream using a callback in F. But when F has a failure mode other than raiseError, the Either[Throwable, Result] => ... callback can't be invoked - leading to deadlock. The fix for this probably involves using Outcome instead of the Either-based callback.

  test("Stream of IorT deadlocks on Ior.Left") {
    val s = fs2
      .Stream(1)
      .covary[IorT[IO, String, *]]
      .parEvalMapUnordered(Int.MaxValue)(_ =>
        IorT(IO.pure("fail".leftIor[Int]))
      )
      .compile
      .drain
      .value

    TestControl.executeEmbed(s).assertEquals(Ior.left("fail"))
  }

This fails the same way when using EitherT instead of IorT

Thanks to @armanbilge and @ChristopherDavenport for your help in debugging!

Daenyth avatar Apr 04 '23 19:04 Daenyth

The fix for this probably involves using Outcome instead of the Either-based callback.

Interesting, not sure I understand how that would work :)


So to me this feels like a variant of https://github.com/typelevel/cats/issues/4308. Specifically, see my comment https://github.com/typelevel/cats/issues/4308#issuecomment-1257864343.

parEvalMapUnordered expects a lawful F[_]: Concurrent, and its behavior is only specified with respect to Concurrent operations/laws.

IorT[IO, String, *] and EitherT[IO, String, *] do not have a lawful Concurrent in terms of the String error channel. So it is picking up an alternative implementation in terms of Throwable, but this implementation does not respect errors raised on the Ior/Either.

Ideally we should be using a GenConcurrent[IorT[IO, String, *], String] instance for parEvalMapUnordered in this case. However, as described in https://github.com/typelevel/cats/issues/4308#issuecomment-1257864343 such a thing cannot lawfully exist.

armanbilge avatar Apr 04 '23 20:04 armanbilge

This also happens on 3.7.0 and CE 3.5.0

Seems to be triggered with translate and doobie ConnectionIO

hamnis avatar May 15 '23 11:05 hamnis

That's unrelated, and due to the way WeakAsync uses fromFuture - this happens when F has an error channel that can't be translated to IO.raiseError. ConnectionIO is safe from this issue

Daenyth avatar May 16 '23 14:05 Daenyth