异步协程
想要使用协程,可以使用Rust中提供的Future
trait,其代表一个可能尚未就绪的值,我们需要为一个实现Future trait
的对象实现poll
方法,这个方法查询Future
是否准备好,准备好时返回一个Poll::Ready
类型,否则返回Poll::Pending
.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
协程执行器中的协程任务
/// 协程任务
pub struct Task {
/// 协程执行器
executor: Weak<Executor>,
/// 任务协程
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
/// 任务状态
ready: AtomicBool,
}
将一个实现了Future
的对象封装起来,作为协程执行器中的一个协程任务。它还包含了协程执行器的一个弱指针,和自身任务的状态(就绪和未就绪)。
impl Task {
/// 获取任务所属执行器
pub fn executor(&self) -> Arc<Executor> {
self.executor.upgrade().unwrap()
}
/// 获取任务状态
pub fn ready(&self) -> bool {
self.ready.load(Ordering::SeqCst)
}
/// 设置任务状态
pub fn set_ready(&self, ready: bool) {
self.ready.store(ready, Ordering::SeqCst)
}
/// 执行当前任务
pub fn poll(&self, cx: &mut Context<'_>) -> Poll<()> {
self.future.lock().as_mut().poll(cx)
}
}
轮询Task时调用其中内部future对象的poll方法进行轮询。
/// 为协程任务实现唤醒器
impl Woke for Task {
fn wake_by_ref(task: &Arc<Self>) {
// 设置执行器状态繁忙
task.executor().set_state(ExecutorState::Ready);
// 唤醒当前任务
task.set_ready(true);
}
}
wake_by_ref
方法将协程任务状态设置为就绪,并将协程执行器状态设置为就绪。在内核线程中我们已经看到,内核线程中有一个唤醒器的队列。当内核线程服务完成时,就会使用wake_by_ref
方法唤醒协程。
KthreadReqFuture
/// 向内核线程发送请求协程
pub struct KthreadReqFuture {
// 发送请求线程
thread: Arc<Thread>,
// 请求ID
req_id: usize,
// 内核线程
kthread: Arc<Kthread>,
}
用户线程向内核线程发送请求的时候就会创建一个KthreadReqFuture
对象,其包含的req_id代表了自己的请求在内核服务线程中的id。
impl Future for KthreadReqFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// maybe question here
let kthread = self.kthread.clone();
let req_id = self.req_id;
let res_id_lock = kthread.res_id();
// 加锁保证不被中断
let res_id = wait_lock_or_yield(&res_id_lock);
// 根据res_id和req_id判断请求是否完成
let ret = if *res_id >= req_id {
// 完成请求
Poll::Ready(())
} else {
// 未完成请求
kthread.add_req_waker(cx.waker().clone(), self.req_id);
Poll::Pending
};
return ret;
}
}
对其轮询时,根据自己的id和内核线程中已完成请求的id来判断是否完成了请求。若未完成请求,则不会再轮询,将自己的Waker添加到内核线程队列的唤醒器中,等待服务完成后才会再次轮询并返回Ready。