解密 Rust 异步编程
Future
Rust 在实现异步编程的时候,采用 Poll 模型。最为核心的就是
enum Poll<T> {Ready(T),Pending,}
我们定义了一个类型为 Poll 处于两种状态之一 Ready Pending
Ready: 我们已经完成了任务可以返回结果 T
Pending:此时我们等待一些其他依赖,这时候我们需要将自己占用的资源释放出来,基于一个
wake()函数再次将自己唤醒
最常见的就是 Socket 编程
pub struct SocketRead<'a> {socket: &'a Socket,}impl SimpleFuture for SocketRead<'_> {type Output = Vec<u8>; //返回值是一个 u8 的 bytesfn poll(&mut self, wake: fn()) -> Poll<Self::Output> {if self.socket.has_data_to_read() {// 如果有数据直接返回 Ready 状态下可读的值Poll::Ready(self.socket.read_buf())} else {// 没有数据就直接返回 Pending,等待数据,这里有一个 wake 函数等待被唤醒self.socket.set_readable_callback(wake);Poll::Pending}}}
对于多个 Pollable 的对象,组合在一起非常的合理,就和 Java 的 ComposeFunture 一样
impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>whereFutureA: SimpleFuture<Output = ()>,FutureB: SimpleFuture<Output = ()>,{type Output = ();fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {// 尝试完成 Aif let Some(a) = &mut self.a {if let Poll::Ready(()) = a.poll(wake) {self.a.take();}}// 尝试完成 Bif let Some(b) = &mut self.b {if let Poll::Ready(()) = b.poll(wake) {self.b.take();}}if self.a.is_none() && self.b.is_none() {// 都完成了就是 ReadyPoll::Ready(())} else {Poll::Pending}}}
Wakeup
从上面的设计中可以发现,如果我们没有 wake 的话,我们只能一直的去轮训的获得是否已经 Ready,整体的效率会比较低,因此我们可以通过 wake 函数将其唤醒。Rust 中对于 Wake 的定义如下:
pub trait ArcWake: Send + Sync {fn wake(self: Arc<Self>) {Self::wake_by_ref(&self)}// 对于我们来说,我们需要的就是实现这个 wake_by_reffn wake_by_ref(arc_self: &Arc<Self>);}
让我们假设一个超时的场景,在运行一段时间之后,我们需要将其唤醒。定义一个 TimerFuture 对象
impl TimerFuture {pub fn new(duration: Duration) -> Self {let shared_state = Arc::new(Mutex::new(SharedState {completed: false,waker: None,}));// 启动一个新的线程处理我们的状态量let thread_shared_state = shared_state.clone();thread::spawn(move || {thread::sleep(duration);let mut shared_state = thread_shared_state.lock().unwrap();// Thread Sleep 够了就算是超时了,这时候我们需要 wake 我们的工作对象了shared_state.completed = true;if let Some(waker) = shared_state.waker.take() {waker.wake()}});TimerFuture { shared_state }}}
Executor
上面只是提供了机制,我们还需要执行的本体,Rust 抽象了 Executor 出来
struct Executor {ready_queue: Receiver<Arc<Task>>,}#[derive(Clone)]struct Spawner {task_sender: SyncSender<Arc<Task>>,}impl Spawner {fn spawn(&self, future: impl Future<Output=()> + 'static + Send) {let future = future.boxed();let task = Arc::new(Task {future: Mutex::new(Some(future)),task_sender: self.task_sender.clone(),});self.task_sender.send(task).expect("too many tasks queued");}}impl Executor {fn run(&self) {while let Ok(task) = self.ready_queue.recv() {// 阻塞结束,完成了一部分的请求数据let mut future_slot = task.future.lock().unwrap();if let Some(mut future) = future_slot.take() {// 创建一个 Waker,将这个任务本体放入let waker = waker_ref(&task);let context = &mut Context::from_waker(&*waker);// 尝试poll一次,如果是 Pendingif let Poll::Pending = future.as_mut().poll(context) {*future_slot = Some(future);}}}}}
是不是有点懵,先不慌,我们看看怎么用的。
fn main() {let (executor, spawner) = new_executor_and_spawner();// 我们向 executor 扔一个任务,其实这里就扔到了 ready_queue 的队列中spawner.spawn(async {println!("howdy!");// Wait for our timer future to complete after two seconds.TimerFuture::new(Duration::new(2, 0)).await;println!("done!");});// 这里会直接运行executor.run();}
不过我们在此之前,我们需要定义好如下代码:
struct Task {future: Mutex<Option<BoxFuture<'static, ()>>>,task_sender: SyncSender<Arc<Task>>,}impl ArcWake for Task {fn wake_by_ref(arc_self: &Arc<Self>) {let cloned = arc_self.clone();many tasks queued");}}
我们会发现实际上我们最终 wake 起来的还是我们自己,我们只是在控制 wake 的方式。
How it Work
核心的逻辑是,我们接收到这个任务,将其放置于我们的 Task Queue 中
spawner.spawn(async { // 这个 socpe 就是 Taskprintln!("howdy!");TimerFuture::new(Duration::new(2, 0)).await;println!("done!");});
这个任务此时还没有 wake 对象,我们为其创建 Context 对象(抽象规定的,我们需要将 Wake 置于其中),并且创建一个 wake 对象,而实际上 wake 对象仅仅是是这个 Task 本身。
因此当我们进入 TimerFuture 的 Poll 函数的时候,我们就已经在等待 2 秒之后,Wake 触发如下逻辑
fn wake_by_ref(arc_self: &Arc<Self>) {let cloned = arc_self.clone();arc_self.task_sender.send(cloned).expect("too many tasks queued");}
此时,我们只不过又将我们的任务又 Re submit 到我们的任务 Task Queue 去了,此时我们再将整个 Thread 的工作给重新唤醒进行处理即可。
线程的变化为
参考源码:
https://gist.github.com/yanickxia/270784bc004cb4c0a7b28b13ac9f2aba
Wake in real world
在真正的编程中,如果都是基于每一次启动一个线程会显得很不高效,因此一般都是基于信号来处理。
因此对于 socket 来说
impl Socket {fn set_readable_callback(&self, waker: Waker) {//这里其实就是 eventlooplet local_executor = self.local_executor;// 此socket 的idlet id = self.id;// 将需要监听的事件放入 eventloop 里面,等待 eventloop 回调local_executor.event_map.insert(id, waker);local_executor.add_io_event_interest(&self.socket_file_descriptor,Event { id, signals: READABLE },);}}
值得注意的,本篇博客意在指出 Funture 的抽象机制是如何运行的,对于真正的系统比如 tokio 在实现 Executor 会比我们现在所设计的要复杂的多。
参考
async-book
原力注入
热点文章
