
Streaming over HTTP seems to be broken in v1.11

thomasl8 opened this issue on Jul 29 '24 · 8 comments

Hello,

We use this library (the circe version) to stream a large JSON payload over HTTP.

It has been working great with version 1.10; however, when we upgraded to version 1.11, the server seems to want to collect the entire payload before returning any data to the UI (so basically not streaming). It could be that we are using the library incorrectly for our purposes. If it makes a difference, we are using tapir as our HTTP library.

This is our code (some stuff renamed). Maybe we are doing something wrong? Any help will be much appreciated! Thanks.

    myService
      .getBigFs2Stream                // a large Stream[IO, Json]
      .through(ast.tokenize)          // Json values -> JSON tokens
      .through(wrap.asTopLevelArray)  // wrap everything in one top-level array
      .through(render.compact)        // tokens -> compact JSON strings
      .through(text.utf8.encode)      // strings -> bytes
      .pure[IO]                       // the whole stream is wrapped in IO here

thomasl8 · Jul 29 '24 14:07

Hey, thanks for reporting. This sounds like a real regression indeed. In 1.11.0 we changed the way rendering is done, and that change may have introduced this regression.

Can you try replacing the rendering pipe with

.through(fs2.data.text.render.pretty(0)(fs2.data.json.Token.compact))

And let me know if it fixes it?
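
For clarity, here is a sketch of the full pipeline from the original report with that substitution (only the rendering stage changes; all other names are from the report above):

    myService
      .getBigFs2Stream
      .through(ast.tokenize)
      .through(wrap.asTopLevelArray)
      // explicit renderer in place of render.compact
      .through(fs2.data.text.render.pretty(0)(fs2.data.json.Token.compact))
      .through(text.utf8.encode)
      .pure[IO]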

satabin · Jul 29 '24 15:07

I am investigating the issue and added a test (see #627). It looks like the compact pipe emits chunks really early (only singleton chunks for now, even), even if the input is a single big chunk.

Could you show how the resulting stream is used in tapir? Can you confirm that you still get several chunks right after the .through(render.compact)?
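
For example, one quick way to check this (a hypothetical debugging aid; debugChunks is a stock fs2 combinator that logs each chunk as it passes through):

    myService
      .getBigFs2Stream
      .through(ast.tokenize)
      .through(wrap.asTopLevelArray)
      .through(render.compact)
      .debugChunks() // logs each chunk as it is emitted downstream
      .through(text.utf8.encode)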

satabin · Aug 03 '24 08:08

I wrote a little test program that tries to come as close as possible to the original report, but without tapir:

import cats.effect.*
import fs2.*
import fs2.data.json.*
import fs2.data.json.circe.* // provides the circe Tokenizer[Json] instance used by ast.tokenize
import io.circe.Json

import scala.concurrent.duration.*

object EndlessMain extends IOApp.Simple {

  override def run: IO[Unit] =
    countingJson
      .meteredStartImmediately(0.001.seconds)
      .through(ast.tokenize)
      .through(wrap.asTopLevelArray)
      .through(render.compact)
      // .through(text.utf8.encode)
      .evalMap(IO.println(_))
      .compile
      .drain

  // an infinite stream of increasing JSON numbers
  def countingJson: Stream[IO, Json] =
    Stream.iterate(0)(_ + 1).map(Json.fromInt)
}

When run, it outputs immediately, like this:

[
0
,
1
,
2
,
3
,
4
,
5
,
6
,
7
,
8

And as the Stream is infinite by construction, it's guaranteed that nothing is fully loaded into memory before output. So, as @satabin pointed out, it seems worth looking into the tapir integration and ensuring chunking is preserved.
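
To make the chunking itself visible, a hypothetical variation of the program above prints chunk sizes instead of elements:

    override def run: IO[Unit] =
      countingJson
        .meteredStartImmediately(0.001.seconds)
        .through(ast.tokenize)
        .through(wrap.asTopLevelArray)
        .through(render.compact)
        .chunks // expose the chunk structure of the rendered stream
        .evalMap(chunk => IO.println(s"chunk of size ${chunk.size}"))
        .compile
        .drain

Per the observation in #627, this would print mostly size-1 chunks on 1.11.0.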

ybasket · Aug 03 '24 10:08

Hi @thomasl8, did you have a chance to look into this issue?

satabin · Sep 07 '24 09:09

Just following along; I also ran into this with 1.11.x (compact rendering is broken).

nefilim · Oct 27 '24 14:10

Instead of wrapping the stream in IO, return it directly as Stream[IO, Byte]:

    myService
      .getBigFs2Stream
      .through(ast.tokenize)
      .through(wrap.asTopLevelArray)
      .through(render.compact)
      .through(text.utf8.encode) // ✅ return Stream[IO, Byte] directly
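
For reference, here is a rough sketch of how such a stream is typically exposed through a tapir endpoint (hypothetical wiring; streamTextBody and Fs2Streams are tapir's fs2 streaming APIs, and the exact signatures depend on the tapir version and server interpreter):

    import cats.effect.IO
    import sttp.capabilities.fs2.Fs2Streams
    import sttp.tapir.*

    // declaring the output as a byte stream lets the server interpreter
    // forward chunks as they are produced instead of buffering the body
    val bigJsonEndpoint =
      endpoint.get
        .in("big-json")
        .out(streamTextBody(Fs2Streams[IO])(CodecFormat.Json()))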

Supriya-57 · Mar 05 '25 09:03

> Just following along; I also ran into this with 1.11.x (compact rendering is broken).

Is that still the case with 1.11.2? I think it was fixed in that release; can you confirm?

satabin · Mar 05 '25 09:03

> Instead of wrapping the stream in IO, return it directly as Stream[IO, Byte]:
> (same code snippet as above)

Good catch, I missed this last line. Would wrapping the stream in IO make tapir accumulate the entire payload before sending it back? I do not use tapir, so I am not sure how it behaves here.

satabin · Mar 05 '25 09:03