Python client receives out-of-order stream
Hi all, I have a server implemented in Scala using zio-grpc, and a client in Python.
I found that sometimes the stream received by the client is out of order. For example, the server sends the sequence [1, 2, 3, 4, 5], but the client receives [1, 2, 4, 3, 5]. Though I'm new to gRPC, I don't think this is expected behavior.
Here is a minimal example: https://drive.google.com/file/d/1Eew2sOhjSt2tCBEupE1glo6PALYkB0t1/view
import io.grpc.StatusException
import scalapb.zio_grpc.ServerMain
import scalapb.zio_grpc.ServiceList
import zio.{ZIO, stream}
import zio.stream.ZStream
import example.demo._
class DemoService extends ZioDemo.Demo {
  def foo(request: FooRequest): stream.Stream[StatusException, FooResponse] = {
    ZStream.fromIterable(getStream)
  }

  private def getStream: List[FooResponse] =
    List.range(0, 100).map(i => FooResponse(i.toString))
}
object Main extends ServerMain {
  override def port: Int = 8981

  override def services: ServiceList[Any] =
    ServiceList.add(new DemoService)
}
import grpc
import demo_pb2
import demo_pb2_grpc
def get_streaming_response(stub):
    responses = list(stub.Foo(demo_pb2.FooRequest(data="hello")))
    if any(r.data != str(i) for i, r in enumerate(responses)):
        print("Expected:", list(range(len(responses))))
        print("Got:     ", [int(r.data) for r in responses])
        print("Diff:    ", [i for i, r in enumerate(responses) if r.data != str(i)])
        raise ValueError("Out of order response")
if __name__ == "__main__":
    with grpc.insecure_channel("localhost:8981") as channel:
        demo_stub = demo_pb2_grpc.DemoStub(channel)
        n = 10000
        for _ in range(n):
            get_streaming_response(demo_stub)
#### output looks like:
# Expected: [0, 1, 2, ...]
# Got: [0, 1, 2, ...]
# Diff: [x, y, ...]
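For completeness, here is the proto definition that the generated names above imply. This is a reconstruction, not the reporter's actual file: the package line and field numbers are assumptions.

syntax = "proto3";

package example.demo;  // assumed from the Scala import example.demo._

service Demo {
  // server-streaming RPC, matching stub.Foo(...) returning an iterator
  rpc Foo(FooRequest) returns (stream FooResponse);
}

message FooRequest {
  string data = 1;  // field name taken from FooRequest(data="hello")
}

message FooResponse {
  string data = 1;  // field name taken from r.data in the client
}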
According to my testing, any single call rarely goes wrong (though with n=10000 the check almost always trips at least once), and usually only one or two pairs of adjacent items are swapped.
I'm not sure whether the problem is in zio-grpc or in Python's grpcio. Here is some relevant information:
- OS: Ubuntu 22.04
- Python version: 3.10.13
- grpcio version: 1.51.1 (couldn't find 1.50.1)
- grpcio-tools version: 1.51.1
- grpc version: 4.25.3
- Scala version: 2.13.13
- grpc-netty version: 1.50.1
I was able to reproduce - it looks like it's fairly common for messages to go out of order when streaming. Testing with ZIO_GRPC_BACKPRESSURE_QUEUE_SIZE=1000 makes the problem go away, while ZIO_GRPC_BACKPRESSURE_QUEUE_SIZE=-1 results in the Python client receiving an error. This suggests the backpressure logic needs to be fixed. @regiskuckaertz @cipriansofronia @ghostdogpr - will one of you have time to look into this?
The main difference between ZIO_GRPC_BACKPRESSURE_QUEUE_SIZE=-1 and =1000 is that with 1000 we call stream.buffer, which loses the chunking. With -1 we keep the chunking, which is quite large here since ZStream.fromIterable uses DefaultChunkSize = 4096.
When we keep the chunking, we do chunk.foreach(call.call.sendMessage) in a single IO; when we lose it, we do an individual call.call.sendMessage per IO, which is much slower but I guess contributes to keeping the order. I am a little surprised that sendMessage doesn't guarantee the order if we call it very quickly 🤔
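A minimal sketch of the two send paths being contrasted here - not zio-grpc's actual internals, just the shape of the difference, with a stand-in for io.grpc.ServerCall#sendMessage:

import zio._
import zio.stream._

object SendPathSketch extends ZIOAppDefault {
  // Stand-in for io.grpc.ServerCall#sendMessage.
  def sendMessage(msg: Int): Unit = println(s"send $msg")

  // Arrives as a single chunk, since 100 < DefaultChunkSize (4096).
  val source = ZStream.fromIterable(0 until 100)

  // Queue size -1: chunking is kept, so a whole chunk is sent inside one effect.
  val chunked = source.runForeachChunk(chunk => ZIO.succeed(chunk.foreach(sendMessage)))

  // Queue size 1000: per the discussion above, buffer loses the chunking, so
  // every message is sent in its own effect - slower, but order-preserving here.
  val buffered = source.buffer(1000).runForeach(msg => ZIO.succeed(sendMessage(msg)))

  def run = chunked *> buffered
}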
I've been playing with it for a while and it is hard to reproduce on my end. But from what I noticed, this is definitely not a bug in this library. To check, I changed the handler to:
ZChannel.readWithCause(
  xs =>
    ZChannel.fromZIO(GIO.attempt {
      println("--")
      println(xs.mkString(","))
      xs.foreach(call.call.sendMessage)
    }) *> ...
and then the user code stops when it finds a reproducer:
ZioTestservice.GreeterClient
  .sayHelloStreaming(HelloRequest(request = Some(Hello(name = "Testing streaming"))))
  .runCollect
  .repeatUntilZIO { ys =>
    if (ys.map(_.i) != (0 until 100)) {
      println(ys.map(_.i).mkString(", "))
      ZIO.succeed(true)
    } else ref.updateAndGet(_ - 1).map(_ == 0) // ref: a Ref[Int] counting down the remaining attempts
  }
What I observed is that:
- xs always has size 1, which is surprising to me but fair enough
- invariably, whenever there's a reproducer, the successive xs are inverted as well, e.g. 0, 1, ..., 84, 83, ..., 99, with the handler printing:

--
HelloReply(84, UnknownFieldSet(Map()))
--
HelloReply(83, UnknownFieldSet(Map()))
I also note that the bug only happens (at least for me) when using ZStream.fromIterable(0 until 100); I was unable to observe it with ZStream.iterate(0)(_ + 1).take(100), for instance. That doesn't mean it can't happen for someone else, though.
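The two constructors differ in how they chunk, which may be why only one of them reproduces. A quick way to inspect that; the sizes in the comments are what I'd expect (fromIterable builds one 100-element chunk up front, iterate emits single-element chunks):

import zio._
import zio.stream._

object ChunkingCheck extends ZIOAppDefault {
  // Collect the size of every chunk flowing through a stream.
  def chunkSizes(s: ZStream[Any, Nothing, Int]): ZIO[Any, Nothing, Chunk[Int]] =
    s.chunks.map(_.size).runCollect

  def run = for {
    a <- chunkSizes(ZStream.fromIterable(0 until 100))   // expected: Chunk(100)
    b <- chunkSizes(ZStream.iterate(0)(_ + 1).take(100)) // expected: 100 chunks of size 1
    _ <- Console.printLine(s"fromIterable: $a")
    _ <- Console.printLine(s"iterate:      $b")
  } yield ()
}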
Anyway, this appears to be a bug in the stream runtime and it will be fun to find it 🤠
> xs always has size 1, which is surprising to me but fair enough

That's because map rechunks in ZStream.fromIterable(0 until 100).map(HelloReply(_))
Isn't it buffer that destroys the chunking, rather than map?
Small reproducer using zio only:
import zio.*
import zio.stream.*
object Test extends ZIOAppDefault {
  val expected = Chunk.fromIterable(0 until 100)
  val s        = ZStream.fromChunk(expected).buffer(16)

  def run = s.runCollect.map(_ == expected).debug.repeatWhile(identity)
}
It seems to work without buffer. Will open an issue on the zio repo.
https://github.com/zio/zio/issues/8699
> Isn't it buffer that destroys the chunking, rather than map?
I had added a rechunk and forgotten about it 😅
Speaking of buffer, I thought about using bufferChunks instead; that would avoid the rechunking, though one would need to change the size of the queue. Thoughts?
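For reference, a sketch of the two operators side by side on a plain stream - not the actual zio-grpc change, and the capacity of 16 is arbitrary:

import zio.stream._

object BufferSketch {
  val source = ZStream.fromIterable(0 until 100)

  // Per the discussion above, buffer enqueues individual elements and loses
  // the original chunking.
  val perElement = source.buffer(16)

  // bufferChunks enqueues whole chunks, so the chunking survives; the
  // capacity now counts chunks rather than elements.
  val perChunk = source.bufferChunks(16)
}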
I actually added https://github.com/scalapb/zio-grpc/pull/578 to not buffer at all 😆 But yeah bufferChunks sounds better than buffer.
The ordering issue on the ZIO side has been fixed, so I think this is resolved.