futures-rs
Issue regarding `async` with multithreading
Hello, I am having a bit of trouble with multithreading when calling async functions. A simplified example is as follows:
#![allow(unused_mut)]
use tokio;
use futures::{stream, StreamExt};

async fn changer(s: Vec<String>) -> String {
    s[0].replace("a", "A").to_owned()
}

async fn caller(vec: Vec<Vec<String>>) -> Vec<String> {
    let threads = stream::iter(vec).map(|x| {
        tokio::spawn(async move {
            changer((*x).to_vec()).await
        })
    }).buffer_unordered(10);
    let mut res = Vec::new();
    threads.for_each(|t| async move {
        // res.push(t.unwrap());
        println!("{:?}", t)
    }).await;
    res
}

#[tokio::main]
async fn main() {
    let now = time::Instant::now();
    let vec = vec![
        vec!["hello".to_string()],
        vec!["world".to_string()],
        vec!["i".to_string()],
        vec!["am".to_string()],
        vec!["a".to_string()],
        vec!["rustacean".to_string()],
    ];
    let res = caller(vec).await;
    println!("{:?}, time elaspsed: {:?}", res, now.elapsed())
}
I want to return the results from the threads in caller() instead of printing them. I am able to println! the results. However, Rust won't compile when I try to push the result of each individual thread into res, and fails with the following error:
error: captured variable cannot escape `FnMut` closure body
  --> src/main.rs:17:26
   |
16 |       let mut res = Vec::new();
   |           ------- variable defined here
17 |       threads.for_each(|t| async {
   |  ________________________-_^
   | |                        |
   | |                        inferred to be a `FnMut` closure
18 | |         res.push(t.unwrap());
   | |         --- variable captured here
19 | |         // println!("{:?}", t)
20 | |     }).await;
   | |_____^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape
error: could not compile `playground` due to previous error
My attempts so far have been unsuccessful. What is the proper way to do this? I just need to store the results and return them once the threads have finished.
/app # cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.03s
Running `target/debug/ispeak`
["hello", "i", "Am", "rustAceAn", "world", "A"], time elaspsed: 305.497µs
If someone experienced could explain it in proper terms, that would be very much appreciated.
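What's happening is that the closure passed to for_each returns an async block that borrows
res, and an `FnMut` closure cannot let a reference to one of its captured variables escape its
body. Instead of that, the results should be gathered in the body of caller() itself, by
pulling items off the stream in a loop.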
#![allow(unused_mut)]
use std::time; // This was missing
use tokio;
use futures::{stream, StreamExt};

async fn changer(s: Vec<String>) -> String {
    s[0].replace("a", "A").to_owned()
}

async fn caller(vec: Vec<Vec<String>>) -> Vec<String> {
    let mut threads = stream::iter(vec).map(|x| {
        tokio::spawn(async move {
            changer((*x).to_vec()).await
        })
    }).buffer_unordered(10);
    let mut res = Vec::new();
    while let Some(t) = threads.next().await {
        res.push(t.unwrap());
        // println!("{:?}", t)
    }
    res
}

#[tokio::main]
async fn main() {
    let now = time::Instant::now();
    let vec = vec![
        vec!["hello".to_string()],
        vec!["world".to_string()],
        vec!["i".to_string()],
        vec!["am".to_string()],
        vec!["a".to_string()],
        vec!["rustacean".to_string()],
    ];
    let res = caller(vec).await;
    println!("{:?}, time elaspsed: {:?}", res, now.elapsed())
}