zio-schema
zio-schema copied to clipboard
JSonCoder fail with big payload while streaming.
there is a different behaviour when I transduce through a stream or a buffer:
implicit val decoder: ZTransducer[Any, String, Byte, JsonRpc[Block]] = JsonCodec.decoder(schemaJrpc)
This fails with: "Unexpected end of input" 👇 ZStream .fromInputStream(getClass().getResourceAsStream("/block.json")) .transduce(Eth.decoder) .runHead
If the same payload if decoded from a buffer (ArrayByte) it works smoothly.
Notice that the payload if quite big, in particular there is a long array (more than 100 UUID), which is responsible of the failure (when I shorten this array error goes away).
A minimal project illustrating the issu:
https://github.com/cheleb/zio-schema-issue-168
Thanks for reporting @cheleb. The issue in this case is that ZStream.fromInputStream uses a default chunk size on 4096 bytes, so if the file is larger than that (as in your reproduce example) it will read less than the entire payload in each chunk which can't be decoded because it is not a valid JSON document. That's why it works fine for smaller documents.
I'm not sure there is a way to handle the case where the input stream has chunk boundaries which split encoded documents. But you can workaround it for the moment by setting the chunk size on the input stream to something larger than the default of 4096 bytes. This change to your example fixes the failing test:
testM("sayHello correctly displays output") {
for {
jrpcOption <- ZStream
.fromInputStream(getClass().getResourceAsStream("/block.json"), chunkSize = 4096 * 3)
.transduce(decoder)
.runHead
jrpc <- ZIO.fromOption(jrpcOption)
} yield assert(jrpc.jsonrpc)(equalTo("1.0"))
},
There is an issue to address this: #182
I had this issue when reading json from http stream servers from the browser, specially when you receive json arrays with multiple entries, you always receive unfinished json between chunks
What I did is to have a copy of the current chunk, try to parse as many jsons as possible (in my case it was a result array), whenever I got an error I just copied and moved to next chunk.
Then when reading next chunk I concat the last part of the previous chunk
To do this in a functional way we need something like State monad I suppose