zio-http

invoke NettyBody callback in a separate fiber to avoid deadlocks

Open myazinn opened this issue 1 year ago • 13 comments

When reading a request from a very fast producer, zio.http.netty.AsyncBodyReader can accumulate more data in its buffer than fits into the Stream buffer in zio.http.netty.NettyBody.AsyncBody#asStream. In that case the request hangs: we can't consume messages until the callback has pushed all the buffered data, and the callback can't finish pushing until we start consuming.

A minimized example of the issue can be found here: https://scastie.scala-lang.org/sZ8DrRKxR2qAKxrAOV52hg

To reproduce it on a full server, you can use the steps from this ticket: https://github.com/zio/zio-http/issues/2297
Sometimes the server is able to read data from yes hello | http --chunked POST localhost:8080/forever, sometimes it's just silent.

I tried to write a test for this but couldn't find a way to reproduce it in tests, so I'd really appreciate a hint on how to do it. Also, callbacks sometimes blow my mind, so I hope nothing goes wrong if we move the callback invocation to a separate fiber 🤔
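
To make the stall concrete, here is a minimal sketch that uses only the Java standard library rather than zio-http or ZStream. The object name, the `replay` helper and the timeout are all hypothetical. A plain `ArrayBlockingQueue` stands in for the bounded stream buffer, and the timeout exists only so that the sketch terminates instead of hanging.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object BoundedBufferStall {
  // Hypothetical stand-in for replaying the reader's backlog: `buffered`
  // chunks are pushed into a bounded queue of size `capacity` while no
  // consumer is taking from it yet.
  // Returns how many chunks were accepted before the push stalled.
  def replay(buffered: Int, capacity: Int, timeoutMillis: Long): Int = {
    val queue    = new ArrayBlockingQueue[Int](capacity)
    var accepted = 0
    var stalled  = false
    while (!stalled && accepted < buffered) {
      // A truly blocking push would wait forever at this point. The timeout
      // is an assumption of this sketch so that the stall can be reported.
      if (queue.offer(accepted, timeoutMillis, TimeUnit.MILLISECONDS)) accepted += 1
      else stalled = true
    }
    accepted
  }

  def main(args: Array[String]): Unit = {
    println(replay(buffered = 10, capacity = 4, timeoutMillis = 50)) // backlog larger than the buffer
    println(replay(buffered = 3, capacity = 4, timeoutMillis = 50))  // backlog fits
  }
}
```

With a backlog of 10 chunks and room for 4, the push stops after the fourth chunk, which is the point where the real callback would block.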

myazinn avatar Aug 25 '23 20:08 myazinn

CLA assistant check
All committers have signed the CLA.

CLAassistant avatar Aug 25 '23 20:08 CLAassistant

Codecov Report

Patch coverage: 100.00% and project coverage change: -0.09% :warning:

Comparison is base (ca6513f) 63.41% compared to head (c99c6b8) 63.33%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2402      +/-   ##
==========================================
- Coverage   63.41%   63.33%   -0.09%     
==========================================
  Files         137      137              
  Lines        7113     7113              
  Branches     1259     1259              
==========================================
- Hits         4511     4505       -6     
- Misses       2602     2608       +6     
Files Changed                                            Coverage Δ
...http/src/main/scala/zio/http/netty/NettyBody.scala    70.83% <100.00%> (ø)

... and 1 file with indirect coverage changes

codecov-commenter avatar Aug 25 '23 20:08 codecov-commenter

@myazinn Does this fix #2297 ?

lackhoa avatar Sep 23 '23 14:09 lackhoa

@lackhoa nope 🙁

myazinn avatar Sep 24 '23 09:09 myazinn

Rather than doing ZIO stuff inside a separate fiber, we should consider another mechanism that could be higher-performance. Will take a look at the code and suggest something.

jdegoes avatar Sep 24 '23 19:09 jdegoes

@jdegoes another easy option would be to break the abstraction a little bit and handle it in zio.http.netty.AsyncBodyReader#connect. Instead of submitting chunks one by one:

          buffer.result().foreach { case (chunk, isLast) =>
            callback(chunk, isLast)
          }

we can merge these chunks in place and submit one big chunk:

          val (buffered, isLast) =
            buffer
              .result()
              .foldLeft((Chunk.empty[Byte], false)) { case ((acc, _), (chunk, isLast)) =>
                (acc ++ chunk, isLast)
              }
          callback(buffered, isLast)

I haven't done any benchmarks yet, but I guess it should be faster than both the current version and the one proposed in the MR. Alternatively, we can go even further and just build one large chunk in a builder, e.g. replace

  private val buffer: ChunkBuilder[(Chunk[Byte], Boolean)] = ChunkBuilder.make[(Chunk[Byte], Boolean)]()

...

          buffer.result().foreach { case (chunk, isLast) =>
            callback(chunk, isLast)
          }

with

  private val buffer: ChunkBuilder[Byte] = ChunkBuilder.make[Byte]()
  private var isLast: Boolean            = false
  
  ...
           callback(buffer.result(), isLast)

What do you think?

myazinn avatar Sep 24 '23 22:09 myazinn

Looks like the main branch doesn't compile now 🤔

myazinn avatar Sep 24 '23 22:09 myazinn

The purpose of the async body reader is to be able to process the body in a streaming way - when you don't want to buffer it in memory. So changing that would basically disable the streaming feature.

vigoo avatar Sep 25 '23 05:09 vigoo

@vigoo I'm not sure how it disables the streaming feature. This buffer is used only until we connect a consumer to the channel. Once a consumer is connected, the buffer is no longer used and chunks received from Netty are passed directly to the consumer. The only thing that changes is what happens when a consumer connects to a body reader that has already buffered a lot of data. Currently it feeds the buffered chunks to the consumer one by one, while I propose to flatten them and pass them as one large chunk.
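
As an illustration of that behaviour, here is a simplified sketch in plain Scala. `BufferUntilConnect`, `Reader`, `onChunk` and `demo` are hypothetical names, and `Vector[Byte]` stands in for `Chunk[Byte]`. This is not the actual AsyncBodyReader code. It only models the idea that chunks are buffered until a consumer connects, replayed as one flattened chunk, and then passed through directly.

```scala
import scala.collection.mutable.ArrayBuffer

object BufferUntilConnect {
  type Callback = (Vector[Byte], Boolean) => Unit

  // Hypothetical, simplified reader; not zio-http's AsyncBodyReader.
  final class Reader {
    private val buffer                     = ArrayBuffer.empty[Byte]
    private var bufferedLast               = false
    private var callback: Option[Callback] = None

    // Called by the producer for every incoming chunk.
    def onChunk(chunk: Vector[Byte], isLast: Boolean): Unit = synchronized {
      callback match {
        case Some(cb) => cb(chunk, isLast) // consumer attached: pass straight through
        case None     =>
          buffer ++= chunk                 // no consumer yet: accumulate bytes
          bufferedLast = isLast
      }
    }

    // Called once by the consumer; replays the whole backlog as ONE chunk.
    def connect(cb: Callback): Unit = synchronized {
      callback = Some(cb)
      if (buffer.nonEmpty || bufferedLast) cb(buffer.toVector, bufferedLast)
      buffer.clear()
    }
  }

  // Records every callback invocation so the behaviour can be inspected.
  def demo(): List[(Vector[Byte], Boolean)] = {
    val calls  = List.newBuilder[(Vector[Byte], Boolean)]
    val reader = new Reader
    reader.onChunk(Vector[Byte](1, 2), isLast = false) // buffered
    reader.onChunk(Vector[Byte](3), isLast = false)    // buffered
    reader.connect((chunk, isLast) => calls += ((chunk, isLast)))
    reader.onChunk(Vector[Byte](4), isLast = true)     // passed directly
    calls.result()
  }
}
```

In `demo`, the two chunks that arrive before `connect` reach the consumer as a single callback invocation, and the chunk that arrives afterwards is delivered on its own, so streaming after connection is unaffected.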

myazinn avatar Sep 25 '23 07:09 myazinn

Ah ok sorry, misunderstood.

vigoo avatar Sep 25 '23 10:09 vigoo

@myazinn Let's do this in the most performant way possible, please! 🙏

jdegoes avatar Sep 28 '23 09:09 jdegoes

@myazinn Master is fixed, btw. 👍

jdegoes avatar Sep 28 '23 09:09 jdegoes

@myazinn could you update your branch?

987Nabil avatar Mar 06 '24 16:03 987Nabil

@kyri-petrou Do you have any thoughts on how to do this better?

jdegoes avatar Aug 27 '24 22:08 jdegoes

@jdegoes I've given this some thought; I think the main issue here is that we're using a bounded queue in ZStream.async. To be honest, I don't think using a bounded queue makes much sense: when we have an ultra-fast producer and a slow consumer, the messages will still end up piling up in Netty's queues, so backpressure doesn't really help with anything (unless we were to limit Netty's queue sizes, which we're not doing).

AFAICT there are 2 possible solutions:

  1. Limit Netty's inbound queue size to the same size as the ZStream buffer size. Frankly, I'm not even sure how this would behave when the queue becomes full.
  2. Use an unbounded queue as the source of the ZStream. I think this makes the most sense in this situation. If we do want to add backpressure at some point, it can be done on Netty's side.

Let me know if you're happy with (2) and I'll start working on it; should be relatively easy to implement
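
A rough sketch of what option (2) changes, using only the Java standard library and not ZStream itself. `UnboundedBufferSketch` and `replayThenDrain` are hypothetical names, and a `LinkedBlockingQueue` created without a capacity stands in for an unbounded stream buffer. The point is that the producer can hand over its whole backlog before any consumer starts, so the push never waits on the consumer.

```scala
import java.util.concurrent.LinkedBlockingQueue

object UnboundedBufferSketch {
  // Pushes `buffered` chunks with no consumer running, then drains them.
  // Returns (chunks accepted by the queue, chunks later read by the consumer).
  def replayThenDrain(buffered: Int): (Int, Int) = {
    // Without an explicit capacity the queue is effectively unbounded
    // (its limit is Int.MaxValue), so `offer` does not wait for a consumer.
    val queue    = new LinkedBlockingQueue[String]()
    var accepted = 0
    for (i <- 0 until buffered)
      if (queue.offer(s"chunk-$i")) accepted += 1

    // The consumer starts only after the producer has finished pushing.
    var drained = 0
    while (queue.poll() != null) drained += 1
    (accepted, drained)
  }

  def main(args: Array[String]): Unit =
    println(replayThenDrain(10))
}
```

The trade-off is that nothing caps memory use inside the queue, which matches the point above that any backpressure would then have to come from Netty's side.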

kyri-petrou avatar Aug 28 '24 11:08 kyri-petrou