leevis.com icon indicating copy to clipboard operation
leevis.com copied to clipboard

rust异步编程

Open vislee opened this issue 1 year ago • 0 comments

概念

并行:多个cpu并行的执行多个任务。 并发: 一个cpu交替的执行多个任务。

CPU密集型:占用cpu资源的任务。 IO密集型:占用IO资源的任务。IO密集型的任务使用并发异步编程可以更好的利用CPU提升性能。

简单的rust多线程程序:

use std::thread;
use std::thread::sleep;
use std::time::Duration;

fn main() {
    println!("===0====");
    let h1 = thread::spawn(|| {
        let s = test_1();
        println!("===1=== {}", s);
    });

    let h2 = thread::spawn(|| {
        let s = test_2();
        println!("===2=== {}", s);
    });

    h1.join().unwrap();
    h2.join().unwrap();

    println!("done")
}

fn test_1() -> String {
    sleep(Duration::from_secs(2));
    "hello test 1".to_string()
}

fn test_2() -> String {
    sleep(Duration::from_secs(1));
    "hello test 2".to_string()
}

使用tokio重构上述代码, 首先编辑项目的Cargo.toml文件,在[dependencies]下面加上依赖tokio,如下:tokio = {version = "1", features = ["full"]}

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    println!("===0====");
    let h1 = tokio::spawn(async {
        let s = test_1().await;
        println!("===1=== {}", s);
    });

    let h2 = tokio::spawn(async {
        let s = test_2().await;
        println!("===2=== {}", s);
    });

    let _ = tokio::join!(h1, h2);

    println!("done")
}

async fn test_1() -> String {
    sleep(Duration::from_secs(2)).await;
    "hello test 1".to_string()
}

async fn test_2() -> String {
    sleep(Duration::from_secs(1)).await;
    "hello test 2".to_string()
}

async

异步编程, cpu上的任务再等待外部事件时候,让出cpu安排其他任务执行。 async.await 关键字是rust标准库里用于异步编程的核心,async函数会返回Future是rust异步的核心,Future是trait可以用来表示各种事件,该trait需要实现poll函数。下面的实现和上面代码的实现是等价的,都会返回Future。

fn test_3() -> impl Future<Output = String> {
    async {
        sleep(Duration::from_secs(3)).await;
        "hello test 3".to_string()
    }
}

实现Future

既然async返回了Future,那我们手动实现一个Future,看看poll方法是怎么被调用的,异步是如何实现的。 根据文档,我们可以实现如下:

struct Test4 {}
impl Future for Test4 {
    type Output = String;

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        println!("poll ...");
        std::task::Poll::Pending
    }
}

当在使用上述tokio的例子中测试该Future时,打印一次 poll 方法的 poll... 以后程序hang住了。

let h4 = tokio::spawn(async {
        let f = Test4{};
        let s = f.await;
        println!("===4=== {}", s);
    });

    let _ = tokio::join!(h1, h2, h4);

Future文档大概说了,异步执行器调用过poll后,任务没有结束就会注册到Waker,Waker有一个处理程序会被存在任务关联的Context对象中。 Waker的wake()方法,用来告诉异步执行器关联的任务应该被唤醒再次poll这个异步任务。 修改poll方法让其注册到Wake:

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        println!("poll ...");
        cx.waker().wake_by_ref();
        std::task::Poll::Pending
    }

再次执行时发现一直输出poll... 为什么呢??? 因为cx.waker().wake_by_ref(); 一直通知准备好了可以调用poll了,而实际还没准备好std::task::Poll::Pending。 所以,只需要把最后Pending改为 std::task::Poll::Ready("hello test 4".to_string())就好了。 看下最终的代码:

use std::future::Future;
use tokio::time::{sleep, Duration};

struct Test4 {}
impl Future for Test4 {
    type Output = String;

    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        println!("poll ...");
        cx.waker().wake_by_ref();
        // std::task::Poll::Pending
        std::task::Poll::Ready("hello test 4".to_string())
    }
}

#[tokio::main]
async fn main() {
    println!("===0====");
    let h1 = tokio::spawn(async {
        let s = test_1().await;
        println!("===1=== {}", s);
    });

    let h2 = tokio::spawn(async {
        let s = test_2().await;
        println!("===2=== {}", s);
    });

    let h3 = tokio::spawn(async {
        let s = test_3().await;
        println!("===3=== {}", s);
    });

    let h4 = tokio::spawn(async {
        let f = Test4 {};
        let s = f.await;
        println!("===4=== {}", s);
    });

    let _ = tokio::join!(h1, h2, h3, h4);

    println!("done")
}

async fn test_1() -> String {
    sleep(Duration::from_secs(2)).await;
    "hello test 1".to_string()
}

async fn test_2() -> String {
    sleep(Duration::from_secs(1)).await;
    "hello test 2".to_string()
}

fn test_3() -> impl Future<Output = String> {
    async {
        sleep(Duration::from_secs(3)).await;
        "hello test 3".to_string()
    }
}

$ cargo run
   Compiling tsync v0.1.0 (/Users/vislee/Work/test)
    Finished dev [unoptimized + debuginfo] target(s) in 1.09s
     Running `target/debug/test`
===0====
poll ...
===4=== hello test 4
===2=== hello test 2
===1=== hello test 1
===3=== hello test 3
done

