跳转至

异步和并发

基本概念

Send & Sync

在 Rust 中,SendSync 是两个非常重要的 trait,它们定义了类型在并发环境下的安全性。理解这两个 trait 的含义和应用是进行 Rust 并发编程的关键。

Send 标记 trait 表示一个类型的值可以安全地从一个线程转移到另一个线程。如果一个类型 T 实现了 Send,那么一个该类型的变量可以被移动到另一个线程中去使用。大多数 Rust 的类型都是 Send。不是 Send 的例子包括 Rc<T>(引用计数智能指针),因为 Rc<T> 不是线程安全的,多个线程增减引用计数可能导致数据竞争。Arc<T> 是一个线程安全的引用计数智能指针,它可以在多线程中共享,因为内部使用原子操作来管理引用计数。r

Sync 标记 trait 表示一个类型的引用(&T)可以安全地被多个线程同时访问。如果类型 TSync 的,意味着 &T 是线程安全的。这通常意味着在内部没有可变状态,或者类型使用内部锁(如 MutexRwLock)来管理内部状态的访问。不是 Sync 的例子包括 RefCell<T>Cell<T>,因为它们允许在运行时改变其内部值,而不是通过锁或其他线程安全的机制来保护其状态。

他们的实现是编译器自动推理的,除非你使用unsafe

Future & Poll

异步编程是一项重要功能,它允许程序在等待某些操作完成(例如 I/O 操作)时继续执行其他任务。Rust 的异步编程模型建立在 Future trait 和 Poll 类型上。在 Rust 中,Future 是一个 trait,代表了一个异步操作的最终完成结果Future 可以被视为一个可能还没有完成的计算值。当一个 Future 被 polled 时,它会尝试推进其内部状态,如果操作完成则返回结果,如果未完成则表示仍然在等待。

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

这里的关键点是:

  • Output:这是 Future 完成后返回的类型。
  • poll 方法:这是 Future 被调用以尝试向前推进其状态的方法。它接收一个 Context 参数,该参数提供了 Waker,用于在 Future 准备好再次被 polled 时通知执行器。

Poll 是一个枚举,定义在 std::task 模块中,用于表示 Future 的完成状态:

pub enum Poll<T> {
    Ready(T),
    Pending,
}
  • Ready(T):表示 Future 已经完成,携带着最终结果 T
  • Pending:表示 Future 尚未完成,需要在未来某个时刻再次尝试。

async & await

asyncawait 是用于简化异步编程的关键关键字。它们让编写异步代码几乎像写同步代码一样简单,并在底层自动处理 FuturePoll 的复杂性。

async 关键字用于定义一个异步函数或异步块。当你在函数前加上 async 关键字时,这个函数会返回一个实现了 Future trait 的类型。这意味着,这个函数本身不会立即执行,而是返回一个将在未来某个时刻完成的 Future。当你使用 async 关键字定义一个函数,Rust 的编译器将这个函数转换成一个状态机。这个状态机背后实际上是一个实现了 Future trait 的类型。每当这个 Futurepoll 方法被调用时,它会根据内部状态决定是继续执行、暂停还是完成。

await 关键字用于在异步函数或异步块内部“等待”另一个 Future 的完成。它只能在 async 块或函数中使用。使用 await 可以暂停当前函数的执行,直到 Future 完成为止,不会阻塞线程。每次使用 await 操作符时,如果被等待的 Future 尚未完成(即返回 Poll::Pending),当前 Future 也会返回 Poll::Pending将控制权交回给执行器(yeild)。这允许执行器继续处理其他任务。当被等待的 Future 准备好继续执行时,执行器会再次 poll 当前 Future

Pin & Unpin

