Task Wakeups with Waker

Future 一次polled 就能完成的,并不常见。而多数情况下,Future 需要确保一旦准备好前进,就再次进行轮询(poll) 。而这是通过Waker类型,辅助完成的。

每次 Future poll 时,都会将其作为“任务(task)”的一部分。任务是已提交给 executor 的顶级 Future 。

Waker提供一个wake()方法,它可以用来告诉 executor,应该唤醒的相关任务。当wake()被调用时, executor 知道与Waker相关联的任务是准备前进,并且,它的 Future 应再次进行 poll。

Waker还实现了clone(),这样就可以将其复制和存储。

让我们尝试使用Waker,实现一个简单的计时器 future。

Applied: Build a Timer

在本示例中,我们将在创建计时器(Timer)时,启动一个新线程,休眠下所需的时间,然后在时间窗口 elapsed(逝去) 后,向计时器发出信号。

这是我们需要开始的导入:


                        #![allow(unused_variables)]
                        fn main() {
                        use {
                            std::{
                                future::Future,
                                pin::Pin,
                                sync::{Arc, Mutex},
                                task::{Context, Poll, Waker},
                                thread,
                                time::Duration,
                            },
                        };
                        }
                        

让我们从定义 future 类型本身开始。我们的 future 需要一种方法,来让线程可以传达,timer elapsed 和 这个 future 应该完成的信息。我们将使用一个Arc<Mutex<..>>共享值,在线程和 Future 之间进行通信。


                        #![allow(unused_variables)]
                        fn main() {
                        pub struct TimerFuture {
                            shared_state: Arc<Mutex<SharedState>>,
                        }
                        
                        /// Shared state between the future and the waiting thread
                        struct SharedState {
                            /// Whether or not the sleep time has elapsed
                            completed: bool,
                        
                            /// The waker for the task that `TimerFuture` is running on.
                            /// The thread can use this after setting `completed = true` to tell
                            /// `TimerFuture`'s task to wake up, see that `completed = true`, and
                            /// move forward.
                            waker: Option<Waker>,
                        }
                        }
                        

现在,让我们实际编写Future实现!


                        #![allow(unused_variables)]
                        fn main() {
                        impl Future for TimerFuture {
                            type Output = ();
                            fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                                // Look at the shared state to see if the timer has already completed.
                                let mut shared_state = self.shared_state.lock().unwrap();
                                if shared_state.completed {
                                    Poll::Ready(())
                                } else {
                                    // Set waker so that the thread can wake up the current task
                                    // when the timer has completed, ensuring that the future is polled
                                    // again and sees that `completed = true`.
                                    //
                                    // It's tempting to do this once rather than repeatedly cloning
                                    // the waker each time. However, the `TimerFuture` can move between
                                    // tasks on the executor, which could cause a stale waker pointing
                                    // to the wrong task, preventing `TimerFuture` from waking up
                                    // correctly.
                                    //
                                    // N.B. it's possible to check for this using the `Waker::will_wake`
                                    // function, but we omit that here to keep things simple.
                                    shared_state.waker = Some(cx.waker().clone());
                                    Poll::Pending
                                }
                            }
                        }
                        }
                        

很简单,对吧?如果线程设置了shared_state.completed = true,我们就搞定了!不然的话,我们会为当前任务,clone Waker,并将其传递给shared_state.waker,这样线程才能唤醒备份的任务。

重要的是,每次 Future 进行 poll,我们必须更新Waker,因为 Future 可能已经转移到,具有一个不同Waker的不同任务上了。这种情况在 Future poll 后,在任务之间传来传去时,会发生。

最后,我们需要实际构造计时器的 API ,并启动线程:


                        #![allow(unused_variables)]
                        fn main() {
                        impl TimerFuture {
                            /// Create a new `TimerFuture` which will complete after the provided
                            /// timeout.
                            pub fn new(duration: Duration) -> Self {
                                let shared_state = Arc::new(Mutex::new(SharedState {
                                    completed: false,
                                    waker: None,
                                }));
                        
                                // Spawn the new thread
                                let thread_shared_state = shared_state.clone();
                                thread::spawn(move || {
                                    thread::sleep(duration);
                                    let mut shared_state = thread_shared_state.lock().unwrap();
                                    // Signal that the timer has completed and wake up the last
                                    // task on which the future was polled, if one exists.
                                    shared_state.completed = true;
                                    if let Some(waker) = shared_state.waker.take() {
                                        waker.wake()
                                    }
                                });
                        
                                TimerFuture { shared_state }
                            }
                        }
                        }
                        

Woot!这就是我们构建一个简单的计时器 future 的全部。现在,如果我们只有一个 executor,来运行 future ...