异步协程

想要使用协程,可以使用Rust中提供的Futuretrait,其代表一个可能尚未就绪的值,我们需要为一个实现Future trait的对象实现poll方法,这个方法查询Future是否准备好,准备好时返回一个Poll::Ready类型,否则返回Poll::Pending.

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

协程执行器中的协程任务

/// 协程任务
pub struct Task {
    /// 协程执行器
    executor: Weak<Executor>,
    /// 任务协程
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
    /// 任务状态
    ready: AtomicBool,
}

将一个实现了Future的对象封装起来,作为协程执行器中的一个协程任务。它还包含了协程执行器的一个弱指针,和自身任务的状态(就绪和未就绪)。

impl Task {
    /// 获取任务所属执行器
    pub fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().unwrap()
    }

    /// 获取任务状态
    pub fn ready(&self) -> bool {
        self.ready.load(Ordering::SeqCst)
    }

    /// 设置任务状态
    pub fn set_ready(&self, ready: bool) {
        self.ready.store(ready, Ordering::SeqCst)
    }

    /// 执行当前任务
    pub fn poll(&self, cx: &mut Context<'_>) -> Poll<()> {
        self.future.lock().as_mut().poll(cx)
    }
}

轮询Task时调用其中内部future对象的poll方法进行轮询。

/// 为协程任务实现唤醒器
impl Woke for Task {
    fn wake_by_ref(task: &Arc<Self>) {
        // 设置执行器状态繁忙
        task.executor().set_state(ExecutorState::Ready);
        // 唤醒当前任务
        task.set_ready(true);
    }
}

wake_by_ref方法将协程任务状态设置为就绪,并将协程执行器状态设置为就绪。在内核线程中我们已经看到,内核线程中有一个唤醒器的队列。当内核线程服务完成时,就会使用wake_by_ref方法唤醒协程。

KthreadReqFuture

/// 向内核线程发送请求协程
pub struct KthreadReqFuture {
    // 发送请求线程
    thread: Arc<Thread>,
    // 请求ID
    req_id: usize,
    // 内核线程
    kthread: Arc<Kthread>,
}

用户线程向内核线程发送请求的时候就会创建一个KthreadReqFuture对象,其包含的req_id代表了自己的请求在内核服务线程中的id。

impl Future for KthreadReqFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // maybe question here
        let kthread = self.kthread.clone();
        let req_id = self.req_id;
        let res_id_lock = kthread.res_id();
        // 加锁保证不被中断
        let res_id = wait_lock_or_yield(&res_id_lock);

        // 根据res_id和req_id判断请求是否完成
        let ret = if *res_id >= req_id {
            // 完成请求
            Poll::Ready(())
        } else {
            // 未完成请求
            kthread.add_req_waker(cx.waker().clone(), self.req_id);
            Poll::Pending
        };
        return ret;
    }
}

对其轮询时,根据自己的id和内核线程中已完成请求的id来判断是否完成了请求。若未完成请求,则不会再轮询,将自己的Waker添加到内核线程队列的唤醒器中,等待服务完成后才会再次轮询并返回Ready。