futures-rs icon indicating copy to clipboard operation
futures-rs copied to clipboard

Issue regarding `async` with multithreading

Open jxuanli opened this issue 3 years ago • 1 comments

Hello, I am having a bit of trouble with multithreading when I am calling async functions, an simplified example is as follows:

#![allow(unused_mut)]

use tokio;
use futures::{stream, StreamExt};

async fn changer(s: Vec<String>) -> String {
    s[0].replace("a", "A").to_owned()
}

async fn caller(vec: Vec<Vec<String>>) -> Vec<String> {
    let threads = stream::iter(vec).map(|x| {
        tokio::spawn(async move {
            changer((*x).to_vec()).await
        })
    }).buffer_unordered(10);
    let mut res = Vec::new();
    threads.for_each(|t| async move {
        // res.push(t.unwrap());
        println!("{:?}", t)
    }).await;
    res
}

#[tokio::main]
async fn main()  {
    let now = time::Instant::now();
    let vec = vec![
        vec!["hello".to_string()],
        vec!["world".to_string()],
        vec!["i".to_string()],
        vec!["am".to_string()],
        vec!["a".to_string()],
        vec!["rustacean".to_string()],
    ];
    let res = caller(vec).await;
    println!("{:?}, time elaspsed: {:?}", res, now.elapsed())
}

I want to be able to return the results from threads in caller() instead of printing them. I am able to println! the results. However, rust wouldn't compile when I tried to push the result of each individual thread with the following error

error: captured variable cannot escape `FnMut` closure body
  --> src/main.rs:17:26
   |
16 |       let mut res = Vec::new();
   |           ------- variable defined here
17 |       threads.for_each(|t| async  {
   |  ________________________-_^
   | |                        |
   | |                        inferred to be a `FnMut` closure
18 | |         res.push(t.unwrap());
   | |         --- variable captured here
19 | |         // println!("{:?}", t)
20 | |     }).await;
   | |_____^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape

error: could not compile `playground` due to previous error

Multiple attempts by me have been unsuccessful, I am wondering what is the proper way to do this? I just need to be able to store and then return the results after threads returns

jxuanli avatar Jul 10 '22 20:07 jxuanli

/app # cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.03s
     Running `target/debug/ispeak`
["hello", "i", "Am", "rustAceAn", "world", "A"], time elaspsed: 305.497µs

If someone experienced can explain it in proper term, that would be very much appreciated. What's happening is that result of the changer is being "passed" to the closure, making it unavailable from outside of the closures scope. Instead of that, results should be gathered in the "main" thread.

#![allow(unused_mut)]

use std::time; // This was missing
use tokio;
use futures::{stream, StreamExt};

async fn changer(s: Vec<String>) -> String {
    s[0].replace("a", "A").to_owned()
}

async fn caller(vec: Vec<Vec<String>>) -> Vec<String> {
    let mut threads = stream::iter(vec).map(|x| {
        tokio::spawn(async move {
            changer((*x).to_vec()).await
        })
    }).buffer_unordered(10);
    let mut res = Vec::new();

    while let Some(t) = threads.next().await {
        res.push(t.unwrap());
        // println!("{:?}", t)
    }
    res
}

#[tokio::main]
async fn main()  {
    let now = time::Instant::now();
    let vec = vec![
        vec!["hello".to_string()],
        vec!["world".to_string()],
        vec!["i".to_string()],
        vec!["am".to_string()],
        vec!["a".to_string()],
        vec!["rustacean".to_string()],
    ];
    let res = caller(vec).await;
    println!("{:?}, time elaspsed: {:?}", res, now.elapsed())
}

tikhoplav avatar Jul 25 '22 10:07 tikhoplav