Streaming over HTTP seems to be broken in v1.11
Hello,
We use this library (the circe version) to stream a large JSON payload over HTTP.
It has been working great with version 1.10; however, when we upgraded to version 1.11, the server seems to want to collect the entire payload before returning any data to the UI (so effectively no streaming). It could be that we are using the library incorrectly for our purposes. If it makes a difference, we are using tapir as our HTTP library.
This is our code (some stuff renamed). Are we doing something wrong? Any help would be much appreciated! Thanks.
myService
  .getBigFs2Stream
  .through(ast.tokenize)
  .through(wrap.asTopLevelArray)
  .through(render.compact)
  .through(text.utf8.encode)
  .pure[IO]
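For context, the endpoint is wired up roughly like this (a simplified sketch with made-up names, not our exact code, so signatures may differ slightly):

import cats.effect.IO
import fs2.Stream
import sttp.capabilities.fs2.Fs2Streams
import sttp.tapir.*

// The pipeline above; the trailing .pure[IO] turns it into an IO[Stream[IO, Byte]].
def bigJsonBytes: IO[Stream[IO, Byte]] = ???

// Endpoint declaring a streamed JSON response body.
val bigJsonEndpoint =
  endpoint.get
    .in("big-json")
    .out(streamTextBody(Fs2Streams[IO])(CodecFormat.Json()))

// serverLogicSuccess expects an IO of the output, hence the trailing .pure[IO] above.
val bigJsonServerEndpoint =
  bigJsonEndpoint.serverLogicSuccess[IO](_ => bigJsonBytes)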
Hey, thanks for reporting. This does sound like a real regression. In 1.11.0 we changed the way rendering is done, and I may have introduced it there.
Can you try replacing the rendering pipeline with
.through(fs2.data.text.render.pretty(0)(fs2.data.json.Token.compact))
and let me know if that fixes it?
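That is, with the snippet from your report, the pipeline would become something like (untested sketch):

myService
  .getBigFs2Stream
  .through(ast.tokenize)
  .through(wrap.asTopLevelArray)
  .through(fs2.data.text.render.pretty(0)(fs2.data.json.Token.compact))
  .through(text.utf8.encode)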
I am investigating the issue and added a test (see #627). It looks like the compact pipe emits chunks really early (in fact only singleton chunks for now), even if the input is a single big chunk.
Could you show how the resulting stream is used in tapir? And can you confirm that you still get several chunks right after .through(render.compact)?
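For instance, something like this (a quick sketch reusing the names from your snippet) would print the chunk boundaries:

myService
  .getBigFs2Stream
  .through(ast.tokenize)
  .through(wrap.asTopLevelArray)
  .through(render.compact)
  .chunks // each emitted Chunk becomes one element
  .evalMap(chunk => IO.println(s"chunk of ${chunk.size} string(s)"))
  .compile
  .drain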
I wrote a little test program that tries to come as close as possible to the original report, but without tapir:
import cats.effect.*
import fs2.*
import fs2.data.json.*
import fs2.data.json.circe.*
import io.circe.Json

import scala.concurrent.duration.*

object EndlessMain extends IOApp.Simple {

  override def run: IO[Unit] =
    countingJson
      .meteredStartImmediately(0.001.seconds) // emit one value per millisecond
      .through(ast.tokenize)
      .through(wrap.asTopLevelArray)
      .through(render.compact)
      // .through(text.utf8.encode)
      .evalMap(IO.println(_))
      .compile
      .drain

  // An infinite stream of JSON numbers: 0, 1, 2, ...
  def countingJson: Stream[IO, Json] =
    Stream.iterate(0)(_ + 1).map(Json.fromInt)
}
When run, it starts printing output immediately, like this:
[
0
,
1
,
2
,
3
,
4
,
5
,
6
,
7
,
8
And since the Stream is infinite by construction, it's guaranteed that nothing is fully loaded into memory before output. So, as @satabin pointed out, it seems worth looking into the tapir integration to ensure chunking is preserved.
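To take tapir out of the picture on the serving side as well, a sketch along these lines (assuming http4s 0.23 with the Ember server; all names made up) serves the same pipeline over HTTP:

import cats.effect.*
import com.comcast.ip4s.*
import fs2.*
import fs2.data.json.*
import fs2.data.json.circe.*
import io.circe.Json
import org.http4s.*
import org.http4s.dsl.io.*
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.implicits.*

import scala.concurrent.duration.*

object StreamingServerMain extends IOApp.Simple {

  // Same pipeline as EndlessMain, encoded to bytes.
  val body: Stream[IO, Byte] =
    Stream.iterate(0)(_ + 1)
      .map(Json.fromInt)
      .meteredStartImmediately(0.001.seconds)
      .through(ast.tokenize)
      .through(wrap.asTopLevelArray)
      .through(render.compact)
      .through(text.utf8.encode)

  val routes = HttpRoutes.of[IO] { case GET -> Root / "numbers" =>
    IO.pure(Response[IO](Status.Ok).withBodyStream(body))
  }

  def run: IO[Unit] =
    EmberServerBuilder.default[IO]
      .withPort(port"8080")
      .withHttpApp(routes.orNotFound)
      .build
      .use(_ => IO.never)
}

Then curl -N localhost:8080/numbers should show bytes arriving continuously if chunking is preserved end to end.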
Hi @thomasl8, did you have a chance to look into this issue?
just following along, also ran into this with 1.11.x (compact rendering is broken)
Instead of wrapping the stream in IO, return it directly as Stream[IO, Byte]:
myService
  .getBigFs2Stream
  .through(ast.tokenize)
  .through(wrap.asTopLevelArray)
  .through(render.compact)
  .through(text.utf8.encode) // ✅ return Stream[IO, Byte] directly
> just following along, also ran into this with 1.11.x (compact rendering is broken)
Is it still the case with 1.11.2? I think it was fixed in that release; can you confirm?
> Instead of wrapping the stream in IO, return it directly as Stream[IO, Byte]:
>
> myService
>   .getBigFs2Stream
>   .through(ast.tokenize)
>   .through(wrap.asTopLevelArray)
>   .through(render.compact)
>   .through(text.utf8.encode) // ✅ return Stream[IO, Byte] directly
Good catch, I missed this last line. Would that make tapir accumulate the entire stream before sending it back? I do not use tapir, so I am not sure how it behaves here.