futures-rs
Timing out on `SinkExt::send` still causes the item to be re-inserted later into the bounded channel
When inserting an element into the bounded MPSC channel that is at full capacity, the SinkExt::send future correctly waits until there is space in the channel.
However, if the `SinkExt::send` future is selected against a timeout (so the future should be dropped if the timeout elapses first), the value it was trying to send is still inserted into the channel later.
The expectation is that if the send future is terminated early, the item it attempted to insert should never appear in the channel.
Instead, it seems that the last item passed to `send` is cached (when polling for channel readiness starts inside `send`) and re-delivered by the next `send`. This looks like a bug, or at best undocumented behavior, if I'm not mistaken.
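To make the difference between the expected and the observed behavior concrete, here is a std-only toy model. It is only a sketch of the hypothesis above and not how futures-rs is implemented: `Bounded`, `ParkingSender`, `attempt` and `scenario` are hypothetical names invented for illustration, and the "timeout" is modeled simply as the caller giving up after one failed attempt.

```rust
use std::collections::VecDeque;

/// Toy bounded FIFO standing in for the channel buffer (hypothetical).
struct Bounded {
    items: VecDeque<u32>,
    capacity: usize,
}

impl Bounded {
    fn with_items(capacity: usize, items: &[u32]) -> Self {
        Bounded { items: items.iter().copied().collect(), capacity }
    }

    /// Push `item` if there is room; otherwise hand it back to the caller.
    fn try_push(&mut self, item: u32) -> Result<(), u32> {
        if self.items.len() < self.capacity {
            self.items.push_back(item);
            Ok(())
        } else {
            Err(item)
        }
    }
}

/// Hypothetical sender that keeps a rejected item in its own slot, so the
/// item outlives the send attempt that supplied it.
struct ParkingSender {
    parked: Option<u32>,
}

impl ParkingSender {
    /// One send attempt; returns true if `item` reached the queue.
    fn attempt(&mut self, q: &mut Bounded, item: u32) -> bool {
        // Deliver a leftover from an earlier, abandoned attempt first.
        if let Some(old) = self.parked.take() {
            if let Err(old) = q.try_push(old) {
                self.parked = Some(old);
                return false; // slot still occupied; `item` is dropped here
            }
        }
        match q.try_push(item) {
            Ok(()) => true,
            Err(item) => {
                self.parked = Some(item); // survives even if the caller gives up
                false
            }
        }
    }
}

/// Replays the steps of the report: queue holds 1,2,3; sending 4 fails and is
/// abandoned; 1 and 2 are consumed; 5 is sent. Returns the remaining items.
fn scenario(parking: bool) -> Vec<u32> {
    let mut q = Bounded::with_items(3, &[1, 2, 3]);
    let mut sender = ParkingSender { parked: None };

    if parking {
        assert!(!sender.attempt(&mut q, 4)); // 4 is parked inside the sender
    } else {
        assert_eq!(q.try_push(4), Err(4)); // 4 is handed back and discarded
    }

    q.items.pop_front(); // consume 1
    q.items.pop_front(); // consume 2

    if parking {
        assert!(sender.attempt(&mut q, 5));
    } else {
        assert_eq!(q.try_push(5), Ok(()));
    }
    q.items.into_iter().collect()
}

fn main() {
    println!("parking sender: {:?}", scenario(true)); // [3, 4, 5]
    println!("cancel-safe:    {:?}", scenario(false)); // [3, 5]
}
```

In this model the second variant leaves nothing behind once the attempt is abandoned, which matches the expectation stated above; the first variant reproduces the observed `3, 4, 5` ordering.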
Minimal example for reproducing the behavior:
use std::time::Duration;
use futures::prelude::*;
use futures::pin_mut;
use tokio;
use anyhow;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let (snd, recv) = futures::channel::mpsc::channel(3);
    pin_mut!(snd);
    pin_mut!(recv);

    snd.send(1).await?;
    snd.send(2).await?;
    snd.send(3).await?;
    // Channel now contains 1,2,3

    // Insertion of 4 times out, because the channel is at full capacity
    {
        let insert_4 = std::pin::pin!(snd.send(4));
        let timeout = std::pin::pin!(tokio::time::sleep(Duration::from_millis(500)));
        match futures::future::select(insert_4, timeout).await {
            futures::future::Either::Left(_) => anyhow::bail!("must timeout when at full capacity"),
            futures::future::Either::Right(_) => {}
        }
    }

    // Free up some capacity by consuming 1,2
    assert_eq!(Some(1), recv.next().await);
    assert_eq!(Some(2), recv.next().await);
    // Channel should now contain only 3

    // Now send 5 into the channel and close it
    snd.send(5).await?;
    snd.close_channel();

    // Channel should now contain 3,5 (but actually seems to contain 3,4,5)
    assert_eq!(Some(3), recv.next().await);
    assert_eq!(Some(5), recv.next().await); // Panics here: 4 is in the channel despite the timeout
    assert_eq!(None, recv.next().await);
    Ok(())
}
Playground link: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=e72c947af414fc813345d71533065414