futures-concurrency
futures-concurrency copied to clipboard
Add `Race::race_with_index`
I think the following race_with_index function would be very useful:
pub trait Race {
type Output;
type Future: Future<Output = Self::Output>;
fn race(self) -> Self::Future;
// New method:
fn race_with_index(self) -> impl Future<Output = (usize, Self::Output)>;
}
Usage:
let (i, result) = (fut_1, fut_2).race().await;
println!("Future {i} won the race with result {result}");
A similar function should probably also be add to RaceOk.
@tyilo
A common pattern for this is to wrap the futures:
let (i, result) = (
async { (0, fut1.await) }, //
async { (1, fut2.await) }, //
)
.race()
.await;
println!("Future {i} won the race with result {result}");
Moreover, instead of an index, it's better to return something more appropriate for the application domain, such as an enum.
Yes, I agree for that specific example.
My argument would be that if you have a Vec<impl Future> you can use race_with_index without having to allocate a new Vec to get the index.
Here is another example (which needs to map the Vec, so probably not a great example):
use std::time::Duration;
use futures_concurrency::prelude::*;
use tokio::{sync::watch, time::timeout_at};
// Imagine this actually does something useful
fn watch(_id: &str) -> watch::Receiver<u64> {
let (tx, rx) = watch::channel(0);
tokio::task::spawn(async move {
for i in 164.. {
if tx.send(i).is_err() {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
rx
}
async fn f(mut rxs: Vec<watch::Receiver<u64>>) -> Vec<Option<u64>> {
let mut states = vec![None; rxs.len()];
let timeout_instant = tokio::time::Instant::now() + Duration::from_secs(1);
loop {
let changed: Vec<_> = rxs
.iter_mut()
.enumerate()
.map(|(i, rx)| async move {
rx.changed().await.unwrap();
(i, *rx.borrow_and_update())
})
.collect();
match timeout_at(timeout_instant, changed.race()).await {
Ok((i, v)) => {
states[i] = Some(v);
}
Err(_) => break,
}
}
states
}
#[tokio::main]
async fn main() {
const N: usize = 3;
let rxs: Vec<_> = (0..N).map(|i| watch(&format!("id-{i}"))).collect();
let res = f(rxs).await;
println!("{res:?}");
}
My argument would be that if you have a
Vec<impl Future>you can userace_with_indexwithout having to allocate a newVecto get the index.
Now I understand, that's a good example. I would consider a simplified version of the problem:
// Already allocated vec
let futures: Vec<impl Future> = vec![..];
// Here you need to reallocate vec
let futures_with_indices: Vec<impl Future> = futures
.into_iter()
.enumerate()
.map(|(i, fut)| async move { (i, fut.await) })
.collect();
let (index, result) = futures_with_indices.race().await;
But here the problem arises: why can't we just make a race from an iterator? What if the library had such a feature?
let futures: Vec<impl Future> = vec![..];
let futures_with_indices: _ = futures
.into_iter()
.enumerate()
.map(|(i, fut)| async move { (i, fut.await) });
let (index, result) = future::race_from_iter(futures_with_indices).await;
Although Race would still need to collect a vector of futures internally, this could be much more efficient, especially since it allows calling .into_future() on each future without an additional reallocation.
It would also be possible to create a convenient extension trait similar to FutureExt, but for iterators:
pub trait IteratorExt: Iterator {
fn join(self) -> JoinIterator<Self>
where
Self: Sized;
fn race(self) -> RaceIterator<Self>
where
Self: Sized;
}
Then it would be possible to call race directly on the iterator:
let futures: Vec<impl Future> = vec![..];
let (index, result) = futures
.into_iter()
.enumerate()
.map(|(i, fut)| async move { (i, fut.await) })
.race()
.await;
Of course, in this case it doesn't fully solve the original problem. In the end, reallocation will still be required, but just hidden. However, it opens up the possibility of writing more efficient code, for example, by directly returning an iterator when creating a list of futures.
Moreover, such an API is more general and could be useful in certain specific cases, not just for indexing. Although race_with_index could also be potentially useful, it might be worth proposing an API for the iterator separately from this issue 🤔