协程执行器

内核异步系统调用创建的所有异步协程都是由协程执行器来调度的。协程执行器维护一个协程队列,并循环轮询其中的所有任务,当任务完成时,相应的等待的用户线程状态也置为就绪,可以被内核调度继续执行。本节介绍协程执行器结构。

/// 执行器状态
#[derive(Default, PartialEq, Eq)]
pub enum ExecutorState {
    #[default]
    /// 无任务运行
    Idle = 0,
    /// 有任务运行
    Ready = 1,
    /// 有任务正在运行
    Running = 2,
}

/// 协程执行器
pub struct Executor {
    /// 任务队列
    tasks_queue: Mutex<VecDeque<Arc<Task>>>,
    /// 执行器状态
    state: Arc<Mutex<ExecutorState>>,
}

协程执行器具有三个状态,Ready代表有任务就绪了,但当前不在运行状态。

impl Executor {
    /// 添加任务
    pub fn push_task(&self, task: Arc<Task>) {
        self.tasks_queue.lock().push_back(task);
    }

    /// 执行协程,直到任务队列中无就绪任务才停止
    pub fn run(&self) {
        while let Some(task) = self.pop_ready_task() {
            task.set_ready(false);
            // 由task创建waker
            let waker = waker_ref(&task);
            // 由waker创建context
            let mut context = Context::from_waker(&*waker);
            match task.poll(&mut context) {
                Poll::Ready(_) => continue,
                Poll::Pending => self.push_task(task),
            };
        }
    }

    /// 弹出第一个ready的任务
    pub fn pop_ready_task(&self) -> Option<Arc<Task>> {
        let mut tasks_queue = self.tasks_queue.lock();
        for _ in 0..tasks_queue.len() {
            let task = tasks_queue.pop_front().unwrap();
            if task.ready() {
                return Some(task);
            }
            tasks_queue.push_back(task);
        }
        return None;
    }
}

run方法循环地轮询任务队列中所有就绪任务,直到没有任何就绪任务时停止。

/// 全局协程执行器
static EXECUTOR: Lazy<Arc<Executor>> = Lazy::new(|| {
    // 创建内核协程执行器线程对象
    Kthread::new("executor".to_string(), executor_entry as _);

    Arc::new(Executor {
        tasks_queue: Mutex::new(VecDeque::new()),
        state: Arc::new(Mutex::new(ExecutorState::default())),
    })
});

/// 外部接口,执行协程
pub fn run() {
    // 若执行器状态为Ready,设置为Running
    {
        let state_lock = EXECUTOR.state();
        let mut state = wait_lock_or_yield(&state_lock);
        if *state == ExecutorState::Ready {
            *state = ExecutorState::Running;
        }
    }

    EXECUTOR.run();

    // 此时执行器任务队列中无就绪任务
    {
        let state_lock = EXECUTOR.state();
        let mut state = wait_lock_or_yield(&state_lock);
        if *state == ExecutorState::Running {
            *state = ExecutorState::Idle;
        }
    }
}

我们在内核中创建了一个全局协程执行器,其作为一个内核线程无限循环轮询所有的协程任务,处理所有用户线程的异步请求。

/// 协程执行器内核线程入口
pub fn executor_entry(_ktid: usize) {
    // 无限循环运行内核协程
    loop {
        executor::run();
        task::yield_current_kthread();
    }
}

可见,全局协程执行线程的线程函数便是循环地轮询所有异步任务,当任务队列没有就绪任务时,主动放弃CPU调度其他内核线程。