tokio

tokio技术栈

  • Runtime: Tokio运行时包括I/O、定时器、文件系统、同步和调度设施,是异步应用的基础。
  • Hyper: Hyper是一个HTTP客户端和服务器库,同时支持HTTP 1和2协议。
  • Tonic: 一个无固定规则(boilerplate-free)的gRPC客户端和服务器库。通过网络发布和使用API的最简单方法。
  • Tower:用于建立可靠的客户端和服务器的模块化组件。包括重试、负载平衡、过滤、请求限制设施等。
  • Mio:在操作系统的事件化I/O API之上的最小的可移植API。
  • Tracing: 对应用程序和库的统一的洞察力。提供结构化的、基于事件的数据收集和记录。
  • Bytes:在核心部分,网络应用程序操纵字节流。Bytes提供了一套丰富的实用程序来操作字节数组。

tokio术语

  • 异步 async/await语言特性的代码
  • 并发和并行
  • Future trait poll方法
  • 执行器或调度器 通过重复调用Future的poll方法。通过在等待时交换当前运行的任务能够在几个线程上并发地运行大量的 future。
  • 运行时 包含执行器以及与该执行器集成的各种实用工具的库。
  • 任务 是在Tokio运行时上运行的操作,由 tokio::spawn或 Runtime::block_on 函数创建。
  • 异步块
  • 异步函数
  • Yielding future只要从 poll 方法中返回就会让出(Yielding)。
  • 阻塞 阻塞线程
  • Stream 是 Iterator 的异步版本,它提供了一个数值流。
  • Channel 允许代码的一个部分向其他部分发送消息。Tokio提供了许多通道:
    • mpsc:多生产者、单消费者通道。可以发送许多值。
    • oneshot:单生产者,单消费者通道。可以发送一个单一的值。
    • broadcast:多生产者,多消费者。可以发送许多值。每个接收者看到每个值。
    • watch:单生产者,多消费者。可以发送许多值,但不保留历史。接收者只看到最新的值。
  • Backpressure 用于控制进入系统的数据量,以防止系统过载。实际上也就是channel阻塞。

关键词

  • #[tokio::main] 函数是一个宏,它将 async fn main() 转换为同步 fn main(),初始化一个运行时实例并执行异步main函数。
#[tokio::main]
async fn main() {
    ...
}
fn main() {
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        ...
    })
}

tokio I/O

与 std::io::Read和std::io::Write大致相同,但是是异步的。 AsyncRead AsyncWrite

tokio库中有关键的字tokio::spawnawait 关键字,什么时候该用spawn?什么时候该用await? gemini给了个说明:

在 Rust 的 Tokio 库中,tokio::spawn 和 await 关键字是用于管理异步任务的关键工具。它们的使用时机取决于您要执行的操作和异步任务的性质。

什么时候使用 tokio::spawn? 启动新的异步任务:tokio::spawn 用于将异步任务调度到 Tokio 的运行时环境中。它接收一个闭包作为参数,该闭包包含要执行的异步操作。

将阻塞操作转换为异步:如果您需要在异步任务中执行阻塞操作,例如读取文件或进行网络请求,可以使用 tokio::spawn 将其包装在异步闭包中。

分离控制流:如果您希望在异步任务完成之前继续执行其他代码,可以使用 tokio::spawn 将异步任务分离出来。

什么时候使用 await?

等待异步任务完成:await 关键字用于等待异步任务完成。它只能在异步闭包或其他异步上下文中使用。

阻塞当前线程:当您使用 await 等待异步任务完成时,当前线程将被阻塞,直到任务完成。这意味着其他异步任务无法在此期间运行。

串联异步任务:您可以使用 await 将多个异步任务串联在一起,以确保它们按顺序执行。

以下是一些使用 tokio::spawn 和 await 的示例:

示例 1:启动一个简单的异步任务

use tokio::spawn;

async fn hello() {
    println!("Hello, world!");
}

fn main() {
    spawn(hello());
}

示例 2:将阻塞操作转换为异步

use tokio::spawn;
use std::fs::File;
use tokio::io::AsyncReadExt;

async fn read_file(path: &str) -> String {
    let mut file = File::open(path).unwrap();
    let mut contents = String::new();
    file.read_to_string(&mut contents).unwrap();
    contents
}

fn main() {
    let future = spawn(read_file("data.txt"));
    let contents = future.await.unwrap();
    println!("File contents: {}", contents);
}

示例 3:串联异步任务

use tokio::spawn;

async fn task1() {
    println!("Task 1 started");
    await!(tokio::time::delay(Duration::from_secs(1)));
    println!("Task 1 completed");
}

async fn task2() {
    println!("Task 2 started");
    await!(tokio::time::delay(Duration::from_secs(2)));
    println!("Task 2 completed");
}

fn main() {
    let task1_future = spawn(task1());
    let task2_future = spawn(task2());

    // 等待 task1 完成
    task1_future.await.unwrap();

    // 等待 task2 完成
    task2_future.await.unwrap();
}

总而言之,tokio::spawn 用于启动新的异步任务或将阻塞操作转换为异步,而 await 用于等待异步任务完成或阻塞当前线程。通过结合使用这两个工具,您可以构建高效且可扩展的异步应用程序。

vislee avatar Mar 06 '24 15:03 vislee