zio-http
invoke NettyBody callback in a separate fiber to avoid deadlocks
When reading a request from a super-fast producer, it is possible that zio.http.netty.AsyncBodyReader accumulates more data in its buffer than can fit into the Stream buffer in zio.http.netty.NettyBody.AsyncBody#asStream. In that case, the request will hang: we can't consume messages until the callback pushes all the data, and the callback can't push all the data until we start consuming messages.
A minimized example of the issue can be found here: https://scastie.scala-lang.org/sZ8DrRKxR2qAKxrAOV52hg
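If you can't open the link, the failure has roughly the following shape. This is an illustrative sketch only, assuming ZIO 2's ZStream.async semantics (a bounded internal buffer and an emit that blocks the calling thread while the buffer is full); it is not the actual zio-http code:

```scala
import zio._
import zio.stream._

object AsyncDeadlockSketch extends ZIOAppDefault {

  // The registered callback synchronously pushes more elements than the
  // stream's internal buffer can hold, before anything is pulled. Each emit
  // blocks until the buffer has room, but the buffer only drains once
  // consumption starts, which never happens here, so the program hangs.
  val stream: ZStream[Any, Nothing, Int] =
    ZStream.async[Any, Nothing, Int](
      emit => {
        (1 to 100).foreach(i => emit.single(i)) // 100 elements vs. a buffer of 1
        emit.end
      },
      outputBuffer = 1
    )

  def run = stream.runCollect.debug("collected")
}
```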
To reproduce on a full server, you can use the steps from this ticket: https://github.com/zio/zio-http/issues/2297
Sometimes the server is able to read data from `yes hello | http --chunked POST localhost:8080/forever`, and sometimes it's just silent.
I did try to write a test for it, but couldn't find a way to reproduce the issue in tests, so I'd really appreciate a hint on how to do that. Also, callbacks sometimes blow my mind, so I hope nothing goes wrong if we move the callback invocation to a separate fiber 🤔
Codecov Report
Patch coverage: 100.00% and project coverage change: -0.09% :warning:
Comparison is base (ca6513f) 63.41% compared to head (c99c6b8) 63.33%.
Additional details and impacted files
```diff
@@            Coverage Diff             @@
##             main    #2402      +/-   ##
==========================================
- Coverage   63.41%   63.33%   -0.09%
==========================================
  Files         137      137
  Lines        7113     7113
  Branches     1259     1259
==========================================
- Hits         4511     4505       -6
- Misses       2602     2608       +6
```
Files Changed | Coverage Δ |
---|---|
...http/src/main/scala/zio/http/netty/NettyBody.scala | 70.83% <100.00%> (ø) |
@myazinn Does this fix #2297?
@lackhoa nope
Rather than doing ZIO stuff inside a separate fiber, we should consider another mechanism that could offer higher performance. I'll take a look at the code and suggest something.
@jdegoes another easy option would be to break the abstraction a little bit and handle it in zio.http.netty.AsyncBodyReader#connect. Instead of submitting chunks one by one:

```scala
buffer.result().foreach { case (chunk, isLast) =>
  callback(chunk, isLast)
}
```
we can merge these chunks in place and submit one big chunk:

```scala
val (buffered, isLast) =
  buffer
    .result()
    .foldLeft((Chunk.empty[Byte], false)) { case ((acc, _), (chunk, isLast)) =>
      (acc ++ chunk, isLast)
    }
callback(buffered, isLast)
```
I haven't done any benchmarks yet, but I'd guess this should be faster than both the current version and the one proposed in this MR. Alternatively, we can go even further and just build one large chunk in a builder, e.g. replace

```scala
private val buffer: ChunkBuilder[(Chunk[Byte], Boolean)] = ChunkBuilder.make[(Chunk[Byte], Boolean)]()
...
buffer.result().foreach { case (chunk, isLast) =>
  callback(chunk, isLast)
}
```
with
```scala
private val buffer: ChunkBuilder[Byte] = ChunkBuilder.make[Byte]()
private var isLast: Boolean = false
...
callback(buffer.result(), isLast)
```
What do you think?
Looks like the main branch doesn't compile now 🤔
The purpose of the async body reader is to be able to process the body in a streaming way - when you don't want to buffer it in memory. So changing that would basically disable the streaming feature.
@vigoo not sure how it disables the streaming feature. This buffer is used only until a consumer connects to the channel. Once a consumer is connected, the buffer is no longer used and chunks received from Netty are passed to it directly. The only thing that changes is what happens when a consumer connects to a body reader that has already buffered a lot of data: currently it feeds a chunk of chunks to the consumer, while I propose to flatten them and pass one large chunk. A sketch of what I mean is below.
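To make the two phases concrete, here is a self-contained sketch of the buffering-then-direct hand-off described above, with the flattening from the earlier snippet applied in connect. The names (BufferingReader, onData, State) are hypothetical placeholders, not the actual zio.http.netty.AsyncBodyReader internals:

```scala
import zio.{Chunk, ChunkBuilder}

// Hypothetical sketch, not the real AsyncBodyReader: chunks arriving from
// Netty are buffered only until a consumer connects; after that they are
// handed to the callback directly and the buffer is never touched again.
final class BufferingReader {
  private sealed trait State
  private case object Buffering extends State
  private final case class Direct(callback: (Chunk[Byte], Boolean) => Unit) extends State

  private val buffer: ChunkBuilder[(Chunk[Byte], Boolean)] =
    ChunkBuilder.make[(Chunk[Byte], Boolean)]()
  private var state: State = Buffering

  // Invoked from the Netty event loop for every inbound content chunk.
  def onData(chunk: Chunk[Byte], isLast: Boolean): Unit = synchronized {
    state match {
      case Buffering        => buffer += ((chunk, isLast))
      case Direct(callback) => callback(chunk, isLast)
    }
  }

  // Invoked once, when the stream consumer attaches: everything buffered
  // so far is flattened into a single chunk and delivered in one call.
  def connect(callback: (Chunk[Byte], Boolean) => Unit): Unit = synchronized {
    val (buffered, isLast) =
      buffer.result().foldLeft((Chunk.empty[Byte], false)) {
        case ((acc, _), (chunk, last)) => (acc ++ chunk, last)
      }
    state = Direct(callback)
    if (buffered.nonEmpty || isLast) callback(buffered, isLast)
  }
}
```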
Ah ok sorry, misunderstood.
@myazinn Let's do this in the most performant way possible, please!
@myazinn Master is fixed, btw.
@myazinn could you update your branch?
@kyri-petrou Do you have any thoughts on how to do this better?
@jdegoes I've given this some thought; I think the main issue here is that we're using a bounded queue in ZStream.async. To be honest, I don't think using a bounded queue makes much sense here: with an ultra-fast producer and a slow consumer, the messages will still end up piling up in Netty's queues, so the backpressure doesn't really help with anything (unless we were to limit Netty's queue sizes, which we're not doing).
AFAICT there are 2 possible solutions:
- Limit Netty's inbound queue size to the same size as the ZStream buffer. Frankly, I'm not even sure how this would behave in cases where the queue becomes full.
- Use an unbounded queue as the source of the ZStream. I think this makes the most sense in this situation. If we do want to add backpressure at some point, it can be done on Netty's side.
Let me know if you're happy with (2) and I'll start working on it; it should be relatively easy to implement, something along the lines of the sketch below.
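For illustration, here's a rough sketch of what (2) could look like: bridging the callback into an unbounded ZIO queue and serving the stream from it. The streamFromCallback and register names are made-up placeholders for how the AsyncBodyReader callback would be hooked up, not actual zio-http internals:

```scala
import zio._
import zio.stream._

// Sketch of option (2): an unbounded queue sits between the callback-based
// producer and the ZStream consumer, so a fast producer can never block on
// a full stream buffer. `register` is a hypothetical stand-in for wiring
// the callback into AsyncBodyReader#connect.
def streamFromCallback[A](
  register: ((A, Boolean) => Unit) => Unit
): UIO[ZStream[Any, Nothing, A]] =
  for {
    queue   <- Queue.unbounded[Take[Nothing, A]]
    runtime <- ZIO.runtime[Any]
    _       <- ZIO.succeed {
                 register { (a, isLast) =>
                   Unsafe.unsafe { implicit unsafe =>
                     runtime.unsafe.run {
                       queue.offer(Take.single(a)) *>
                         queue.offer(Take.end).when(isLast)
                     }.getOrThrow()
                   }
                 }
               }
  } yield ZStream.fromQueue(queue).flattenTake
```

In a real implementation the queue's shutdown would have to be tied to the stream's lifecycle (e.g. via ZStream.fromQueueWithShutdown), but that's orthogonal to the backpressure question.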