前言
通过前两篇文章的学习,我们已经解了Libuv中的队列和线程基础,为本文的学习打下基础,没有看过的同学建议先看下。下面将从生产者消费者模型和源码两个角度学习Libuv的线程池,为后面学习Libuv文件处理做铺垫。
生产者消费者模型
Node.js的文件操作支持同步调用和异步调用,根据Libuv官网的介绍,我们知道它没有跨平台的异步文件IO可以使用,所以它的异步文件IO是通过在线程池中执行同步文件IO实现的。那具体是怎么实现的呢?答案就是生产者消费者模型。Libuv的线程包括2部分,一个是主线程,一个是线程池。主线程的一部分工作是描述任务并将其提交给线程池,线程池进行处理。拿异步文件操作为例,主线程生成一个描述文件操作的对象,将其提交到任务队列;线程池从任务队列获取该对象进行处理。其中主线程是生产者,线程池中的线程是消费者,任务队列是生产者和消费者之间的桥梁,下面是一个简单的示意图:
Libuv在生产者消费者模型中多加了一步,线程池执行完任务后,将结果交给主线程,主线程拿到结果后,如果发现有回调函数需要执行,就执行。所以Libuv的线程模型如下:
源码分析
Libuv线程池的代码很容易找到,就在src目录下的threadpool.c文件中。
通过上面对生产者消费者模型的介绍,该代码大致分为4部分:任务队列、主线程提交任务到任务队列(提交任务)、线程池从任务队列获取任务并执行(消费任务)、线程池执行完任务通知主线程执行回调函数(回调处理)。
任务队列
任务队列就是一个队列而已。由于任务队列会被多个线程(主线程、线程池)同时访问,为了保证线程安全,需要互斥锁。另外任务队列如果为空,线程池中的线程需要挂起,等待主线程提交任务后唤起,所以还需要条件变量。任务队列、条件变量、互斥量的定义如下所示:
...
static uv_cond_t cond; // 条件变量
static uv_mutex_t mutex; // 互斥锁
...
static QUEUE wq; // 任务队列
...
提交任务
主线程将任务提交到任务队列是通过uv__work_submit来实现的,让我们来看下它的代码:
struct uv__work {
void (*work)(struct uv__work *w);
void (*done)(struct uv__work *w, int status);
struct uv_loop_s* loop;
void* wq[2]; // 用于将其关联到任务队列中
};
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
enum uv__work_kind kind,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w, int status)) {
uv_once(&once, init_once); // 初始化线程,无乱调用多少次,init_once只会执行一次
w->loop = loop; // 事件循环
w->work = work; // 线程池要执行的函数
w->done = done; // 线程池执行结束后,通知主线程要执行的函数
post(&w->wq, kind); // 将任务提交任务队列中
}
uv__work_submit有4个参数:第一个参数为Libuv的事件循环,这里我们先忽略,以后会有专门的文章介绍;第二个参数是线程池执行任务的通用模型,类型为uv__work,属性work表示线程池中要执行的函数,属性done表示线程池执行完,通知主线程要执行的函数;第三、四个参数分别对应work函数和done函数。该函数主要做了两件事情:一件是通过uv_once调用init_once来初始化线程池;另一件是对w进行赋值,然后通过post将其提交到任务队列。这里需要注意,通过nv_once可以保证uv__work_submit在调用多次的情况,init_once只执行一次,nv_once底层是通过pthread_once实现的。init_once会在下一节介绍,让我们先来看下post。
static void post(QUEUE* q, enum uv__work_kind kind) {
// 获取锁
uv_mutex_lock(&mutex);
...
// 将任务添加到任务队列的最后
QUEUE_INSERT_TAIL(&wq, q);
// 如果线程池中有挂起的线程,就唤起挂起的线程,让其工作
if (idle_threads > 0)
uv_cond_signal(&cond);
// 释放锁
uv_mutex_unlock(&mutex);
}
代码很简单,先获取锁mutex,然后将任务提交到任务队列中。如果线程池中有挂起的线程,就通过条件变量cond唤起并放弃锁mutex。
消费任务
任务队列中的任务是通过线程池进行消费的,而线程池的初始化是在uv__work_submit调用init_once实现的,先看下如何初始化线程池吧:
static void init_once(void) {
...
init_threads();
}
init_once调用了init_threads,那就看下init_threads。
...
#define MAX_THREADPOOL_SIZE 1024 // 线程池的最大数量
...
static uv_thread_t* threads; // 线程池
static uv_thread_t default_threads[4]; // 默认的线程池,线程数量为4
...
static void init_threads(void) {
unsigned int i;
const char* val;
...
// 计算线程池中线程的数量,不能大于最大值MAX_THREADPOOL_SIZE
nthreads = ARRAY_SIZE(default_threads);
// 通过环境变量设置线程池的大小
val = getenv("UV_THREADPOOL_SIZE");
if (val != NULL)
nthreads = atoi(val);
// 保存线程池中最少有一个线程
if (nthreads == 0)
nthreads = 1;
// 线程池中线程数量不能超过MAX_THREADPOOL_SIZE
if (nthreads > MAX_THREADPOOL_SIZE)
nthreads = MAX_THREADPOOL_SIZE;
// 初始化线程池
threads = default_threads;
if (nthreads > ARRAY_SIZE(default_threads)) {
threads = uv__malloc(nthreads * sizeof(threads[0]));
if (threads == NULL) {
nthreads = ARRAY_SIZE(default_threads);
threads = default_threads;
}
}
// 创建条件变量
if (uv_cond_init(&cond))
abort();
// 创建互斥量
if (uv_mutex_init(&mutex))
abort();
// 初始化任务队列
QUEUE_INIT(&wq);
...
// 根据线程池的数量,初始化线程池中的每个线程,并执行worker函数
for (i = 0; i < nthreads; i++)
if (uv_thread_create(threads + i, worker, &sem))
abort();
...
}
通过上面的代码可以知道init_threads先获取线程池的大小nthreads;然后初始化互斥量mutex、条件变量cond和任务队列wq;最后创建nthreads个线程,每个线程执行worker函数。worker函数的核心就是消费任务队列中的任务,让我们详细的看下它:
static void worker(void* arg) {
struct uv__work* w;
QUEUE* q;
...
arg = NULL;
// 获取互斥锁
uv_mutex_lock(&mutex);
// 通过无限循环,保证线程一直执行
for (;;) {
// 如果任务队列为空,通过等待条件变量cond挂起,并释放锁mutex
// 主线程提交任务通过uv_cond_signal唤起,并重新获取锁mutex
while (QUEUE_EMPTY(&wq) || ...) {
idle_threads += 1;
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}
// 从任务队列中获取第一个任务
q = QUEUE_HEAD(&wq);
...
// 将该任务从任务队列中删除
QUEUE_REMOVE(q);
QUEUE_INIT(q);
...
// 操作完任务队列,释放锁mutex
uv_mutex_unlock(&mutex);
// 获取uv__work对象,并执行work
w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);
// 获取loop的互斥锁wq_mutex
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL;
// 将执行完work函数的任务挂到loop->wq队列中
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
// 通过uv_async_send通知主线程,当然有任务执行完了,主线程可以执行任务中的done函数。
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
// 获取锁,执行任务队列中的下一个任务
...
uv_mutex_lock(&mutex);
...
}
}
worker的本质就是从任务队列中获取任务,然后执行work函数。执行完后,将该任务提交到事件循环loop的wp队列中,通过uv_async_send告知主线程执行任务中的done函数。
回调处理
上面我们介绍了worker函数在执行完任务后会通过uv_async_send告知主线程执行回调函数,那这块是怎么实现的呢?这里涉及到了事件循环,这里我们就简单的介绍一下,后面会有详细的文章介绍它。事件循环loop在初始化的时候会调用uv_async_init,该函数的第三个参数是一个函数,当其他线程调用uv_async_send时,该函数就会执行。具体代码如下:
uv_async_init(loop, &loop->wq_async, uv__work_done);
void uv__work_done(uv_async_t* handle) {
struct uv__work* w;
uv_loop_t* loop;
QUEUE* q;
QUEUE wq;
int err;
loop = container_of(handle, uv_loop_t, wq_async);
uv_mutex_lock(&loop->wq_mutex);
QUEUE_MOVE(&loop->wq, &wq);
uv_mutex_unlock(&loop->wq_mutex);
while (!QUEUE_EMPTY(&wq)) {
q = QUEUE_HEAD(&wq);
QUEUE_REMOVE(q);
w = container_of(q, struct uv__work, wq);
err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
w->done(w, err);
}
}
uv__work_done很简单,获取loop中的wq队列,获取队列中的每个任务并调用done函数。
总结
本文首先介绍了生产者消费者模型,然后通过任务队列、提交任务、消费任务、回调处理讲解了Libuv线程池,为下一篇讲解Libuv文件处理做铺垫,如果你对Libuv系列感兴趣的话,欢迎关注我们。