zio-schema
JsonCodec and ProtobufCodec decoders should be resilient to different chunk sizes
The ZTransducer for decoding fails when the input chunk doesn't exactly match one encoded object. Almost all rechunked streams will therefore fail.
Transducers should be resilient to different chunk sizes (handle partial and excess input bytes).
// from ProtobufCodecSpec -> Should successfully encode and decode -> floats
val value = 0.001f
encodeAndDecode(Schema[Float], value) // <-- this effect will fail with "Invalid number of bytes for Float. Expected 4, got 2"

def encodeAndDecode[A](schema: Schema[A], input: A): ZIO[Any, String, Chunk[A]] =
  ZStream
    .succeed(input)
    .transduce(ProtobufCodec.encoder(schema))
    .chunkN(2) // <-- the tests are fine without this rechunking
    .transduce(ProtobufCodec.decoder(schema))
    .run(ZSink.collectAll)
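
Whatever the final fix looks like, the decoder has to buffer input across chunk boundaries. As a minimal sketch of the idea (this is not the zio-schema implementation; framedDecoder and decodeFrame are hypothetical names, and it only covers values encoded into a fixed number of bytes, such as a 4-byte Float), a chunk-size-resilient transducer could look like this:

import zio._
import zio.stream._

// Sketch only: buffers bytes across pushes and decodes one value per complete
// `frameSize`-byte frame, carrying partial/excess bytes over to the next push.
def framedDecoder[A](frameSize: Int)(
  decodeFrame: Chunk[Byte] => Either[String, A]
): ZTransducer[Any, String, Byte, A] =
  ZTransducer {
    Ref.make(Chunk.empty: Chunk[Byte]).toManaged_.map { buffer => (input: Option[Chunk[Byte]]) =>
      input match {
        case None =>
          // End of stream: leftover bytes mean the last frame was truncated.
          buffer.getAndSet(Chunk.empty).flatMap { rest =>
            if (rest.isEmpty) ZIO.succeed(Chunk.empty)
            else ZIO.fail(s"Truncated frame: ${rest.length} trailing bytes")
          }
        case Some(in) =>
          buffer.modify { old =>
            var acc    = old ++ in
            var frames = List.empty[Chunk[Byte]]
            while (acc.length >= frameSize) {
              val (frame, rest) = acc.splitAt(frameSize)
              frames = frame :: frames
              acc = rest
            }
            (frames.reverse, acc) // emit complete frames, keep the remainder buffered
          }.flatMap { frames =>
            ZIO.foreach(frames)(frame => ZIO.fromEither(decodeFrame(frame))).map(Chunk.fromIterable)
          }
      }
    }
  }

With something like this, the rechunked Float test above would pass; the real codecs of course need the same buffering idea generalized to variable-length protobuf fields and to JSON, which is where it gets hard.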
I started looking into this issue but got stuck with the implementation. I see that zio-json already has a nice implementation of a decoding transducer. However, it only works on the JVM, because it uses a java.io.Reader implementation that blocks until new chunks arrive on the stream. I don't know yet how this could be done on Scala.js without changing something in zio-json to fit this problem.
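
For illustration, here is a rough sketch of that blocking-Reader technique (this is not zio-json's actual code; the class name and the queue-based hand-off are made up for the example): the stream side enqueues chunks, and read blocks on queue.take() until the next chunk arrives. It is exactly this thread blocking that has no counterpart on Scala.js.

import java.io.Reader
import java.util.concurrent.LinkedBlockingQueue

// Illustrative only: a Reader fed by a queue of chunks. An empty chunk is used
// as the end-of-stream marker in this sketch.
final class BlockingChunkReader(queue: LinkedBlockingQueue[Array[Char]]) extends Reader {
  private var current: Array[Char] = Array.emptyCharArray
  private var pos: Int             = 0
  private var eof: Boolean         = false

  override def read(buf: Array[Char], off: Int, len: Int): Int =
    if (eof) -1
    else {
      if (pos >= current.length) {
        current = queue.take() // blocks the calling thread until the producer enqueues another chunk
        pos = 0
      }
      if (current.isEmpty) {
        eof = true
        -1
      } else {
        val n = math.min(len, current.length - pos)
        System.arraycopy(current, pos, buf, off, n)
        pos += n
        n
      }
    }

  override def close(): Unit = ()
}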
For protobuf, we could change the implementation from Chunk[Byte] => Either[String, (Chunk[Byte], A)] to something like ProtobufReader => IO[String, A], with ProtobufReader having a method readBytes(count: Int): IO[E, Chunk[Byte]]. Then we could provide a transducer implementation similar to zio-json's, but one that works with Scala.js as well. WDYT? Would you consider it overkill to introduce ZIO effects into the decoder implementation just to solve this problem?
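
As a rough sketch of what that could look like (illustrative names only, not an existing zio-schema API):

import zio._

object ReaderBasedDecoding {

  // The proposed effectful reader: decoders pull exactly the bytes they need,
  // and the implementation is free to suspend until enough bytes have arrived.
  trait ProtobufReader {
    def readBytes(count: Int): IO[String, Chunk[Byte]]
  }

  // A decoder becomes an effect instead of a pure function over a single chunk.
  type Decoder[A] = ProtobufReader => IO[String, A]

  // Example: a Float decoder asks for exactly 4 bytes, no matter how the
  // upstream happens to be chunked (protobuf encodes floats as little-endian fixed32).
  val floatDecoder: Decoder[Float] = reader =>
    reader.readBytes(4).map { bytes =>
      java.nio.ByteBuffer
        .wrap(bytes.toArray)
        .order(java.nio.ByteOrder.LITTLE_ENDIAN)
        .getFloat
    }
}

The stream-facing transducer would then implement ProtobufReader on top of the incoming chunks, suspending (rather than blocking a thread) whenever readBytes asks for more than is currently buffered, which is why it should work on Scala.js too.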
Tokenizing the stream based on JSON syntax and then providing the buffered chunks to the decoder could also work; I can try that.
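
The core of that would be a small scanner that tracks string/escape state and brace/bracket depth so we know where one top-level JSON value ends; complete values can then be buffered and handed to the existing decoder unchanged. A minimal sketch (illustrative only, it ignores top-level scalars):

// One immutable scanner state per consumed character.
final case class JsonScanState(inString: Boolean, escaped: Boolean, depth: Int) {

  // Consume one character and return the updated state.
  def step(c: Char): JsonScanState =
    if (inString) {
      if (escaped) copy(escaped = false)            // the previous char was a backslash
      else if (c == '\\') copy(escaped = true)
      else if (c == '"') copy(inString = false)
      else this
    } else
      c match {
        case '"'       => copy(inString = true)
        case '{' | '[' => copy(depth = depth + 1)
        case '}' | ']' => copy(depth = depth - 1)
        case _         => this
      }
}

object JsonScanState {
  val initial: JsonScanState = JsonScanState(inString = false, escaped = false, depth = 0)
}

A buffering transducer would run every character through step and emit the accumulated text each time depth drops back to 0 after an object or array was opened, so the decoder always sees exactly one complete JSON value regardless of how the stream was chunked.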