Websocket yamux regression in v0.42

Open appaquet opened this issue 3 years ago • 8 comments

I tried upgrading my project to 0.42, in which I use the WebSocket transport on web nodes. Between TCP nodes everything is fine, but there seems to be a regression when using the WebSocket transport in my wasm-compiled web client.

I've bisected the issue to the bump to yamux 0.10, and more precisely to this commit in yamux.

Communication is either not working, sporadic, or very slow. One example: the ping protocol takes 10s to ping the server from the web client. Interestingly, more messages seem to get through if I generate more traffic (e.g., by sending more messages to the server). My hunch is that the WebSocket transport behaves differently in terms of connection states in a way that yamux doesn't handle correctly, and that generating traffic gets it out of some kind of pending state.
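For context, the native side of my setup boils down to roughly this (a simplified sketch from memory against the 0.42 API, with the websocket, noise, yamux, and ping features enabled; not my actual code, and exact names may be slightly off):

use futures::{executor::block_on, StreamExt};
use libp2p::{
    core::upgrade, identity, noise, ping,
    swarm::{Swarm, SwarmEvent},
    tcp::TcpConfig, websocket, yamux, PeerId, Transport,
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let id_keys = identity::Keypair::generate_ed25519();
    let peer_id = PeerId::from(id_keys.public());

    // WebSocket over TCP, secured with Noise, multiplexed with yamux: the
    // same stack in which the slow pings show up.
    let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys)?;
    let transport = websocket::WsConfig::new(TcpConfig::new())
        .upgrade(upgrade::Version::V1)
        .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
        .multiplex(yamux::YamuxConfig::default())
        .boxed();

    let behaviour = ping::Behaviour::new(ping::Config::new().with_keep_alive(true));
    let mut swarm = Swarm::new(transport, behaviour, peer_id);
    swarm.listen_on("/ip4/0.0.0.0/tcp/9090/ws".parse()?)?;

    block_on(async {
        loop {
            // On a healthy connection the reported ping RTTs are in the
            // milliseconds; from the wasm client they come in around 10s.
            if let SwarmEvent::Behaviour(event) = swarm.select_next_some().await {
                println!("{:?}", event);
            }
        }
    })
}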

See this gist for libp2p & yamux logs on the server and on the web client.

Unfortunately, my project may be a bit complex to get running as a minimal working example. Do you have any WebSocket / wasm example code that I could use to create a minimal reproduction?

Thanks!

appaquet avatar Jan 30 '22 15:01 appaquet

Thanks for reporting. Definitely worth investigating.

Unfortunately, my project may be a bit complex to get running as a minimal working example. Do you have any WebSocket / wasm example code that I could use to create a minimal reproduction?

I don't have one handy. Maybe @wngr?

mxinden avatar Feb 01 '22 18:02 mxinden

It seems I was wrong about the origin of the problem in yamux's commit history. I've narrowed the problem down to https://github.com/libp2p/rust-yamux/pull/112

It seems like the issue comes from not fully awaiting the flush of the underlying socket in the connection.

The problem gets solved by correctly awaiting the flush, like this:

diff --git a/src/connection.rs b/src/connection.rs
index 60bed2c..4287dca 100644
--- a/src/connection.rs
+++ b/src/connection.rs
@@ -883,8 +883,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
     /// Try to flush the underlying I/O stream, without waiting for it.
     async fn flush_nowait(&mut self) -> Result<()> {
         future::poll_fn(|cx| {
-            let _ = self.socket.get_mut().poll_flush(cx)?;
-            Poll::Ready(Ok(()))
+            self.socket.get_mut().poll_flush(cx).map_err(|err| err.into())
         }).await
     }

Perhaps I'm wrong, but it seems like flushing in frame::Io polls to send the remaining frame. If sending cannot be completed (returns Pending), it will never get fully flushed, since flush_nowait doesn't poll it again.
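To illustrate outside of yamux, here's a self-contained sketch using a hypothetical SlowFlush writer (not actual yamux or transport code) of how discarding the result of poll_flush reports success before anything has actually been flushed:

use futures::{future, AsyncWrite, AsyncWriteExt};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

// A writer whose flush needs two polls to complete: a stand-in for a
// transport (like the WebSocket one) that returns Pending mid-write.
struct SlowFlush { polls_left: u8 }

impl AsyncWrite for SlowFlush {
    fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
        Poll::Ready(Ok(buf.len()))
    }
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        if self.polls_left == 0 {
            return Poll::Ready(Ok(()));
        }
        self.polls_left -= 1;
        cx.waker().wake_by_ref(); // ask the executor to poll us again
        Poll::Pending
    }
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_flush(cx)
    }
}

fn main() -> io::Result<()> {
    futures::executor::block_on(async {
        let mut socket = SlowFlush { polls_left: 2 };

        // The "nowait" shape: poll once, discard a possible Pending, report
        // success. After this await, the flush has NOT actually completed.
        future::poll_fn(|cx| {
            let _ = Pin::new(&mut socket).poll_flush(cx)?;
            Poll::Ready(io::Result::Ok(()))
        })
        .await?;
        assert_eq!(socket.polls_left, 1); // one Pending was swallowed

        // Awaiting the flush drives poll_flush to completion.
        socket.flush().await?;
        assert_eq!(socket.polls_left, 0);
        Ok(())
    })
}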

Perhaps the issue is not as apparent with other transports, but I would think the WebSocket transport is more prone to returning Pending because of the way it's implemented (see this).

Let me know... I could open a PR to change the polling back to waiting, but I would guess there is a reason it was switched to non-waiting (performance?).

appaquet avatar Feb 09 '22 02:02 appaquet

Thanks for further bisecting this issue!

Perhaps I'm wrong, but it seems like flushing in frame::Io polls to send the remaining frame. If sending cannot be completed (returns Pending), it will never get fully flushed, since flush_nowait doesn't poll it again.

From a quick look, given that flush_nowait is called in a loop, I would expect that it would still succeed.

I could open a PR to change the polling back to waiting

But that would block until everything is flushed. While flushing, one could not receive new packets. If both ends wait to flush, we would have a deadlock, no?

mxinden avatar Feb 09 '22 14:02 mxinden

Thanks for further bisecting this issue!

Perhaps I'm wrong, but it seems like flushing in frame::Io polls to send the remaining frame. If sending cannot be completed (returns Pending), it will never get fully flushed, since flush_nowait doesn't poll it again.

From a quick look, given that flush_nowait is called in a loop, I would expect that it would still succeed.

You're right! I hadn't noticed the loop there.

I could open a PR to change the polling back to waiting

But that would block until everything is flushed. While flushing, one could not receive new packets. If both ends wait to flush, we would have a deadlock, no?

I'll try to dig a bit more tonight. Maybe the changes in yamux aren't the issue after all, but just trigger an issue in the way the WebSocket transport is implemented...

appaquet avatar Feb 09 '22 17:02 appaquet

Ok, I think I got it. The issue really does seem to be in yamux, and is indeed related to improper flushing.

In the next() method's loop, a frame can be fed to the socket and its write completed in the first half of the loop. Once it has been written, execution falls through to the second section, which waits for the next incoming frame or the next stream/control command, but the previous write may still be left unflushed. This seems to be problematic for WebSocket for some reason...

This fix makes sure that we still make progress on flushing anything left in the socket while waiting for the next frame:

diff --git a/src/connection.rs b/src/connection.rs
index e87a76f..584a4d5 100644
--- a/src/connection.rs
+++ b/src/connection.rs
@@ -432,7 +432,21 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
                 num_terminated += 1;
                 Either::Left(future::pending())
             } else {
-                Either::Right(self.socket.next())
+                // poll socket for next incoming frame, but also make sure any pending writes are properly flushed
+                let socket = &mut self.socket;
+                let next = future::poll_fn(move |cx| {
+                    if let Poll::Ready(res) = socket.poll_next_unpin(cx) {
+                        return Poll::Ready(res);
+                    }
+
+                    if let Poll::Ready(Err(err)) = socket.poll_flush_unpin(cx) {
+                        return Poll::Ready(Some(Err(err.into())));
+                    }
+
+                    Poll::Pending
+                });
+
+                Either::Right(next)
             };
 
             let mut next_stream_command = if self.stream_receiver.is_terminated() {

Another solution is to flush the socket in the first half of the loop like this:

diff --git a/src/connection.rs b/src/connection.rs
index e87a76f..b9eb3e0 100644
--- a/src/connection.rs
+++ b/src/connection.rs
@@ -391,7 +391,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
             } else {
                 let socket = &mut self.socket;
                 let io = future::poll_fn(move |cx| {
-                    if let Poll::Ready(res) = socket.poll_ready_unpin(cx) {
+                    if let Poll::Ready(res) = socket.poll_flush_unpin(cx) {
                         res.or(Err(ConnectionError::Closed))?;
                         return Poll::Ready(Result::Ok(IoEvent::OutboundReady));
                     }

It works because the implementation of flush in Io ensures progress on writing the pending outgoing frame. Albeit simpler, I don't like this solution as much, since it goes against the semantics of flush... Also, flushing in the first half prevents the connection from receiving any new stream/control commands until the flush completes.

What do you think, @mxinden?

appaquet avatar Feb 10 '22 02:02 appaquet

A bit more digging... Writing to the WebSocket (wasm-ext) may return Pending if another frame is being sent (see this). The Noise implementation can write a pending frame on flush (see this). This means that flushing may return Pending from the wasm-ext transport, which in turn means that frames in Noise may never get written until the next write/flush (which may take a while, since yamux waits for the next incoming frame or for stream/control commands).
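To make the layering concrete, here's a hypothetical stand-in for such a buffering layer (not the actual Noise code): writes fill an internal buffer that is only pushed to the inner transport on flush, so a Pending from the inner poll_write strands the frame until someone polls flush again:

use futures::{ready, AsyncWrite};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

// Hypothetical buffering layer: writes fill an internal buffer and only get
// pushed to the inner transport on flush, roughly like a security layer
// framing its payloads.
struct Buffering<T> {
    inner: T,
    buf: Vec<u8>,
}

impl<T: AsyncWrite + Unpin> AsyncWrite for Buffering<T> {
    fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, data: &[u8]) -> Poll<io::Result<usize>> {
        // Nothing hits the wire here; the frame is only buffered.
        self.buf.extend_from_slice(data);
        Poll::Ready(Ok(data.len()))
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let this = &mut *self;
        while !this.buf.is_empty() {
            // If the inner transport returns Pending here (as wasm-ext can
            // while another frame is in flight), we return Pending too, and
            // the buffered frame is stranded until flush is polled again.
            let n = ready!(Pin::new(&mut this.inner).poll_write(cx, &this.buf))?;
            this.buf.drain(..n);
        }
        Pin::new(&mut this.inner).poll_flush(cx)
    }

    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        ready!(self.as_mut().poll_flush(cx))?;
        let this = &mut *self;
        Pin::new(&mut this.inner).poll_close(cx)
    }
}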

appaquet avatar Feb 13 '22 17:02 appaquet

In the next() method's loop, a frame can be fed to the socket and its write completed in the first half of the loop. Once it has been written, execution falls through to the second section, which waits for the next incoming frame or the next stream/control command, but the previous write may still be left unflushed. This seems to be problematic for WebSocket for some reason...

This fix makes sure that we still make progress on flushing anything left in the socket while waiting for the next frame:

diff --git a/src/connection.rs b/src/connection.rs
index e87a76f..584a4d5 100644
--- a/src/connection.rs
+++ b/src/connection.rs
@@ -432,7 +432,21 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
                 num_terminated += 1;
                 Either::Left(future::pending())
             } else {
-                Either::Right(self.socket.next())
+                // poll socket for next incoming frame, but also make sure any pending writes are properly flushed
+                let socket = &mut self.socket;
+                let next = future::poll_fn(move |cx| {
+                    if let Poll::Ready(res) = socket.poll_next_unpin(cx) {
+                        return Poll::Ready(res);
+                    }
+
+                    if let Poll::Ready(Err(err)) = socket.poll_flush_unpin(cx) {
+                        return Poll::Ready(Some(Err(err.into())));
+                    }
+
+                    Poll::Pending
+                });
+
+                Either::Right(next)
             };
 
             let mut next_stream_command = if self.stream_receiver.is_terminated() {

This makes sense to me. Good catch.

I am not yet sure whether we should extend next_frame, or introduce a fourth future, next_flush. Would you mind proposing the above as a patch on rust-yamux? I think that is the best place to discuss the patch.

Another solution is to flush the socket in the first half of the loop like this:

diff --git a/src/connection.rs b/src/connection.rs
index e87a76f..b9eb3e0 100644
--- a/src/connection.rs
+++ b/src/connection.rs
@@ -391,7 +391,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
             } else {
                 let socket = &mut self.socket;
                 let io = future::poll_fn(move |cx| {
-                    if let Poll::Ready(res) = socket.poll_ready_unpin(cx) {
+                    if let Poll::Ready(res) = socket.poll_flush_unpin(cx) {
                         res.or(Err(ConnectionError::Closed))?;
                         return Poll::Ready(Result::Ok(IoEvent::OutboundReady));
                     }

It works because the implementation of flush in Io ensures progress on writing the pending outgoing frame. Albeit simpler, I don't like this solution as much, since it goes against the semantics of flush... Also, flushing in the first half prevents the connection from receiving any new stream/control commands until the flush completes.

Agreed that this would not be in line with the semantics of poll_flush and poll_ready.

mxinden avatar Feb 15 '22 14:02 mxinden

I am not yet sure whether we should extend next_frame, or introduce a fourth future, next_flush.

That's what I thought I would do at first, but polling the next frame and polling flush both require a mutable reference to the underlying socket.
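For illustration, the patch above reduced to a generic sketch (a hypothetical next_or_flush helper, not yamux code): a single poll_fn closure takes the one allowed mutable borrow and drives both the inbound poll and the flush:

use futures::{future, Sink, SinkExt, Stream, StreamExt};
use std::task::Poll;

// `socket` is a combined Stream + Sink (like yamux's frame::Io). One closure
// means one &mut borrow, so both concerns can be polled together.
async fn next_or_flush<S, In, Out, E>(socket: &mut S) -> Option<Result<In, E>>
where
    S: Stream<Item = Result<In, E>> + Sink<Out, Error = E> + Unpin,
{
    future::poll_fn(|cx| {
        // Prefer an inbound item if one is ready...
        if let Poll::Ready(res) = socket.poll_next_unpin(cx) {
            return Poll::Ready(res);
        }
        // ...otherwise make progress on any unflushed outbound data.
        if let Poll::Ready(Err(err)) = socket.poll_flush_unpin(cx) {
            return Poll::Ready(Some(Err(err)));
        }
        Poll::Pending
    })
    .await
}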

Would you mind proposing the above as a patch on rust-yamux? I think that is the best place to discuss the patch.

Yeah, I'll do it when I have a bit of time.

appaquet avatar Feb 15 '22 15:02 appaquet

Let me know if this is still an issue.

thomaseizinger avatar Mar 29 '23 11:03 thomaseizinger

Not an issue anymore; it just hadn't been closed. The problem was fixed in https://github.com/libp2p/rust-yamux/pull/130

appaquet avatar Mar 29 '23 11:03 appaquet