futures-rs

Timing out on `SinkExt::send` still causes the item to be re-inserted later into the bounded channel

Open NumberFour8 opened this issue 1 year ago • 0 comments

When inserting an element into a bounded MPSC channel that is at full capacity, the `SinkExt::send` future correctly waits until there is space in the channel.

However, if the `SinkExt::send` future is selected against a timeout (so the future should be dropped if the timeout elapses first), the value it tried to send is still inserted into the channel later.

The expectation is that if the send future is terminated early, the item it attempted to insert should never appear in the channel.

Instead, it seems that the last item that `send` attempted to deliver is cached (when polling for channel readiness starts inside `send`) and re-delivered on the next `send`. That looks like a bug, or at best undocumented behavior, if I'm not mistaken.

Minimal example for reproducing the behavior:

use std::time::Duration;

use futures::prelude::*;
use futures::pin_mut;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (snd, recv) = futures::channel::mpsc::channel(3);
    pin_mut!(snd);
    pin_mut!(recv);
    
    snd.send(1).await?;
    snd.send(2).await?;
    snd.send(3).await?;
    // Channel now contains 1,2,3
    
    // Insertion of 4 times out, because the channel is at full capacity
    {
        let insert_4 = std::pin::pin!(snd.send(4));
        let timeout = std::pin::pin!(tokio::time::sleep(Duration::from_millis(500)));
        match futures::future::select(insert_4, timeout).await {
            futures::future::Either::Left(_) => anyhow::bail!("must timeout when at full capacity"),
            futures::future::Either::Right(_) => {}
        }
    }
    
    // Free up some capacity by consuming 1,2
    assert_eq!(Some(1), recv.next().await);
    assert_eq!(Some(2), recv.next().await);
    // Channel should now contain only 3
    
    // Now send 5 into the channel and close it
    snd.send(5).await?;
    snd.close_channel();
    
    // Channel now should contain 3,5 (but actually seems to contain 3,4,5)
    assert_eq!(Some(3), recv.next().await);
    assert_eq!(Some(5), recv.next().await); // Panics here: 4 is in the channel despite the timeout
    assert_eq!(None, recv.next().await);
    
    
    Ok(())
}

Playground link: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=e72c947af414fc813345d71533065414
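
A possible alternative reading of the reproduction above, offered as an assumption rather than a confirmed diagnosis: the documentation for `futures::channel::mpsc::channel` states that the capacity equals `buffer + num-senders`, so `channel(3)` with a single sender would accept a fourth item, and `SinkExt::send` is documented to complete only after flushing the sink. Under that reading, `4` may already be queued when the timeout fires, and the dropped future was only waiting on the flush. The sketch below is a std-only toy model of that sequence. `ToyChannel`, `simulate`, and their fields are hypothetical names and do not mirror the futures-rs internals.

```rust
use std::collections::VecDeque;

/// Toy single-sender model (hypothetical, NOT futures-rs code).
/// Assumption: the queue holds up to `buffer + 1` items, the extra one
/// being the sender's guaranteed slot.
struct ToyChannel {
    buffer: usize,
    queue: VecDeque<i32>,
    sender_parked: bool,
}

impl ToyChannel {
    fn new(buffer: usize) -> Self {
        ToyChannel { buffer, queue: VecDeque::new(), sender_parked: false }
    }

    /// The sender is ready whenever it is not parked.
    fn is_ready(&self) -> bool {
        !self.sender_parked
    }

    /// Enqueue right away; park the sender once the queue exceeds `buffer`.
    fn start_send(&mut self, item: i32) {
        assert!(self.is_ready(), "start_send called while parked");
        self.queue.push_back(item);
        if self.queue.len() > self.buffer {
            self.sender_parked = true;
        }
    }

    /// Take one item and unpark the sender.
    fn recv(&mut self) -> Option<i32> {
        let item = self.queue.pop_front();
        if item.is_some() {
            self.sender_parked = false;
        }
        item
    }
}

/// Replays the steps of the reproduction and returns what the receiver
/// would see after the final send.
fn simulate() -> Vec<i32> {
    let mut ch = ToyChannel::new(3);
    for i in 1..=3 {
        ch.start_send(i);
    }

    // send(4), phase 1: the item goes into the guaranteed slot.
    ch.start_send(4);
    // send(4), phase 2: the flush would wait here because the sender is parked.
    assert!(!ch.is_ready());
    // The timeout fires and the future is dropped, but 4 stays queued.

    assert_eq!(ch.recv(), Some(1));
    assert_eq!(ch.recv(), Some(2));

    ch.start_send(5);
    ch.queue.into_iter().collect()
}

fn main() {
    println!("{:?}", simulate());
}
```

If this reading holds, the receiver would see 3, 4, 5 without any item being cached and re-delivered. Treating the effective capacity as `buffer + 1`, or using `Sender::try_send` and checking the returned error for a full channel, might avoid the surprise; both would need verifying against the actual crate.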

NumberFour8, Oct 27 '24 17:10