kotlinx.serialization icon indicating copy to clipboard operation
kotlinx.serialization copied to clipboard

`Json.decodeBufferedSourceToSequence` doesn't work with infinite streams

Open maniac103 opened this issue 1 year ago • 1 comments

Describe the bug I'm trying to consume events from a streaming Cometd server. Those events essentially are JSON messages transmitted over a chunked HTTP connection: whenever the server has a new event available, it transmits the JSON encoded event in a new chunk in the HTTP connection, which is kept open for the next event. For receiving the events, I'm using OkHttp and am trying to consume the response body. However, this doesn't work because OkioSerialReader tries to consume the full source before JSON decoding is even started. It was my impression from the use cases given in #1662 (e.g. this one or that one) that decoding such 'infinite' streams should be supported - am I missing something there?

To Reproduce Issue can be reproduced without HTTP connection using an Okio.Pipe:

@Serializable
data class TestMessage(val id: Int)

GlobalScope.launch {
    val pipe = Pipe(4096)
    launch {
        val msg = "{\"id\":1}"
        val sink = pipe.sink.buffer()
        val start = Clock.System.now()
        sink.writeUtf8(msg)
        System.out.println("Wrote message after ${Clock.System.now() - start}")
        delay(10000)
        sink.writeUtf8(msg)
        System.out.println("Wrote message after ${Clock.System.now() - start}")
        delay(10000)
        sink.writeUtf8(msg)
        System.out.println("Wrote message after ${Clock.System.now() - start}")
        delay(10000)
        sink.close()
        System.out.println("Closed after ${Clock.System.now() - start}")
    }

    launch {
        val source = pipe.source.buffer()
        val start = Clock.System.now()
        Json.decodeBufferedSourceToSequence<TestMessage>(source).forEach { m ->
            System.out.println("got $m after ${Clock.System.now() - start}")
        }
    }
}

Seen output:

Wrote message after 44us
Wrote message after 10.009552s
Wrote message after 20.012236s
Closed after 30.013578s
got TestMessage(id=1) after 30.011291s
got TestMessage(id=1) after 30.011593s
got TestMessage(id=1) after 30.012009s

Expected output:

Wrote message after 44us
got TestMessage(id=1) after 0.011291s
Wrote message after 10.009552s
got TestMessage(id=1) after 10.011593s
Wrote message after 20.012236s
got TestMessage(id=1) after 20.022009s
Closed after 30.013578s

Expected behavior Being able to consume JSON messages from a permanently open HTTP connection

Environment

  • Kotlin version: 1.9.22
  • Library version: 1.6.2
  • Kotlin platforms: JVM (Android)
  • Gradle version:8.2

maniac103 avatar Jan 08 '24 08:01 maniac103

Yes, you're right, it should be supported. I think it is a bug.

sandwwraith avatar Jan 22 '24 11:01 sandwwraith