Accumulated `cats.data.WriterT` data not preserved from function passed to `fs2.io.readOutputStream`
When using `fs2.io.readOutputStream` in combination with an effect type that uses `cats.data.WriterT`, any data accumulated in the function passed to `fs2.io.readOutputStream` is not preserved. For example, I would expect the list of strings produced by this code to include both `"string 1"` and `"string 2"`, but I only get `"string 2"`:
import cats.data.WriterT
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.io.readOutputStream
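// The `*` type-lambda syntax requires the kind-projector compiler plugin.
readOutputStream[WriterT[IO, List[String], *]](1024)(o =>
  // Logs "string 1" while writing to the OutputStream
  WriterT(IO((List("string 1"), o.write("a".getBytes("UTF-8")))))
)
  // Logs "string 2" for each byte read back from the stream
  .evalMap(x => WriterT(IO.pure((List("string 2"), x))))
  .compile
  .drain
  .run
  .unsafeRunSync()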
// (List("string 2"), ())
I can reproduce the issue using this code (available in this repo: https://github.com/mrdziuban/fs2-readOutputStream-WriterT) on both v2.5.10 and v3.2.4.
Further investigation leads me to believe this is an issue with `Stream#concurrently`. Here's another minimal reproduction that doesn't use `readOutputStream`:
import cats.data.WriterT
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
println(
  Stream.eval(WriterT.tell[IO, List[String]](List("string 1")))
    .concurrently(Stream.eval(WriterT.tell[IO, List[String]](List("string 2"))))
    .evalMap(_ => WriterT.tell[IO, List[String]](List("string 3")))
    .compile
    .drain
    .run
    .unsafeRunSync()
)
// (List(string 1, string 3),())
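As a control for the reproduction above, the same three `tell` effects can be run with plain sequential composition in place of `concurrently`. This is only a sketch, assuming fs2 3.x and cats-effect 3. The `SequentialControl` object, the `Logged` alias (standing in for the kind-projector type `WriterT[IO, List[String], *]`) and the `tell` helper are hypothetical names introduced here. If the log is lost only when `concurrently` is involved, all three strings should appear in the result.

```scala
import cats.data.WriterT
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

// Hypothetical names for illustration; not part of fs2 or the linked repo.
object SequentialControl {
  // Alias used instead of the kind-projector form WriterT[IO, List[String], *]
  type Logged[A] = WriterT[IO, List[String], A]

  // Appends a single entry to the WriterT log
  def tell(s: String): Logged[Unit] = WriterT.tell[IO, List[String]](List(s))

  // Same effects as the reproduction, but "string 2" is sequenced with
  // flatMap instead of running in a concurrent background stream.
  val program: Logged[Unit] =
    Stream.eval[Logged, Unit](tell("string 1"))
      .flatMap(_ => Stream.eval[Logged, Unit](tell("string 2")))
      .evalMap(_ => tell("string 3"))
      .compile
      .drain

  // Runs the program and returns only the accumulated log
  def log: List[String] = program.run.unsafeRunSync()._1

  def main(args: Array[String]): Unit = println(log)
}
```

Comparing this log with the output of the `concurrently` version should help confirm whether the background stream is the only place where entries go missing.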
IIRC this is due to an issue with the Concurrent instance for WriterT. /cc @djspiewak
Does `concurrently` use `forceR` (`!>`) somewhere internally? If it does, that operator is known for not continuing `Writer` log chains and `State` state changes.
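The `forceR` behaviour described above can be checked in isolation, without any streams. This is a minimal sketch, assuming cats-effect 3, where `forceR` is a method on `MonadCancel`. The `ForceRDemo` object, the `Logged` alias and the `logOf` helper are hypothetical names introduced here. It sequences the same two `tell` effects once with `flatMap` and once with `forceR`, so the two logs can be compared.

```scala
import cats.data.WriterT
import cats.effect.IO
import cats.effect.kernel.MonadCancel
import cats.effect.unsafe.implicits.global

// Hypothetical names for illustration; not part of cats-effect or fs2.
object ForceRDemo {
  // Alias used instead of the kind-projector form WriterT[IO, List[String], *]
  type Logged[A] = WriterT[IO, List[String], A]

  val first: Logged[Unit]  = WriterT.tell[IO, List[String]](List("string 1"))
  val second: Logged[Unit] = WriterT.tell[IO, List[String]](List("string 2"))

  // Ordinary monadic sequencing: the logs of both effects are combined.
  val sequenced: Logged[Unit] = first.flatMap(_ => second)

  // forceR (symbolic alias: !>) runs `first`, discards its result and any
  // error it raised, then runs `second`. If the comment above is right,
  // the log written by `first` is discarded along with its result.
  val forced: Logged[Unit] =
    MonadCancel[Logged, Throwable].forceR(first)(second)

  // Runs a Logged effect and returns only the accumulated log
  def logOf(w: Logged[Unit]): List[String] = w.run.unsafeRunSync()._1

  def main(args: Array[String]): Unit = {
    println(logOf(sequenced)) // both entries, in order
    println(logOf(forced))    // compare against the line above
  }
}
```

If the second line printed lacks `"string 1"`, that would match the explanation that `forceR` does not continue the `Writer` log chain. Whether `concurrently` actually calls it internally still needs to be checked in the fs2 source.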