异步和并发
基本概念
Send & Sync & 'static
在 Rust 中,Send 和 Sync 是两个非常重要的 trait,它们定义了类型在并发环境下的安全性。理解这两个 trait 的含义和应用是进行 Rust 并发编程的关键。
Send 标记 trait 表示一个类型的值可以安全地从一个线程转移(move)到另一个线程。如果一个类型 T 实现了 Send,那么一个该类型的变量可以被移动到另一个线程中去使用。大多数 Rust 的类型都是 Send。不是 Send 的例子包括 Rc<T>(引用计数智能指针),因为 Rc<T> 不是线程安全的,多个线程增减引用计数可能导致数据竞争。Arc<T> 是一个线程安全的引用计数智能指针,它可以在多线程中共享,因为内部使用原子操作来管理引用计数。
Sync 标记 trait 表示一个类型的引用(&T)可以安全地被多个线程同时访问。如果类型 T 是 Sync 的,意味着 &T 是线程安全的。这通常意味着在内部没有可变状态,或者类型使用内部锁(如 Mutex 或 RwLock)来管理内部状态的访问。不是 Sync 的例子包括 RefCell<T> 和 Cell<T>,因为它们允许在运行时改变其内部值,而不是通过锁或其他线程安全的机制来保护其状态。
Sync 和Send的关系很微妙,Sync可以理解为是Send的辅助之一:一个类型T是Sync 当且仅当&T是Send。
多线程程序还要注意保证被共享的数据在被访问的时候是有效的。如果访问无效的数据,比如Use-after-free,那么就会有Undefined Behavior。所以Rust规定如果数据被用于多线程中,要么线程数据被move到线程中;要么数据具有'static的生命周期。对C语言熟悉的同学,一定第一时间想到了静态变量,存储在程序的数据段,在程序的整个生命周期都是有效的。这确实是一种情况,但是'static的含义不止如此。被'static修饰的引用,在程序剩余的生命周期一直是有效的即可,所以可以在运行时创建。
当'static用来修饰trait的时候,表示 实现trait的类型 不含任何非static的引用。所以这个类型可以被一直持有,永远不会失效。下面这个例子,来自Rust By Example,非常好:
use std::fmt::Debug;
fn print_it( input: impl Debug + 'static ) {
println!( "'static value passed in is: {:?}", input );
}
fn main() {
// i is owned and contains no references, thus it's 'static:
let i = 5;
print_it(i);
// oops, &i only has the lifetime defined by the scope of
// main(), so it's not 'static:
print_it(&i);
}
在多线程中传递dyn Trait的时候,我们经常会看到类似下面两种写法,使用Send + Sync + 'static约束Trait。因为dyn Trait 会擦除具体类型信息,因此编译器需要通过 trait 约束来保证安全性。
let object: Box<dyn Trait + Send + Sync + 'static> = Box::new(MyType);
trait Trait: Send + Sync + 'static {}
let object: Box<dyn Trait> = Box::new(MyType);
Future & Poll
异步编程是一项重要功能,它允许程序在等待某些操作完成(例如 I/O 操作)时继续执行其他任务。Rust 的异步编程模型建立在 Future trait 和 Poll 类型上。在 Rust 中,Future 是一个 trait,代表了一个异步操作的最终完成结果。Future 可以被视为一个可能还没有完成的计算值。当一个 Future 被 polled 时,它会尝试推进其内部状态,如果操作完成则返回结果,如果未完成则表示仍然在等待。
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
这里的关键点是:
Output:这是Future完成后返回的类型。poll方法:这是Future被调用以尝试向前推进其状态的方法。它接收一个Context参数,该参数提供了Waker,用于在Future准备好再次被 polled 时通知执行器。
Poll 是一个枚举,定义在 std::task 模块中,用于表示 Future 的完成状态:
Ready(T):表示Future已经完成,携带着最终结果T。Pending:表示Future尚未完成,需要在未来某个时刻再次尝试。
async & await
async 和 await 是用于简化异步编程的关键关键字。它们让编写异步代码几乎像写同步代码一样简单,并在底层自动处理 Future 和 Poll 的复杂性。
async 关键字用于定义一个异步函数或异步块。当你在函数前加上 async 关键字时,这个函数会返回一个实现了 Future trait 的类型。这意味着,这个函数本身不会立即执行,而是返回一个将在未来某个时刻完成的 Future。当你使用 async 关键字定义一个函数,Rust 的编译器将这个函数转换成一个状态机。这个状态机背后实际上是一个实现了 Future trait 的类型。每当这个 Future 的 poll 方法被调用时,它会根据内部状态决定是继续执行、暂停还是完成。
await 关键字用于在异步函数或异步块内部“等待”另一个 Future 的完成。它只能在 async 块或函数中使用。使用 await 可以暂停当前函数的执行,直到 Future 完成为止,不会阻塞线程。每次使用 await 操作符时,如果被等待的 Future 尚未完成(即返回 Poll::Pending),当前 Future 也会返回 Poll::Pending 并将控制权交回给执行器(yeild)。这允许执行器继续处理其他任务。当被等待的 Future 准备好继续执行时,执行器会再次 poll 当前 Future。
Pin & Unpin
Pin的出现主要是为了解决 自引用类型 问题。试想,如果有一个对象保存了自己的地址0x20,那么当他被移动到0x40的时候,0x20就变成一个悬空指针。我们一般情况不会写出 自引用类型 的数据结构。但是,Future 就可能是。(此外,Linux内核常用的侵入式双链表如果想在Rust实现,也需要Pin)
Pin他内部包裹了另外一个指针P,并且只要P指针指向的内容(我们称为T)没有实现Unpin,则可以保证T永远不会被移动(move)。本质上,Pin 可以认为是一个标记,向编译器保证“如果我指向的对象(T)不能在内存中自由的移动(!Unpin),那么他就肯定不会在内存中被移动”。
Pin 通过阻止 &mut T 到 &mut U 的转换来确保这一点,因为通常这种转换允许通过替换值来移动数据。
Unpin 是一个自动实现的 trait,当一个类型实现了 Unpin trait,表示它的实例可以安全地被“移动”,即使它已被固定。大多数 Rust 的内置类型都实现了 Unpin,但如果一个类型包含不可移动的字段(如通过 Pin 包装的字段),则该类型可能不会自动实现 Unpin。
看一下pin_mut宏的实现,很有意思。首先shadow原本的变量,然后new_unchecked。就完事了。凭什么对象就被Pin住了呢?因为原本的对象已经无法访问了,再多加了一个Pin之后只是多了一个编译器的标记,阻止 &mut T 到 &mut U 的转换。所以是安全的。这是将值Pin在栈上的方法。Pin在堆上,可以使用Box::pin。
#[macro_export]
macro_rules! pin_mut {
($($x:ident),* $(,)?) => { $(
// Move the value to ensure that it is owned
let mut $x = $x;
// Shadow the original binding so that it can't be directly accessed
// ever again.
#[allow(unused_mut)]
let mut $x = unsafe {
$crate::core_reexport::pin::Pin::new_unchecked(&mut $x)
};
)* }
}
Executor
Executor是真正负责调度执行Future的东西。例如
- futures::executor::block_on 是最简单的Executor,阻塞线程直到Future执行完成。
Join & Select
如果只是单纯的将普通函数变成异步函数,那么实际上代码的并发性和之前没有任何区别,仍然是顺序执行。要提高代码并发性的关键是要让 Futures 可以一起执行。
Join代表的执行关系就是 Futures 可以同时开始执行,然后等待他们全部完成后返回。为此,futures提供了两个宏join和try_join,后者如果 Futures 中有一个返回 Err ,则立刻返回Err。
// `join!` is variadic, so you can pass any number of futures
let c = async { 3 };
let d = async { 4 };
let e = async { 5 };
assert_eq!(join!(c, d, e), (3, 4, 5));
Select代表的执行关系是 Futures 可以 同时开始执行,有一个执行完成后立刻返回。为此,futures提供了宏select。它的用法会更复杂一点。
除了这两种执行关系,还有
- 两个Future同时开始执行,但是彼此之间毫无关系。比如我们在处理完数据之后,一边将数据进行缓存,一边进行进一步处理。这个时候可以单独开一个线程执行缓存操作。
Stream
Stream 就是一个异步的 Iterator。更准确的说法如下:
If
Future<Output = T>is an asynchronous version ofT, thenStream<Item = T>is an asynchronous version ofIterator<Item = T>
在使用Stream有一个很坑的地方,因为next 方法不会使用Stream的所有权,所以Stream需要同时满足Unpin,
基础设施
要完整的使用 async 异步编程,你需要依赖以下特性和外部库:
- 关键字
async/await由 Rust 语言提供,并进行了编译器层面的支持 - 所必须的特征(例如
Future)、类型和函数,由标准库提供实现 - 众多实用的类型、宏和函数由官方开发的
futures包提供(不是标准库),它们可以用于任何async应用中,但是不包括Executor(不算block_on)! async代码的执行、IO操作、任务创建和调度等等复杂功能由社区的async运行时提供,例如tokio和async-std。
futures
futures 库是一个非常重要的异步编程工具,提供了丰富的功能来支持异步编程。
- 组合器: futures库提供了多种方法来组合和链接Future。例如,
join允许同时运行多个Future并等待它们全部完成。 - 选择器:
select!宏允许同时等待多个Future,并在任何一个完成时执行相应的操作。 - Stream:类似于同步编程中的
Iterator,但是用于异步场景。它代表一系列可以异步获取的值。Stream trait定义了next方法,返回一个Future,这个Future解析为Option<Item>。 - Sink代表一个可以异步接收值的目标。它可以看作是Stream的对立面:Stream产生值,而Sink消费值。