
Server-streaming is always batched based on size of mpsc::channel(N) + 1

Open matt-deboer opened this issue 2 years ago • 6 comments

Bug Report

Version

├── tonic v0.9.2
├── tonic-web v0.9.2
│   ├── tonic v0.9.2 (*)
└── tonic-build v0.9.2

Platform

Linux 5.18.10-051810-generic #202207071639 SMP PREEMPT_DYNAMIC Thu Jul 7 17:34:42 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux

Description

I can't say that this is technically a bug--tonic is working fine in every other way I can detect, and I doubt gRPC specs require any specific buffering or non-buffering behavior; it's just unexpected/unfortunate behavior...

Basically, I'm seeing the behavior previously mentioned in #378 (which was marked closed about a year ago); i.e., for a server-streaming rpc, the server buffers (N + 1) frames before sending a batch to the client, where N is the value used in mpsc::channel(N).

What I want to happen is for messages to be delivered as soon as possible to the client. I can work around this by doing extra buffering on the client, but it essentially means I have to accept a minimum of 1 extra frame of latency (if I set my mpsc channel size to 1) which I'd like to avoid.

I couldn't work out any specific details of what the fix was in that previous issue, other than "upgraded dependencies". I'm running on newer versions of everything mentioned there, so either this was a regression, or maybe some sort of transient issue for that particular case.

In my case, I'm sending a stream of JPEG images, ~48k each, and the pattern on the client side is that frames arrive in bundles of N+1, for whatever value of N I plug in when creating the channel (mpsc::channel(N)). Any ideas how I can get immediate send on a stream? As it stands, I'm creating a channel of size 1, which is as small as tokio::sync::mpsc will allow.
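As a rough illustration of what N means here, the sketch below uses the standard library's std::sync::mpsc::sync_channel as a stand-in for tokio::sync::mpsc::channel (an assumption, so the example runs without tokio or tonic). It only shows that a producer can queue N items while nothing drains them; it does not reproduce tonic's behaviour or account for the extra frame. The helper name sends_before_full is hypothetical.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Counts how many items a producer can enqueue into a bounded channel of
/// capacity `n` before it reports `Full`, with no consumer draining it.
/// (std's sync_channel is used here only as a stand-in for tokio's channel.)
fn sends_before_full(n: usize) -> usize {
    // `_rx` keeps the receiver alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<u32>(n);
    let mut accepted = 0usize;
    loop {
        match tx.try_send(accepted as u32) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => {
                unreachable!("receiver is still alive")
            }
        }
    }
}

fn main() {
    for n in &[1usize, 4, 16] {
        println!(
            "capacity {}: {} sends accepted before Full",
            n,
            sends_before_full(*n)
        );
    }
}
```

With a capacity of 1 the producer is accepted once and then sees Full, so even the smallest bounded channel leaves one frame queued ahead of the consumer.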

I tried using an UnboundedReceiverStream instead of a ReceiverStream, but this just blocks forever. This part definitely seems to be a bug, as it prevents the client from seeing any stream of messages.

This seems to indicate that tonic does not detect the send call on the channel, but somehow does detect a "channel buffer is full" event...

Here's a sample of the logs with RUST_LOG=h2=debug:

2023-04-26T04:01:44.179204Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.179479Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.179587Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15636 }
2023-04-26T04:01:44.179673Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.179890Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.180926Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 746 }
2023-04-26T04:01:44.181706Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.181890Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 14885 }
2023-04-26T04:01:44.367745Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.367862Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.367899Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.368007Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.368067Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.368973Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.369058Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.369165Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.369212Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15749 }
2023-04-26T04:01:44.369254Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.369457Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.370040Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 633 }
2023-04-26T04:01:44.370466Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.370632Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15192 }
2023-04-26T04:01:44.501486Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.501607Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.501642Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.501773Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.501825Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.502406Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.502490Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.502593Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.502654Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15531 }
2023-04-26T04:01:44.502708Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.502858Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.503328Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 851 }
2023-04-26T04:01:44.503764Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.503868Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 14556 }
2023-04-26T04:01:44.641676Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.641862Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.641931Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.642152Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.642267Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.643597Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.643753Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.644122Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.644247Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15569 }
2023-04-26T04:01:44.644345Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.644495Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.645446Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 813 }
2023-04-26T04:01:44.646289Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.646617Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 14526 }
2023-04-26T04:01:44.836142Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.836308Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.836356Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.836537Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.836627Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.837441Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.837528Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.837685Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.837763Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15638 }
2023-04-26T04:01:44.837828Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.838061Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.838897Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 744 }
2023-04-26T04:01:44.839567Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.839735Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 14911 }
2023-04-26T04:01:44.965813Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.965884Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.965902Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.965965Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.965993Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.966287Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.966355Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.966402Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.966427Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15641 }
2023-04-26T04:01:44.966438Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.966566Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.966862Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 741 }
2023-04-26T04:01:44.967005Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.967066Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 14911 }
2023-04-26T04:01:45.166676Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.166794Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.166843Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.166976Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.167043Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.168037Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.168198Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.168359Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.168441Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15829 }
2023-04-26T04:01:45.168502Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.168927Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.169460Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 553 }
2023-04-26T04:01:45.169876Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.170060Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15169 }
2023-04-26T04:01:45.308010Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.308313Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.308404Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.308725Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.308842Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.310438Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.311074Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.311338Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.311550Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.311738Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15661 }
2023-04-26T04:01:45.311924Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.312954Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 721 }
2023-04-26T04:01:45.314088Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.314307Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 14910 }
2023-04-26T04:01:45.435564Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.435689Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.435740Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.435874Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.435931Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.436857Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.436926Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.437009Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.437047Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15650 }
2023-04-26T04:01:45.437063Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.437467Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.438040Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 732 }
2023-04-26T04:01:45.438480Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.438653Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15077 }
2023-04-26T04:01:45.639763Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.639976Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.640055Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.640211Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.640277Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.641314Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.641426Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.641588Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.641692Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.641862Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15827 }
2023-04-26T04:01:45.642344Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.643210Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 555 }
2023-04-26T04:01:45.644075Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.644312Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15145 }
2023-04-26T04:01:45.777461Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.777624Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.777685Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.777868Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.777980Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.779864Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.780029Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.780252Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.780341Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15549 }
2023-04-26T04:01:45.780381Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.780932Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.782037Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 833 }
2023-04-26T04:01:45.783077Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.783426Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 14686 }
2023-04-26T04:01:45.971914Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.972095Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.972154Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.972346Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.972442Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.973938Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.974047Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.974196Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.974268Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:45.974389Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15778 }
2023-04-26T04:01:45.974992Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.975924Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 604 }
2023-04-26T04:01:45.976636Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:45.976894Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 14610 }
2023-04-26T04:01:46.099579Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.099708Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.099742Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.100041Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.100068Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.100546Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:46.100602Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.100668Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:46.100693Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 15000 }
2023-04-26T04:01:46.100705Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.100858Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:46.101279Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 1382 }
2023-04-26T04:01:46.101561Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:46.101620Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 13733 }
2023-04-26T04:01:46.228774Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.228861Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.228878Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.228958Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.228982Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.229353Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:46.229386Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.229421Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:46.229436Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 14958 }
2023-04-26T04:01:46.229446Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:46.229578Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:46.229779Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 1424 }
2023-04-26T04:01:46.230020Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:46.230071Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 12081 }
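
The burst pattern in the log above can be quantified with a small standard-library script. The sketch below groups "send frame=Data" lines into bursts whenever the gap to the previous Data frame exceeds a threshold. The function names, the 50 ms threshold, and the assumption of exactly six fractional-second digits are illustrative choices, not anything provided by h2 or tonic; the sample lines are copied from the log.

```rust
/// A few lines copied from the h2 debug log above.
const SAMPLE: &str = r"2023-04-26T04:01:44.179673Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.179890Z DEBUG Connection{peer=Server}: h2::codec::framed_read: received frame=WindowUpdate { stream_id: StreamId(9), size_increment: 16384 }
2023-04-26T04:01:44.367745Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.367862Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.367899Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.368007Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.368067Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.369058Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.369254Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }
2023-04-26T04:01:44.501486Z DEBUG Connection{peer=Server}: h2::codec::framed_write: send frame=Data { stream_id: StreamId(9) }";

/// Parses the time-of-day part of a timestamp such as
/// `2023-04-26T04:01:44.179204Z` into microseconds since midnight.
/// Assumes exactly six fractional digits, as in the log excerpt.
fn micros_since_midnight(ts: &str) -> Option<u64> {
    let time = ts.split('T').nth(1)?.trim_end_matches('Z');
    let mut hms = time.split(':');
    let h: u64 = hms.next()?.parse().ok()?;
    let m: u64 = hms.next()?.parse().ok()?;
    let (s, frac) = hms.next()?.split_once('.')?;
    if frac.len() != 6 {
        return None;
    }
    let s: u64 = s.parse().ok()?;
    let us: u64 = frac.parse().ok()?;
    Some(((h * 60 + m) * 60 + s) * 1_000_000 + us)
}

/// Returns the number of Data frames in each burst, where a new burst
/// starts when the gap since the previous Data frame exceeds `gap_us`.
fn data_bursts(log: &str, gap_us: u64) -> Vec<usize> {
    let mut bursts: Vec<usize> = Vec::new();
    let mut last: Option<u64> = None;
    for line in log.lines() {
        if !line.contains("send frame=Data") {
            continue; // ignore WindowUpdate and other frames
        }
        let ts = match line.split_whitespace().next().and_then(micros_since_midnight) {
            Some(t) => t,
            None => continue, // skip lines without a parseable timestamp
        };
        let same_burst = last.map_or(false, |prev| ts.saturating_sub(prev) <= gap_us);
        if same_burst {
            if let Some(n) = bursts.last_mut() {
                *n += 1;
            }
        } else {
            bursts.push(1);
        }
        last = Some(ts);
    }
    bursts
}

fn main() {
    // 50 ms threshold: well below the gaps between bursts seen in the log.
    println!("{:?}", data_bursts(SAMPLE, 50_000));
}
```

On the sample lines this reports bursts of 1, 7 and 1 Data frames; the first and last are cut off by the excerpt, while the middle one is a complete burst sent within about 1.5 ms.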

matt-deboer • Apr 26 '23 04:04

I swapped out the tokio::sync::mpsc::channel (which had a minimum of 1) for futures::channel::mpsc::channel (which has minimum of 0), and am able to get immediate streaming to the client at the expected frame rate if I create the channel with 0 extra buffer.

The batching behavior still remains for any value greater than 0, even with futures::channel::mpsc::channel, and futures::channel::mpsc::unbounded also hangs indefinitely.

matt-deboer • May 01 '23 05:05

Just commenting to say I've observed the same behaviour with tokio::sync::mpsc::channel and also with async_stream.

xd009642 • Jun 02 '23 17:06

I tried using an UnboundedReceiverStream instead of a ReceiverStream, but this just blocks forever. This part definitely seems to be a bug, as it prevents the client from seeing any stream of messages.

This is for sure a bug, do you have some sort of reproduction I could play with?

LucioFranco • Jun 02 '23 17:06

I've created a reproduction which uses tokio's mpsc, futures mpsc and async_stream. https://github.com/xd009642/tonic-repro-1375

The number of packets in the stream, the size of each packet, and the channel size are all easy to tune in the code. I've just left them as the initial constants that reproduced the issue.

EDIT: I also added async-channel's bounded MPMC channel, and it exhibits the same issue.

xd009642 • Jun 02 '23 19:06

Just updated my reproduction with the unbounded receiver stream, and I didn't see the reported behaviour. Instead, the client only started receiving messages after all 20 had been sent. Maybe the sender being dropped after the last message caused something to wake up, and without that drop it would have waited forever.
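
For reference, the channel-closure semantics behind that guess can be shown with the standard library alone. This is only a sketch under the assumption that std::sync::mpsc::channel behaves like the async unbounded channels with respect to closure; it does not involve tonic and does not reproduce the hang. The function name drain_after_drop is hypothetical.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Sends `count` messages on an unbounded channel, drops the sender, and
/// then drains the receiver. Draining terminates only because the sender
/// was dropped, which closes the channel once the buffer is empty.
fn drain_after_drop(count: u32) -> Vec<u32> {
    let (tx, rx) = channel::<u32>();
    let producer = thread::spawn(move || {
        for i in 0..count {
            tx.send(i).expect("receiver is still alive");
        }
        // `tx` is dropped here when the closure returns.
    });
    producer.join().expect("producer thread panicked");
    // Buffered messages stay readable after the sender is gone; the
    // iterator ends once they are drained and the channel is closed.
    rx.iter().collect()
}

fn main() {
    let received = drain_after_drop(20);
    println!("received {} messages after the sender was dropped", received.len());
}
```

If the sender were kept alive instead, the draining loop would block after the last buffered message, which matches the "waiting forever" scenario described above.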

xd009642 • Jun 05 '23 11:06

I've observed the same issue, any update on this?

grao1991 • Mar 20 '24 00:03