Pin的出现主要是为了解决 自引用类型 问题。试想,如果有一个对象保存了自己的地址0x20,那么当他被移动到0x40的时候,0x20就变成一个悬空指针。我们一般情况不会写出 自引用类型 的数据结构。但是,Future 就可能是。(此外,Linux内核常用的侵入式双链表如果想在Rust实现,也需要Pin

Pin他内部包裹了另外一个指针P,并且只要P指针指向的内容(我们称为T)没有实现Unpin,则可以保证T永远不会被移动(move)。本质上,Pin 可以认为是一个标记,向编译器保证“如果我指向的对象(T)不能在内存中自由的移动(!Unpin),那么他就肯定不会在内存中被移动”。

Pin 通过阻止 &mut T&mut U 的转换来确保这一点,因为通常这种转换允许通过替换值来移动数据。

Unpin 是一个自动实现的 trait,当一个类型实现了 Unpin trait,表示它的实例可以安全地被“移动”,即使它已被固定。大多数 Rust 的内置类型都实现了 Unpin,但如果一个类型包含不可移动的字段(如通过 Pin 包装的字段),则该类型可能不会自动实现 Unpin

看一下pin_mut宏的实现,很有意思。首先shadow原本的变量,然后new_unchecked。就完事了。凭什么对象就被Pin住了呢?因为原本的对象已经无法访问了,再多加了一个Pin之后只是多了一个编译器的标记,阻止 &mut T&mut U 的转换。所以是安全的。这是将值Pin在栈上的方法。Pin在堆上,可以使用Box::pin

#[macro_export]
macro_rules! pin_mut {
    ($($x:ident),* $(,)?) => { $(
        // Move the value to ensure that it is owned
        let mut $x = $x;
        // Shadow the original binding so that it can't be directly accessed
        // ever again.
        #[allow(unused_mut)]
        let mut $x = unsafe {
            $crate::core_reexport::pin::Pin::new_unchecked(&mut $x)
        };
    )* }
}

Executor

Executor是真正负责调度执行Future的东西。例如

Join & Select

如果只是单纯的将普通函数变成异步函数,那么实际上代码的并发性和之前没有任何区别,仍然是顺序执行。要提高代码并发性的关键是要让 Futures 可以一起执行

Join代表的执行关系就是 Futures 可以同时开始执行,然后等待他们全部完成后返回。为此,futures提供了两个宏jointry_join,后者如果 Futures 中有一个返回 Err ,则立刻返回Err。

// `join!` is variadic, so you can pass any number of futures
let c = async { 3 };
let d = async { 4 };
let e = async { 5 };
assert_eq!(join!(c, d, e), (3, 4, 5));

Select代表的执行关系是 Futures 可以 同时开始执行,有一个执行完成后立刻返回。为此,futures提供了宏select。它的用法会更复杂一点。

除了这两种执行关系,还有

  • 两个Future同时开始执行,但是彼此之间毫无关系。比如我们在处理完数据之后,一边将数据进行缓存,一边进行进一步处理。这个时候可以单独开一个线程执行缓存操作。

Stream

Stream 就是一个异步的 Iterator。更准确的说法如下:

If Future<Output = T> is an asynchronous version of T, then Stream<Item = T> is an asynchronous version of Iterator<Item = T>

在使用Stream有一个很坑的地方,因为next 方法不会使用Stream的所有权,所以Stream需要同时满足Unpin

基础设施

要完整的使用 async 异步编程,你需要依赖以下特性和外部库:

  • 关键字 async/await 由 Rust 语言提供,并进行了编译器层面的支持
  • 所必须的特征(例如 Future )、类型和函数,由标准库提供实现
  • 众多实用的类型、宏和函数由官方开发的 futures 包提供(不是标准库),它们可以用于任何 async 应用中,但是不包括Executor(不算block_on)!
  • async 代码的执行、IO 操作、任务创建和调度等等复杂功能由社区的 async 运行时提供,例如 tokioasync-std

futures

futures 库是一个非常重要的异步编程工具,提供了丰富的功能来支持异步编程。

  1. 组合器: futures库提供了多种方法来组合和链接Future。例如,join允许同时运行多个Future并等待它们全部完成。
  2. 选择器: select!宏允许同时等待多个Future,并在任何一个完成时执行相应的操作。
  3. Stream:类似于同步编程中的Iterator,但是用于异步场景。它代表一系列可以异步获取的值。Stream trait定义了next方法,返回一个Future,这个Future解析为Option<Item>
  4. Sink代表一个可以异步接收值的目标。它可以看作是Stream的对立面:Stream产生值,而Sink消费值。

tokio

Tokio是Rust生态系统中最流行的异步运行时库之一。它为构建可靠、高效的网络应用程序提供了基础设施。

代码