内核线程管理
结合宏内核与微内核的优点,追求内核效率与安全可靠性的平衡,NUDT-OS引入了内核线程,目前的实现中内核线程分为两种:内核服务线程、内核协程执行线程。内核服务线程借助Rust语言的内存安全特性保持独立,任何一个内核服务线程故障不会影响内核整体;利用Rust异步协程机制,NUDT-OS采用多对一线程模型,内核协程执行线程统一处理所有用户线程的异步请求。
一、数据结构
/// 内核线程
#[repr(C)]
pub struct Kthread {
/// 内核线程ID
ktid: usize,
/// 内核线程名称
name: String,
/// 运行状态
state: RwLock<KthreadState>,
/// 内核线程需要处理的请求队列
req_queue: Arc<Mutex<VecDeque<Request>>>,
/// 内核线程请求ID
req_id: AtomicUsize,
/// 内核线程已完成请求的ID
res_id: Arc<Mutex<usize>>,
/// 请求事件的唤醒器队列
req_wakers: Mutex<Vec<(Waker, usize)>>,
/// 上下文
ctx: Context,
}
每个内核线程运行一种独立的用户服务,其维护一个自身需要处理的请求队列,这些请求是以异步协程的方式由内核协程执行线程来调度处理的,内核线程中维护了自己需要处理的请求协程的唤醒器,当处理完成后,即唤醒协程。
/// 内核线程状态
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
pub enum KthreadState {
/// 繁忙
Busy,
#[default]
/// 空闲
Idle,
}
不同于用户线程,内核线程只有繁忙和空闲两种状态,这是因为内核线程在系统中是不会退出的,其在内核初始化时创建,无限循环处理用户线程的请求。处于繁忙状态时,会被调度执行,否则不会被调度执行。
二、关键方法
内核线程的关键方法包括创建新内核线程new(),向内核线程请求队列中添加一个请求add_req(),通知请求队列中一个请求处理完毕response_req()。
2.1 new()
/// 创建内核线程
pub fn new(name: String, entry: usize) -> Arc<Kthread> {
let ktid = KTHREAD_ID.fetch_add(1, Ordering::Relaxed);
// 两个线程栈之间空余一小段空间
let stack_base = KERNEL_STACK_ADDRESS + ktid * KERNEL_STACK_SIZE * 2;
// 设置sp,ip
let mut ctx = Context::default();
ctx.rip = entry;
ctx.regs.rsp = stack_base;
// 创建新内核线程
let kthread = Arc::new(Kthread {
ktid: ktid,
name: name,
state: RwLock::new(KthreadState::Idle),
req_queue: Arc::new(Mutex::new(VecDeque::new())),
req_id: AtomicUsize::new(0),
res_id: Arc::new(Mutex::new(0)),
req_wakers: Mutex::new(Vec::new()),
ctx: ctx,
});
// 将内核线程放入全局线程队列
let mut kthread_dequeue = KTHREAD_DEQUEUE.lock();
kthread_dequeue.push_back(kthread.clone());
kthread
}
每个内核线程都有自己的内核栈空间,两个内核栈之间空余了一小段空间,这是防止栈溢出时破坏其他线程的栈。内核线程同样包含Context结构保存上下文,创建新内核线程时,初始化线程入口点和栈基地址。
2.2 add_req()
/// 向请求队列添加请求
///
/// 返回请求ID
pub fn add_req(&self, req: Control, client_buffer: Arc<DataBuffer>) -> usize {
// 计算当前请求ID,从1开始算
let req_id = self.req_id.fetch_add(1, Ordering::Relaxed);
// 添加请求
let req_deque = self.req_queue.clone();
let mut req_deque_lock = wait_lock_or_yield(&req_deque);
req_deque_lock.push_back((req_id, req, client_buffer));
req_id
}
add_req()向内核线程的请求队列中添加一个新的请求。
2.3 response_req()
/// 通知请求队列中一个请求处理完毕
pub fn response_req(&self, new_res_id: usize) {
// 保存当前完成请求ID,失败时调度
let mut res_id = wait_lock_or_yield(&self.res_id);
*res_id = new_res_id;
let mut req_wakers = wait_lock_or_yield(&self.req_wakers);
req_wakers.retain(|req_waker| {
let (waker, req_id) = req_waker;
// 根据响应的请求id唤醒响应协程
if *req_id != new_res_id {
return true;
}
// 唤醒协程
waker.wake_by_ref();
false
});
}
response_req()由内核线程中运行的服务代码使用,当一个请求处理完成时,response_req()使用内核线程唤醒器队列中的相应waker唤醒协程。
三、用户线程请求内核线程服务示例
下面给出一个用户线程请求内核服务的实例过程中的控制流说明
3.1 用户线程请求服务
/// 向内核线程发送请求
pub fn sys_kthread_req(kthread: Arc<Kthread>, ctrl_ptr: usize, buf_ptr: usize) -> isize {
let thread = current();
let process = &thread.proc;
// 构造Vec<u8>类型control参数
let ctrl = unsafe { (*(ctrl_ptr as *const &[u8])).to_vec() };
// 构造&[u8]类型buf
let buf = unsafe { *(buf_ptr as *const &[u8]) };
// 构造DataBuffer
let buffer = DataBuffer::new(process.root_pa().0, buf.as_ptr() as usize, buf.len());
// 发送消息
let req_id = kthread.add_req(ctrl, buffer);
// 创建协程后等待
thread.state = ThreadState::Waiting;
executor::spawn(async_kthread_req(thread, kthread.clone(), req_id));
1
}
用户线程通过sys_kthread_req()系统调用发送一个请求,这时内核主线程将用户线程设置为异步等待状态(不会被调度执行),然后创建一个async_kthread_req异步协程,将其添加到内核协程执行线程的任务队列中去,等待执行。
/// sys_kthread_req函数的协程任务
async fn async_kthread_req(thread: Arc<Thread>, kthread: Arc<Kthread>, req_id: usize) {
let kthread_req_future = KthreadReqFuture::new(thread.clone(), req_id, kthread.clone());
kthread_req_future.await;
thread.state = ThreadState::Runnable;
}
async_kthread_req()函数创建一个KthreadReqFuture,并等待。当KthreadReqFuture就绪后(此时内核服务线程服务完成),将用户线程状态置为就绪(可调度)。
3.2 协程执行线程轮询KthreadReqFuture协程
impl Future for KthreadReqFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// maybe question here
let kthread = self.kthread.clone();
let req_id = self.req_id;
let res_id_lock = kthread.res_id();
// 加锁保证不被中断
let res_id = wait_lock_or_yield(&res_id_lock);
// 根据res_id和req_id判断请求是否完成
let ret = if *res_id >= req_id {
// 完成请求
Poll::Ready(())
} else {
// 未完成请求
kthread.add_req_waker(cx.waker().clone(), self.req_id);
Poll::Pending
};
return ret;
}
}
通过内核线程已完成请求id和当前请求id的大小关系判断当前请求是否已完成,若未完成则将当前协程的唤醒器waker添加到内核线程的waker队列中去,并返回pending。
3.3 内核服务线程完成服务并使用waker唤醒协程
下面是一个内核服务线程的代码实例
pub fn entry(ktid: usize) {
// 无限循环处理请求,请求队列为空时进入睡眠
loop {
// 保证所有堆上内存释放完毕,再进入IDEL状态
{
// 获取队首请求,不能带锁放弃CPU,否则无法向队列中添加请求
...
// 设置状态为繁忙
kthread.set_state(KthreadState::Busy);
// 完成服务
...
// 服务完成,唤醒协程
kthread.response_req(id);
}
kthread.set_state(KthreadState::Idle);
}
}
response_req已在前面说明,唤醒协程后,协程执行线程再次轮询KthreadReqFuture时即返回Ready,async_kthread_req中将用户线程状态设置为就绪(可调度),这样就完成了用户线程从发起请求到完成请求的整个过程。