内核线程管理

结合宏内核与微内核的优点,追求内核效率与安全可靠性的平衡,NUDT-OS引入了内核线程,目前的实现中内核线程分为两种:内核服务线程、内核协程执行线程。内核服务线程借助Rust语言的内存安全特性保持独立,任何一个内核服务线程故障不会影响内核整体;利用Rust异步协程机制,NUDT-OS采用多对一线程模型,内核协程执行线程统一处理所有用户线程的异步请求。

一、数据结构

/// 内核线程
#[repr(C)]
pub struct Kthread {
    /// 内核线程ID
    ktid: usize,
    /// 内核线程名称
    name: String,
    /// 运行状态
    state: RwLock<KthreadState>,
    /// 内核线程需要处理的请求队列
    req_queue: Arc<Mutex<VecDeque<Request>>>,
    /// 内核线程请求ID
    req_id: AtomicUsize,
    /// 内核线程已完成请求的ID
    res_id: Arc<Mutex<usize>>,
    /// 请求事件的唤醒器队列
    req_wakers: Mutex<Vec<(Waker, usize)>>,
    /// 上下文
    ctx: Context,
}

每个内核线程运行一种独立的用户服务,其维护一个自身需要处理的请求队列,这些请求是以异步协程的方式由内核协程执行线程来调度处理的,内核线程中维护了自己需要处理的请求协程的唤醒器,当处理完成后,即唤醒协程。

/// 内核线程状态
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
pub enum KthreadState {
    /// 繁忙
    Busy,
    #[default]
    /// 空闲
    Idle,
}

不同于用户线程,内核线程只有繁忙和空闲两种状态,这是因为内核线程在系统中是不会退出的,其在内核初始化时创建,无限循环处理用户线程的请求。处于繁忙状态时,会被调度执行,否则不会被调度执行。

二、关键方法

内核线程的关键方法包括创建新内核线程new(),向内核线程请求队列中添加一个请求add_req(),通知请求队列中一个请求处理完毕response_req()。

2.1 new()

/// 创建内核线程
pub fn new(name: String, entry: usize) -> Arc<Kthread> {
    let ktid = KTHREAD_ID.fetch_add(1, Ordering::Relaxed);

    // 两个线程栈之间空余一小段空间
    let stack_base = KERNEL_STACK_ADDRESS + ktid * KERNEL_STACK_SIZE * 2;

    // 设置sp,ip
    let mut ctx = Context::default();
    ctx.rip = entry;
    ctx.regs.rsp = stack_base;

    // 创建新内核线程
    let kthread = Arc::new(Kthread {
        ktid: ktid,
        name: name,
        state: RwLock::new(KthreadState::Idle),
        req_queue: Arc::new(Mutex::new(VecDeque::new())),
        req_id: AtomicUsize::new(0),
        res_id: Arc::new(Mutex::new(0)),
        req_wakers: Mutex::new(Vec::new()),
        ctx: ctx,
    });

    // 将内核线程放入全局线程队列
    let mut kthread_dequeue = KTHREAD_DEQUEUE.lock();
    kthread_dequeue.push_back(kthread.clone());

    kthread
}

每个内核线程都有自己的内核栈空间,两个内核栈之间空余了一小段空间,这是防止栈溢出时破坏其他线程的栈。内核线程同样包含Context结构保存上下文,创建新内核线程时,初始化线程入口点和栈基地址。

2.2 add_req()

/// 向请求队列添加请求
///
/// 返回请求ID
pub fn add_req(&self, req: Control, client_buffer: Arc<DataBuffer>) -> usize {
    // 计算当前请求ID,从1开始算
    let req_id = self.req_id.fetch_add(1, Ordering::Relaxed);

    // 添加请求
    let req_deque = self.req_queue.clone();
    let mut req_deque_lock = wait_lock_or_yield(&req_deque);
    req_deque_lock.push_back((req_id, req, client_buffer));

    req_id
}

add_req()向内核线程的请求队列中添加一个新的请求。

2.3 response_req()

