futures-concurrency
ConcurrentStream usage with tokio leads to ACCESS_VIOLATION
I tried to use concurrent streams to sleep in parallel with tokio:
use core::time::Duration;
use futures_concurrency::prelude::*;
use tokio;
use futures::prelude::*;
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);
    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            tokio::time::sleep(Duration::from_secs(x as _)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}
But sometimes it crashes:
> cargo run
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.15s
Running `target\debug\rstestss.exe`
error: process didn't exit successfully: `target\debug\rstestss.exe` (exit code: 0xc0000005, STATUS_ACCESS_VIOLATION)
Without the "current_thread" flavor, the program just freezes.
Other runtimes work fine:
async_std
use core::time::Duration;
use async_std;
use futures_concurrency::prelude::*;
use futures::prelude::*;
#[async_std::main]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);
    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            async_std::task::sleep(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}
smol
use core::time::Duration;
use futures_concurrency::prelude::*;
use futures::prelude::*;
async fn main_async() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);
    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            smol::Timer::after(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}
fn main() {
    smol::block_on(main_async());
}
The tokio runtime with the smol Timer also works fine:
use core::time::Duration;
use futures_concurrency::prelude::*;
use tokio;
use futures::prelude::*;
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let s1 = stream::iter([1, 2]);
    let s2 = stream::iter([1, 2]);
    let s3 = stream::iter([1, 2]);
    (s1, s2, s3)
        .chain()
        .co()
        .map(|x| async move {
            // tokio::time::sleep(Duration::from_secs(x as _)).await;
            smol::Timer::after(Duration::from_secs(x)).await;
            ()
        })
        .for_each(|_| async {})
        .await;
}
Is this a Tokio issue?
Interesting, thanks for reporting it.
It seems that you're running it on Windows, right? I only have Linux available, and I couldn't reproduce the ACCESS_VIOLATION error, but it freezes on every run (while the other runtimes work, as you pointed out). @yoshuawuyts, you run on Windows, right?
I'll investigate some more, but my initial guess is that the problem is probably coming from tokio.
From https://github.com/rust-lang/futures-rs/issues/2851#issuecomment-2066826865:
I don't intend to review everything of it, but it has bunch of very suspicious unsafe codes even at a cursory glance: This would be unsound when vec, the storage of slab, is reallocated. This probably has the same problem as tokio-rs/tokio#2612. The heavy use of ManuallyDrop around Pin API reminds me of async-rs/async-std#903. etc.
IIRC, the tokio timer is !Unpin and the others are Unpin, so it is probably only the tokio timer that is affected by the first unsoundness in them.
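To make the first point in the quote concrete: a value stored inline in a Vec can change address when the Vec grows past its capacity, whereas a boxed value keeps its heap address because only the Box pointer is copied into the new buffer. The following is a minimal std-only sketch of that difference. It is not code from futures-concurrency, and `boxed_addresses_stable` is a hypothetical helper name used only for illustration.

```rust
/// Hypothetical helper: returns true if every boxed element keeps its heap
/// address while the outer Vec grows well past its initial capacity.
fn boxed_addresses_stable(initial: usize, extra: usize) -> bool {
    let mut slots: Vec<Box<u64>> = Vec::with_capacity(initial);
    for i in 0..initial {
        slots.push(Box::new(i as u64));
    }
    // Record the heap address of each boxed value before growing.
    let before: Vec<*const u64> = slots.iter().map(|b| &**b as *const u64).collect();
    // Grow the Vec; if its buffer is reallocated, only the Box pointers move.
    for i in 0..extra {
        slots.push(Box::new((initial + i) as u64));
    }
    let after: Vec<*const u64> = slots
        .iter()
        .take(initial)
        .map(|b| &**b as *const u64)
        .collect();
    before == after
}

fn main() {
    // Inline storage: the element address may change once capacity is exceeded.
    let mut inline: Vec<u64> = Vec::with_capacity(1);
    inline.push(7);
    let old = &inline[0] as *const u64;
    inline.extend(0..1024u64);
    let new = &inline[0] as *const u64;
    println!("inline element moved: {}", old != new);

    // Boxed storage: the pointee stays where it was allocated.
    println!("boxed elements stable: {}", boxed_addresses_stable(4, 1024));
}
```

Whether the inline element actually moves depends on the allocator, so the sketch only prints that result. For an Unpin future such a move is harmless, which would fit the observation that only the tokio timer misbehaves.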
I also noticed the unsoundness when looking through the code just now. I have put together a minimal test repro for the pin unsoundness in this ConcurrentStream vec collect impl.
use std::{future::Future, marker::PhantomPinned, pin::pin, task::Poll};
use futures_concurrency::{
    concurrent_stream::{ConcurrentStream, Consumer, ConsumerState},
    future::Race,
};
use futures_executor::block_on;
use pin_project::pin_project;
#[pin_project]
struct PinCheck {
    addr: usize,
    #[pin]
    _pinned: PhantomPinned,
}
impl PinCheck {
    fn new() -> Self {
        Self {
            addr: 0,
            _pinned: PhantomPinned,
        }
    }
}
impl Future for PinCheck {
    type Output = ();
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Self::Output> {
        let this = self.project();
        let addr = this.addr as *mut usize as usize;
        if *this.addr == 0 {
            cx.waker().wake_by_ref();
            *this.addr = addr;
            Poll::Pending
        } else {
            assert_eq!(*this.addr, addr, "pinned value was moved.");
            Poll::Ready(())
        }
    }
}
struct Tricky;
impl ConcurrentStream for Tricky {
    type Item = ();
    type Future = PinCheck;
    async fn drive<C>(self, consumer: C) -> C::Output
    where
        C: Consumer<Self::Item, Self::Future>,
    {
        let mut consumer = pin!(consumer);
        for _ in 0..64 {
            match consumer.as_mut().send(PinCheck::new()).await {
                ConsumerState::Break => return consumer.flush().await,
                ConsumerState::Continue => continue,
                ConsumerState::Empty => unreachable!(),
            }
        }
        let progress = async { Some(consumer.as_mut().progress().await) };
        let noop = async { None };
        // poll progress once.
        assert!((progress, noop).race().await.is_none());
        // push new entry, reallocs internal futures slab.
        // this moves the futures and violates pinning.
        consumer.as_mut().send(PinCheck::new()).await;
        consumer.flush().await
    }
    fn concurrency_limit(&self) -> Option<std::num::NonZeroUsize> {
        todo!()
    }
}
#[test]
fn it_works() {
    block_on(async { Tricky.collect::<Vec<()>>().await });
}
This currently outputs:
thread 'tests::it_works' panicked at src/lib.rs:49:17:
assertion `left == right` failed: pinned value was moved.
left: 5830115848
right: 5830117896
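For comparison, the same address check can be sketched with only the standard library, holding each future in a `Pin<Box<_>>` so that growing the container moves the pointers and not the futures. This is an illustration under stated assumptions, not the fix adopted by the crate: `AddrCheck`, `NoopWake`, and `run_check` are hypothetical names, and the check returns a bool instead of panicking.

```rust
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future that remembers where it lived on its first poll.
struct AddrCheck {
    addr: usize,
    _pinned: PhantomPinned,
}

impl Future for AddrCheck {
    /// `true` if the future was at the same address on both polls.
    type Output = bool;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<bool> {
        // SAFETY: we only write a plain integer field and never move `self`.
        let this = unsafe { self.get_unchecked_mut() };
        let here = &*this as *const AddrCheck as usize;
        if this.addr == 0 {
            this.addr = here;
            Poll::Pending
        } else {
            Poll::Ready(this.addr == here)
        }
    }
}

/// Waker that does nothing; the futures are polled manually below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `initial` boxed futures once, grows the Vec by `extra` entries,
/// then polls the original futures again and reports whether none moved.
fn run_check(initial: usize, extra: usize) -> bool {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let new = || Box::pin(AddrCheck { addr: 0, _pinned: PhantomPinned });

    let mut slots: Vec<Pin<Box<AddrCheck>>> = (0..initial).map(|_| new()).collect();
    // First poll: every future records its own address.
    for f in slots.iter_mut() {
        assert!(f.as_mut().poll(&mut cx).is_pending());
    }
    // Grow the container; the boxed futures themselves stay in place.
    slots.extend((0..extra).map(|_| new()));
    // Second poll: each original future compares its address with the record.
    slots
        .iter_mut()
        .take(initial)
        .all(|f| f.as_mut().poll(&mut cx) == Poll::Ready(true))
}

fn main() {
    println!("pinned futures stayed in place: {}", run_check(64, 1));
}
```

Boxing each future costs one allocation per entry. It is used here only because it makes the pinning guarantee easy to see.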
I've been out on sick leave for the past several days. Just wanted to quickly acknowledge this is indeed an issue we should fix.
I want to thank @inklesspen1rus for reporting this, and everyone else in this thread for helping narrow the issue down.