异步和并发
基本概念
Send & Sync
在 Rust 中,Send
和 Sync
是两个非常重要的 trait,它们定义了类型在并发环境下的安全性。理解这两个 trait 的含义和应用是进行 Rust 并发编程的关键。
Send
标记 trait 表示一个类型的值可以安全地从一个线程转移到另一个线程。如果一个类型 T
实现了 Send
,那么一个该类型的变量可以被移动到另一个线程中去使用。大多数 Rust 的类型都是 Send
。不是 Send
的例子包括 Rc<T>
(引用计数智能指针),因为 Rc<T>
不是线程安全的,多个线程增减引用计数可能导致数据竞争。Arc<T>
是一个线程安全的引用计数智能指针,它可以在多线程中共享,因为内部使用原子操作来管理引用计数。r
Sync
标记 trait 表示一个类型的引用(&T
)可以安全地被多个线程同时访问。如果类型 T
是 Sync
的,意味着 &T
是线程安全的。这通常意味着在内部没有可变状态,或者类型使用内部锁(如 Mutex
或 RwLock
)来管理内部状态的访问。不是 Sync
的例子包括 RefCell<T>
和 Cell<T>
,因为它们允许在运行时改变其内部值,而不是通过锁或其他线程安全的机制来保护其状态。
他们的实现是编译器自动推理的,除非你使用unsafe
。
Future & Poll
异步编程是一项重要功能,它允许程序在等待某些操作完成(例如 I/O 操作)时继续执行其他任务。Rust 的异步编程模型建立在 Future
trait 和 Poll
类型上。在 Rust 中,Future
是一个 trait,代表了一个异步操作的最终完成结果。Future
可以被视为一个可能还没有完成的计算值。当一个 Future
被 polled 时,它会尝试推进其内部状态,如果操作完成则返回结果,如果未完成则表示仍然在等待。
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
这里的关键点是:
Output
:这是Future
完成后返回的类型。poll
方法:这是Future
被调用以尝试向前推进其状态的方法。它接收一个Context
参数,该参数提供了Waker
,用于在Future
准备好再次被 polled 时通知执行器。
Poll
是一个枚举,定义在 std::task
模块中,用于表示 Future
的完成状态:
Ready(T)
:表示Future
已经完成,携带着最终结果T
。Pending
:表示Future
尚未完成,需要在未来某个时刻再次尝试。
async & await
async
和 await
是用于简化异步编程的关键关键字。它们让编写异步代码几乎像写同步代码一样简单,并在底层自动处理 Future
和 Poll
的复杂性。
async
关键字用于定义一个异步函数或异步块。当你在函数前加上 async
关键字时,这个函数会返回一个实现了 Future
trait 的类型。这意味着,这个函数本身不会立即执行,而是返回一个将在未来某个时刻完成的 Future
。当你使用 async
关键字定义一个函数,Rust 的编译器将这个函数转换成一个状态机。这个状态机背后实际上是一个实现了 Future
trait 的类型。每当这个 Future
的 poll
方法被调用时,它会根据内部状态决定是继续执行、暂停还是完成。
await
关键字用于在异步函数或异步块内部“等待”另一个 Future
的完成。它只能在 async
块或函数中使用。使用 await
可以暂停当前函数的执行,直到 Future
完成为止,不会阻塞线程。每次使用 await
操作符时,如果被等待的 Future
尚未完成(即返回 Poll::Pending
),当前 Future
也会返回 Poll::Pending
并将控制权交回给执行器(yeild)。这允许执行器继续处理其他任务。当被等待的 Future
准备好继续执行时,执行器会再次 poll
当前 Future
。
Pin & Unpin
Pin
的出现主要是为了解决 自引用类型 问题。试想,如果有一个对象保存了自己的地址0x20,那么当他被移动到0x40的时候,0x20就变成一个悬空指针。我们一般情况不会写出 自引用类型 的数据结构。但是,Future
就可能是。(此外,Linux内核常用的侵入式双链表如果想在Rust实现,也需要Pin
)
Pin
他内部包裹了另外一个指针P,并且只要P指针指向的内容(我们称为T)没有实现Unpin,则可以保证T永远不会被移动(move)。本质上,Pin 可以认为是一个标记,向编译器保证“如果我指向的对象(T)不能在内存中自由的移动(!Unpin),那么他就肯定不会在内存中被移动”。
Pin
通过阻止 &mut T
到 &mut U
的转换来确保这一点,因为通常这种转换允许通过替换值来移动数据。
Unpin
是一个自动实现的 trait,当一个类型实现了 Unpin
trait,表示它的实例可以安全地被“移动”,即使它已被固定。大多数 Rust 的内置类型都实现了 Unpin
,但如果一个类型包含不可移动的字段(如通过 Pin
包装的字段),则该类型可能不会自动实现 Unpin
。
看一下pin_mut宏的实现,很有意思。首先shadow原本的变量,然后new_unchecked
。就完事了。凭什么对象就被Pin住了呢?因为原本的对象已经无法访问了,再多加了一个Pin之后只是多了一个编译器的标记,阻止 &mut T
到 &mut U
的转换。所以是安全的。这是将值Pin在栈上的方法。Pin在堆上,可以使用Box::pin
。
#[macro_export]
macro_rules! pin_mut {
($($x:ident),* $(,)?) => { $(
// Move the value to ensure that it is owned
let mut $x = $x;
// Shadow the original binding so that it can't be directly accessed
// ever again.
#[allow(unused_mut)]
let mut $x = unsafe {
$crate::core_reexport::pin::Pin::new_unchecked(&mut $x)
};
)* }
}
Executor
Executor是真正负责调度执行Future的东西。例如
- futures::executor::block_on 是最简单的Executor,阻塞线程直到Future执行完成。
Join & Select
如果只是单纯的将普通函数变成异步函数,那么实际上代码的并发性和之前没有任何区别,仍然是顺序执行。要提高代码并发性的关键是要让 Futures 可以一起执行。
Join
代表的执行关系就是 Futures 可以同时开始执行,然后等待他们全部完成后返回。为此,futures提供了两个宏join
和try_join
,后者如果 Futures 中有一个返回 Err ,则立刻返回Err。
// `join!` is variadic, so you can pass any number of futures
let c = async { 3 };
let d = async { 4 };
let e = async { 5 };
assert_eq!(join!(c, d, e), (3, 4, 5));
Select
代表的执行关系是 Futures 可以 同时开始执行,有一个执行完成后立刻返回。为此,futures提供了宏select
。它的用法会更复杂一点。
除了这两种执行关系,还有
- 两个Future同时开始执行,但是彼此之间毫无关系。比如我们在处理完数据之后,一边将数据进行缓存,一边进行进一步处理。这个时候可以单独开一个线程执行缓存操作。
Stream
Stream
就是一个异步的 Iterator
。更准确的说法如下:
If
Future<Output = T>
is an asynchronous version ofT
, thenStream<Item = T>
is an asynchronous version ofIterator<Item = T>
在使用Stream有一个很坑的地方,因为next
方法不会使用Stream的所有权,所以Stream
需要同时满足Unpin
,
基础设施
要完整的使用 async
异步编程,你需要依赖以下特性和外部库:
- 关键字
async/await
由 Rust 语言提供,并进行了编译器层面的支持 - 所必须的特征(例如
Future
)、类型和函数,由标准库提供实现 - 众多实用的类型、宏和函数由官方开发的
futures
包提供(不是标准库),它们可以用于任何async
应用中,但是不包括Executor(不算block_on
)! async
代码的执行、IO
操作、任务创建和调度等等复杂功能由社区的async
运行时提供,例如tokio
和async-std
。
futures
futures
库是一个非常重要的异步编程工具,提供了丰富的功能来支持异步编程。
- 组合器: futures库提供了多种方法来组合和链接Future。例如,
join
允许同时运行多个Future并等待它们全部完成。 - 选择器:
select!
宏允许同时等待多个Future,并在任何一个完成时执行相应的操作。 - Stream:类似于同步编程中的
Iterator
,但是用于异步场景。它代表一系列可以异步获取的值。Stream trait定义了next
方法,返回一个Future,这个Future解析为Option<Item>
。 - Sink代表一个可以异步接收值的目标。它可以看作是Stream的对立面:Stream产生值,而Sink消费值。
tokio
Tokio是Rust生态系统中最流行的异步运行时库之一。它为构建可靠、高效的网络应用程序提供了基础设施。