/// 通知请求队列中一个请求处理完毕
pub fn response_req(&self, new_res_id: usize) {
    // 保存当前完成请求ID,失败时调度
    let mut res_id = wait_lock_or_yield(&self.res_id);
    *res_id = new_res_id;

    let mut req_wakers = wait_lock_or_yield(&self.req_wakers);
    req_wakers.retain(|req_waker| {
        let (waker, req_id) = req_waker;
        // 根据响应的请求id唤醒响应协程
        if *req_id != new_res_id {
            return true;
        }
        // 唤醒协程
        waker.wake_by_ref();
        false
    });
}

response_req()由内核线程中运行的服务代码使用,当一个请求处理完成时,response_req()使用内核线程唤醒器队列中的相应waker唤醒协程。

三、用户线程请求内核线程服务示例

下面给出一个用户线程请求内核服务的实例过程中的控制流说明

3.1 用户线程请求服务

/// 向内核线程发送请求
pub fn sys_kthread_req(kthread: Arc<Kthread>, ctrl_ptr: usize, buf_ptr: usize) -> isize {
    let thread = current();
    let process = &thread.proc;
    // 构造Vec<u8>类型control参数
    let ctrl = unsafe { (*(ctrl_ptr as *const &[u8])).to_vec() };

    // 构造&[u8]类型buf
    let buf = unsafe { *(buf_ptr as *const &[u8]) };

    // 构造DataBuffer
    let buffer = DataBuffer::new(process.root_pa().0, buf.as_ptr() as usize, buf.len());

    // 发送消息
    let req_id = kthread.add_req(ctrl, buffer);

    // 创建协程后等待
    thread.state = ThreadState::Waiting;
    executor::spawn(async_kthread_req(thread, kthread.clone(), req_id));
    1
}

用户线程通过sys_kthread_req()系统调用发送一个请求,这时内核主线程将用户线程设置为异步等待状态(不会被调度执行),然后创建一个async_kthread_req异步协程,将其添加到内核协程执行线程的任务队列中去,等待执行。

/// sys_kthread_req函数的协程任务
async fn async_kthread_req(thread: Arc<Thread>, kthread: Arc<Kthread>, req_id: usize) {
    let kthread_req_future = KthreadReqFuture::new(thread.clone(), req_id, kthread.clone());
    kthread_req_future.await;
    thread.state = ThreadState::Runnable;
}

async_kthread_req()函数创建一个KthreadReqFuture,并等待。当KthreadReqFuture就绪后(此时内核服务线程服务完成),将用户线程状态置为就绪(可调度)。

3.2 协程执行线程轮询KthreadReqFuture协程

impl Future for KthreadReqFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // maybe question here
        let kthread = self.kthread.clone();
        let req_id = self.req_id;
        let res_id_lock = kthread.res_id();
        // 加锁保证不被中断
        let res_id = wait_lock_or_yield(&res_id_lock);

        // 根据res_id和req_id判断请求是否完成
        let ret = if *res_id >= req_id {
            // 完成请求
            Poll::Ready(())
        } else {
            // 未完成请求
            kthread.add_req_waker(cx.waker().clone(), self.req_id);
            Poll::Pending
        };
        return ret;
    }
}

通过内核线程已完成请求id和当前请求id的大小关系判断当前请求是否已完成,若未完成则将当前协程的唤醒器waker添加到内核线程的waker队列中去,并返回pending。

3.3 内核服务线程完成服务并使用waker唤醒协程

下面是一个内核服务线程的代码实例

pub fn entry(ktid: usize) {
    // 无限循环处理请求,请求队列为空时进入睡眠
    loop {
        // 保证所有堆上内存释放完毕,再进入IDEL状态
        {
            // 获取队首请求,不能带锁放弃CPU,否则无法向队列中添加请求
            ...

            // 设置状态为繁忙
            kthread.set_state(KthreadState::Busy);

            // 完成服务
            ...
            
            // 服务完成,唤醒协程
            kthread.response_req(id);
        }

        kthread.set_state(KthreadState::Idle);
    }
}

response_req已在前面说明,唤醒协程后,协程执行线程再次轮询KthreadReqFuture时即返回Ready,async_kthread_req中将用户线程状态设置为就绪(可调度),这样就完成了用户线程从发起请求到完成请求的整个过程。