
Test in CI with randomized packet loss/duplication/reordering/delay

Open Ralith opened this issue 4 years ago • 4 comments

e.g. #650 has compellingly illustrated the value here. Ideally we use a fixed seed and run some large number of iterations in CI for reproducibility, but a brute force netem-based solution as in the example is manifestly far better than nothing.
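
As a rough illustration of the fixed-seed idea, here is a minimal std-only Rust sketch that applies seeded loss, duplication and reordering to a sequence of packet numbers in memory. Every name in it (`SplitMix64`, `Impairment`, `deliver`) is hypothetical and made up for the illustration, none of it is part of quinn, and delay is not modelled at all. The only point is that the same seed replays the same schedule, so a failing iteration can be reported by seed and rerun.

```rust
/// Tiny deterministic PRNG (SplitMix64), used here only to avoid external crates.
struct SplitMix64(u64);

impl SplitMix64 {
    fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.0;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }

    /// Returns true with roughly `percent` percent probability.
    fn chance(&mut self, percent: u64) -> bool {
        self.next_u64() % 100 < percent
    }
}

/// Impairment probabilities in percent (hypothetical knobs for this sketch).
#[derive(Clone, Copy)]
struct Impairment {
    loss: u64,
    duplicate: u64,
    reorder: u64,
}

/// Returns the order in which packet numbers `0..count` reach the receiver.
fn deliver(seed: u64, count: u32, cfg: Impairment) -> Vec<u32> {
    let mut rng = SplitMix64(seed);
    let mut out = Vec::new();
    for packet in 0..count {
        if rng.chance(cfg.loss) {
            continue; // lost: never delivered
        }
        let copies = if rng.chance(cfg.duplicate) { 2 } else { 1 };
        for _ in 0..copies {
            out.push(packet);
            // Reordering: swap the new arrival with the one before it.
            let n = out.len();
            if n >= 2 && rng.chance(cfg.reorder) {
                out.swap(n - 1, n - 2);
            }
        }
    }
    out
}

fn main() {
    // Assumed example values, not tuned for anything.
    let cfg = Impairment { loss: 5, duplicate: 5, reorder: 40 };
    for seed in 0..1_000 {
        let first = deliver(seed, 64, cfg);
        let replay = deliver(seed, 64, cfg);
        // On a real failure, printing the seed is enough to reproduce it.
        assert_eq!(first, replay, "seed {} did not replay identically", seed);
    }
    println!("1000 seeds replayed deterministically");
}
```

In CI one would presumably loop over a fixed range of seeds like this and log the seed of any iteration that fails.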

Ralith avatar Mar 11 '20 02:03 Ralith

A good starting point might be the program below, run with the following netem rule applied to the loopback interface:

    tc qdisc add dev lo root netem delay 5ms 10ms 25% distribution normal loss 5% duplicate 5% reorder 40% 50%

The program simply loops creating a connection, sending/receiving random data, and closing the connection. I believe this would surface all the issues I've observed so far.

use futures::{future::select_all, lock::Mutex, stream::FuturesUnordered, FutureExt, StreamExt};
use quinn::{
    Certificate, ClientConfig, ClientConfigBuilder, ConnectionError, NewConnection, RecvStream, ServerConfigBuilder, WriteError
};
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::{
    future::Future, net::{IpAddr, Ipv6Addr, SocketAddr, UdpSocket}, pin::Pin, sync::Arc
};

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    let mut rng = SmallRng::seed_from_u64(0);

    let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap();
    let key = quinn::PrivateKey::from_der(&cert.serialize_private_key_der()).unwrap();
    let cert = Certificate::from_der(&cert.serialize_der().unwrap()).unwrap();
    let cert_chain = quinn::CertificateChain::from_certs(vec![cert.clone()]);

    let mut transport = quinn::TransportConfig::default();
    transport.max_idle_timeout(None).unwrap();
    let transport = Arc::new(transport);

    let mut server_config = quinn::ServerConfig::default();
    server_config.transport = transport.clone();
    let mut server_config = ServerConfigBuilder::new(server_config);
    server_config.certificate(cert_chain, key).unwrap();
    let server_config = server_config.build();

    let mut server_endpoint = quinn::Endpoint::builder();
    server_endpoint.listen(server_config);
    let (server_endpoint, incoming) =
        server_endpoint.bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0)).unwrap();
    let server_addr = server_endpoint.local_addr().unwrap();
    let incoming = Mutex::new(incoming);

    let (client_endpoint, _) = quinn::Endpoint::builder()
        .bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0))
        .unwrap();

    let mut client_config = ClientConfig::default();
    client_config.transport = transport.clone();
    let mut client_config = ClientConfigBuilder::new(client_config);
    client_config.add_certificate_authority(cert.clone()).unwrap();
    let client_config = client_config.build();

    let iterations = 100_000;
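    // The sending and receiving tasks each advance their own clone of the same
    // seeded RNG, so both make identical choices (finish vs. write, byte range).
    let mut shared_rng = rng.clone();
    let shared_rng1 = shared_rng.clone();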
    let mut sender = None;
    let mut receiver = None;
    const BUF: usize = 10 * 1024;
    let mut send_buf = vec![0; BUF];
    rng.fill(&mut *send_buf);
    let mut recv_buf = vec![0; BUF];
    let sending = async {
        let mut shared_rng = shared_rng1;
        for _i in 0..iterations {
            if sender.is_none() {
                let connection = client_endpoint
                    .connect_with(client_config.clone(), &server_addr, "localhost")
                    .unwrap();
                let connection = connection.await.unwrap();
                let mut sender_ = connection.connection.open_uni().await.unwrap();
                sender_.write_all(&[1]).await.unwrap();
                sender = Some(sender_);
            } else if shared_rng.gen() {
                match sender.take().unwrap().finish().await {
                    Ok(())
                    | Err(WriteError::ConnectionClosed(ConnectionError::ApplicationClosed(_)))
                    | Err(WriteError::ConnectionClosed(ConnectionError::Reset)) => (),
                    Err(err) => panic!("{:?}", err),
                }
            } else {
                let range = loop {
                    let start = shared_rng.gen_range(0, BUF);
                    let end = shared_rng.gen_range(0, BUF);
                    if start < end {
                        break start..end;
                    }
                };
                sender.as_mut().unwrap().write_all(&send_buf[range]).await.unwrap();
            }
        }
        if let Some(mut sender) = sender {
            match sender.finish().await {
                Ok(())
                | Err(WriteError::ConnectionClosed(ConnectionError::ApplicationClosed(_)))
                | Err(WriteError::ConnectionClosed(ConnectionError::Reset)) => (),
                Err(err) => panic!("{:?}", err),
            }
        }
    };
    let receiving = async {
        for i in 0..iterations {
            if i % 100 == 0 {
                println!("{}", i);
            }
            if receiver.is_none() {
                // let pool = Arc::new(FuturesUnordered::new());
                // fn accept<'a>(
                //  pool: Arc<FuturesUnordered<Pin<Box<dyn Future<Output = RecvStream> + 'a>>>>,
                //  incoming: &'a Mutex<quinn::Incoming>, i: usize,
                // ) -> impl Future<Output = RecvStream> + 'a {
                //  async move {
                //      let connecting = incoming.lock().await.next().await.expect("accept");
                //      pool.push(Box::pin(accept(pool.clone(), incoming, i)));
                //      let NewConnection { mut uni_streams, .. } =
                //          connecting.await.expect("connect");
                //      let mut receiver = uni_streams.next().await.unwrap().unwrap();
                //      let mut buf = [0; 1];
                //      receiver.read_exact(&mut buf).await.unwrap();
                //      assert_eq!(buf, [1]);
                //      receiver
                //  }
                // }
                // pool.push(Box::pin(accept(pool.clone(), &incoming, i)));
                // receiver = Some(pool.next().await.unwrap());
                // TODO: get above working to avoid hardcoding 10 accepters
                receiver = Some(
                    select_all((0..10).map(|_| {
                        async {
                            let connecting = incoming.lock().await.next().await.expect("accept");
                            let NewConnection { mut uni_streams, .. } =
                                connecting.await.expect("connect");
                            let mut receiver = uni_streams.next().await.unwrap().unwrap();
                            let mut buf = [0; 1];
                            receiver.read_exact(&mut buf).await.unwrap();
                            assert_eq!(buf, [1]);
                            receiver
                        }
                        .boxed_local()
                    }))
                    .await
                    .0,
                );
            } else if shared_rng.gen() {
                let mut receiver = receiver.take().unwrap();
                if rng.gen() {
                    let fin = receiver.read(&mut [0]).await.unwrap();
                    assert!(fin.is_none());
                }
            } else {
                let range = loop {
                    let start = shared_rng.gen_range(0, BUF);
                    let end = shared_rng.gen_range(0, BUF);
                    if start < end {
                        break start..end;
                    }
                };
                receiver.as_mut().unwrap().read_exact(&mut recv_buf[..range.len()]).await.unwrap();
                assert_eq!(send_buf[range.clone()], recv_buf[..range.len()]);
            }
        }
        if let Some(mut receiver) = receiver {
            let fin = receiver.read(&mut [0]).await.unwrap();
            assert!(fin.is_none());
        }
    };
    tokio::join!(sending, receiving);
    tokio::join!(client_endpoint.wait_idle(), server_endpoint.wait_idle());
}
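
The program above appears to rely on one trick that is easy to miss: the sender and the receiver each advance their own clone of the same seeded RNG, so the receiver knows what to expect without being told. Below is a minimal std-only sketch of that lockstep idea, with an in-memory queue standing in for the QUIC stream. The names (`SplitMix64`, `Action`, `next_action`, `run`) are hypothetical and not taken from quinn or from the program above.

```rust
use std::collections::VecDeque;

/// Tiny deterministic PRNG (SplitMix64), used here only to avoid external crates.
#[derive(Clone)]
struct SplitMix64(u64);

impl SplitMix64 {
    fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.0;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }
}

/// What one side does in a given step (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum Action {
    Write(usize),
    Finish,
}

/// Draws the next action; both sides call this on their own RNG clone.
fn next_action(rng: &mut SplitMix64, max_len: usize) -> Action {
    if rng.next_u64() % 4 == 0 {
        Action::Finish
    } else {
        Action::Write(1 + (rng.next_u64() % max_len as u64) as usize)
    }
}

/// Runs `steps` lockstep rounds and returns how many bytes were verified.
fn run(seed: u64, steps: usize) -> usize {
    let payload: Vec<u8> = (0..=255u8).collect();
    let mut sender_rng = SplitMix64(seed);
    let mut receiver_rng = sender_rng.clone();
    let mut wire: VecDeque<u8> = VecDeque::new(); // stands in for the stream
    let mut verified = 0;
    for _ in 0..steps {
        // Sender side: write a random-length prefix, or "finish".
        if let Action::Write(len) = next_action(&mut sender_rng, payload.len()) {
            wire.extend(&payload[..len]);
        }
        // Receiver side: independently draws the same action and checks it.
        match next_action(&mut receiver_rng, payload.len()) {
            Action::Write(len) => {
                let got: Vec<u8> = wire.drain(..len).collect();
                assert_eq!(got.as_slice(), &payload[..len]);
                verified += len;
            }
            Action::Finish => assert!(wire.is_empty()),
        }
    }
    verified
}

fn main() {
    println!("verified {} bytes", run(0, 100_000));
}
```

If the two sides ever drew from the generator a different number of times, they would fall out of step and the assertions would start failing, so each branch has to consume the same number of random values on both sides.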

alecmocatta avatar Mar 12 '20 14:03 alecmocatta

Multiple bugs have now been found that arose specifically due to poorly exercised paths through the handling of spuriously retransmitted data, so randomized delays are likely to be particularly valuable.

Ralith avatar Mar 22 '20 23:03 Ralith

Just an idea, but one could write a UDP proxy that implements loss/duplication/reordering/delay. The quinn client connects to the quinn server via the proxy, and the datagrams are routed according to an internal routing table. It would be easy to do reproducible testing this way. You could even test random disconnections etc. I'm using a similar strategy to test a library that sits on top of quinn. This adds some overhead, however.
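
To make the proxy idea a bit more concrete, here is a minimal std-only sketch, assuming a single client/server pair on IPv4 loopback. It is not the proxy I use and not part of quinn: `Proxy`, `Fate`, `route` and `relay_one` are hypothetical names, the fate of each datagram is passed in by the caller (a seeded RNG could choose it), and reordering and delay are left out.

```rust
use std::collections::HashMap;
use std::io;
use std::net::{SocketAddr, UdpSocket};
use std::time::Duration;

/// What the proxy does with one datagram (hypothetical type for this sketch).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Fate {
    Drop,
    Forward,
    Duplicate,
}

struct Proxy {
    socket: UdpSocket,
    /// Routing table: datagrams from the key address go to the value address.
    routes: HashMap<SocketAddr, SocketAddr>,
}

impl Proxy {
    fn bind() -> io::Result<Proxy> {
        let socket = UdpSocket::bind("127.0.0.1:0")?;
        // Time out instead of blocking forever if nothing arrives.
        socket.set_read_timeout(Some(Duration::from_secs(1)))?;
        Ok(Proxy { socket, routes: HashMap::new() })
    }

    fn addr(&self) -> io::Result<SocketAddr> {
        self.socket.local_addr()
    }

    /// Registers both directions between two peers.
    fn route(&mut self, a: SocketAddr, b: SocketAddr) {
        self.routes.insert(a, b);
        self.routes.insert(b, a);
    }

    /// Receives one datagram and forwards 0, 1 or 2 copies of it.
    /// Returns the number of copies sent.
    fn relay_one(&self, fate: Fate) -> io::Result<usize> {
        let mut buf = [0u8; 65_535];
        let (len, src) = self.socket.recv_from(&mut buf)?;
        let dst = match self.routes.get(&src) {
            Some(dst) => *dst,
            None => return Ok(0), // unknown sender: silently dropped
        };
        let copies = match fate {
            Fate::Drop => 0,
            Fate::Forward => 1,
            Fate::Duplicate => 2,
        };
        for _ in 0..copies {
            self.socket.send_to(&buf[..len], dst)?;
        }
        Ok(copies)
    }
}

fn main() -> io::Result<()> {
    let client = UdpSocket::bind("127.0.0.1:0")?;
    let server = UdpSocket::bind("127.0.0.1:0")?;
    let mut proxy = Proxy::bind()?;
    proxy.route(client.local_addr()?, server.local_addr()?);

    // A fixed schedule here; a seeded RNG would make it random yet reproducible.
    for &fate in &[Fate::Forward, Fate::Duplicate, Fate::Drop] {
        client.send_to(b"ping", proxy.addr()?)?;
        let sent = proxy.relay_one(fate)?;
        println!("{:?}: forwarded {} copies", fate, sent);
    }
    Ok(())
}
```

The peers would be pointed at the proxy's address rather than at each other, which is where the extra overhead comes from.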

jean-airoldie avatar Aug 31 '20 08:08 jean-airoldie

The tc qdisc invocations illustrated above accomplish that.

Ralith avatar Sep 02 '20 19:09 Ralith