通常我们在讨论 Node.js 的时候都会涉及到异步这个特性。实际上 Node.js 在执行异步调用的时候,不同的场景下有着不同的处理方式。本文将通过 libuv 源码来分析 Node.js 是如何通过 libuv 的线程池完成异步调用。本文描述的 Node.js 版本为 v11.15.0,libuv 版本为 1.24.0 。
以下面的代码为例,它通过调用 fs.access 来异步地判断文件是否存在并在回调中打印日志,在 Node.js 中这是一个典型的异步调用。
const fs = require('fs')
const cb = function (err) {
console.log(`Is myfile exists: ${!err}`)
}
fs.access('myfile', cb)
在分析上面这段代码的调用过程之前,我们先来了解一些 libuv 概念。
什么类型的请求 libuv 会把它放到线程池去执行
主动通过 libuv 发起的操作被 libuv 称为请求( uv_req_t
),libuv 的线程池作用于以下 4 种枚举的异步请求:
UV_FS
: fs 模块的异步函数(除了 uv_fs_req_cleanup ),fs.access、fs.stat 等。UV_GETADDRINFO
:dns 模块的异步函数,dns.lookup 等。UV_GETNAMEINFO
:dns 模块的异步函数,dns.lookupService 等。UV_WORK
:zlib 模块的 zlib.unzip、zlib.gzip 等;在 Node.js 的 Addon(C/C++) 中通过 uv_queue_work 创建的多线程请求。
其它的 UV_CONNECT
、UV_WRITE
、UDP_SEND
等则并不会通过线程池去执行。
线程池请求分类
这 4 种枚举请求 libuv 内部把它们分为 3 种任务类型( uv__work_kind
):
UV__WORK_CPU
:CPU 密集型,UV_WORK
类型的请求被定义为这种类型。因此根据这个分类,不推荐在uv_queue_work
中做 I/O 密集的操作。UV__WORK_FAST_IO
:快 IO 型,UV_FS
类型的请求被定义为这种类型。UV__WORK_SLOW_IO
:慢 IO 型,UV_GETADDRINFO
和UV_GETNAMEINFO
类型的请求被定义为这种类型。
UV__WORK_SLOW_IO
执行不同于 UV__WORK_CPU
与 UV__WORK_FAST_IO
,libuv 执行它的时候流程会有些差异,这个后面会提到。
线程池是如何初始化的
libuv 通过init_threads 函数初始化线程池,初始化时会根据一个名为 UV_THREADPOOL_SIZE 的环境变量来初始化内部线程池的大小,线程最大数量为 128 ,默认为 4 。如果以单进程的架构去部署服务,可以根据服务器 CPU 的核心数量及业务情况来设置线程池大小,达到资源利用的最大化。uv loop 线程在创建 worker 线程时,会初始化以下变量:
- 信号量 sem:在创建线程时与线程进行同步,每个线程创建好后将会通过这个信号量告知 uv loop 线程自己已经初始化完毕,可以开始处理请求了。当所有线程都初始化完成后这个信号量将被销毁,即完成线程池的初始化。
- 条件变量 cond:线程创建完成后通过这个条件变量进入阻塞状态( uv_cond_wait ),直到其它线程通过 uv_cond_signal 将其唤醒。
- 互斥量 mutex:对下面 3 个临界资源进行互斥访问。
- 请求队列 wq:线程池收到
UV__WORK_CPU
和UV__WORK_FAST_IO
类型的请求后将其插到此队列的尾部,并通过 uv_cond_signal 唤醒 worker 线程去处理,这是线程池请求的主队列。 - 慢 I/O 队列 slow_io_pending_wq:线程池收到
UV__WORK_SLOW_IO
类型的请求后将其插到此队列的尾部。 - 慢 I/O 标志位节点 run_slow_work_message:当存在慢 I/O 请求时,用来作为一个标志位放在请求队列 wq 中,表示当前有慢 I/O 请求,worker 线程处理请求时需要关注慢 I/O 队列的请求;当慢 I/O 队列的请求都处理完毕后这个标志位将从请求队列 wq 中移除。
worker 线程的入口函数均为 worker 函数,这个我们后面再说。 init_threads 实现如下:
static void init_threads(void) {
unsigned int i;
const char* val;
uv_sem_t sem;
// 6-23 行初始化线程池大小
nthreads = ARRAY_SIZE(default_threads);
val = getenv("UV_THREADPOOL_SIZE"); // 根据环境变量设置线程池大小
if (val != NULL)
nthreads = atoi(val);
if (nthreads == 0)
nthreads = 1;
if (nthreads > MAX_THREADPOOL_SIZE)
nthreads = MAX_THREADPOOL_SIZE;
threads = default_threads;
if (nthreads > ARRAY_SIZE(default_threads)) {
threads = uv__malloc(nthreads * sizeof(threads[0]));
if (threads == NULL) {
nthreads = ARRAY_SIZE(default_threads);
threads = default_threads;
}
}
// 初始化条件变量
if (uv_cond_init(&cond))
abort();
// 初始化互斥量
if (uv_mutex_init(&mutex))
abort();
// 初始化队列和节点
QUEUE_INIT(&wq); // 工作队列
QUEUE_INIT(&slow_io_pending_wq); // 慢 I/O 队列
QUEUE_INIT(&run_slow_work_message); // 如果有慢 I/O 请求,将此节点作为标志位插入到 wq 中
// 初始化信号量
if (uv_sem_init(&sem, 0))
abort(); // 后续线程同步需要依赖这个信号量,因此这个信号量创建失败了则终止进程
// 创建 worker 线程
for (i = 0; i < nthreads; i++)
if (uv_thread_create(threads + i, worker, &sem)) // 初始化 worker 线程
abort(); // woker 线程创建错误原因为 EAGAIN、EINVAL、EPERM 其中之一,具体请参考 man3
// 等待 worker 创建完成
for (i = 0; i < nthreads; i++)
uv_sem_wait(&sem); // 等待 worker 线程创建完毕
// 回收信号量资源
uv_sem_destroy(&sem);
}
请求是如何放到线程池去执行的
libuv 有两个函数可以创建多线程请求:
- uv_queue_work:开发者常用的创建多线程请求的函数。
- uv__work_submit:libuv 内部创建多线程请求的函数,实际上 uv_queue_work 最终也是调用的这个函数。
uv__work_submit 函数主要做 2 件事:
- 调用 init_threads 初始化线程池,因为线程池的创建是惰性的,只有用到的时候才会创建。
- 调用内部的 post 函数将请求插入到请求队列中。
实现如下:
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
enum uv__work_kind kind,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w, int status)) {
// 在收到请求后才开始初始化线程池,但是只会初始化一次
uv_once(&once, init_once);
w->loop = loop;
w->work = work;
w->done = done;
post(&w->wq, kind);
}
static void init_once(void) {
// fork 后子进程的 mutex 、condition variables 等 pthread 变量的状态是父进程 fork 时的复制,所以子进程创建时需要重置状态
// 具体请参考 http://man7.org/linux/man-pages/man2/fork.2.html
if (pthread_atfork(NULL, NULL, &reset_once))
abort();
// 初始化线程池
init_threads();
}
static void reset_once(void) {
// 重置 once 变量
uv_once_t child_once = UV_ONCE_INIT;
memcpy(&once, &child_once, sizeof(child_once));
}
post 函数主要做 2 件事:
- 判断请求的请求类型是否是
UV__WORK_SLOW_IO
:- 如果是,将这个请求插到慢 I/O 请求队列
slow_io_pending_wq
的尾部,同时在请求队列wq
的尾部插入一个run_slow_work_message
节点作为标志位,告知请求队列wq
当前存在慢 I/O 请求。 - 如果不是,将请求插到请求队列
wq
尾部。
- 如果是,将这个请求插到慢 I/O 请求队列
- 如果有空闲的线程,唤醒某一个去执行请求。
并发的慢 I/O 的请求数量不会超过线程池大小的一半,这样做的好处是避免多个慢 I/O 的请求在某段时间内把所有线程都占满,导致其它能够快速执行的请求需要排队。
post 函数实现如下:
static void post(QUEUE* q, enum uv__work_kind kind) {
// 加锁
uv_mutex_lock(&mutex);
if (kind == UV__WORK_SLOW_IO) {
/* 插入到慢 I/O 队列中 */
QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
/* 如果 run_slow_work_message 节点不为空代表其已在 wq 队列中,无需再次插入 */
if (!QUEUE_EMPTY(&run_slow_work_message)) {
uv_mutex_unlock(&mutex);
return;
}
// 不在 wq 队列中则将 run_slow_work_message 作为标志位插到 wq 尾部
q = &run_slow_work_message;
}
// 将请求插到请求队列尾部
QUEUE_INSERT_TAIL(&wq, q);
// 如果有空闲的线程,唤醒某一个去执行请求
if (idle_threads > 0)
uv_cond_signal(&cond); // 唤醒一个 worker 线程
uv_mutex_unlock(&mutex);
}
worker 线程的入口函数 worker 在线程创建好并初始化完成后将按照下面的步骤不断的循环:
- 等待唤醒。
- 取出请求队列 wq 或者慢 I/O 请求队列的头部请求去执行。
- 通知 uv loop 线程完成了一个请求的处理。
- 回到 1 。
static void worker(void* arg) {
struct uv__work* w;
QUEUE* q;
int is_slow_work;
// 通知 uv loop 线程此 worker 线程已创建完毕
uv_sem_post((uv_sem_t*) arg);
arg = NULL;
uv_mutex_lock(&mutex);
// 通过这个死循环来不断的执行请求
for (;;) {
/*
这个 while 有2个判断
1. 在多核处理器下,pthread_cond_signal 可能会激活多于一个线程,通过一个 while 来避免这种情况导致的问题,具体请参考 https://linux.die.net/man/3/pthread_cond_signal
2. 限制慢 I/O 请求的数量小于线程数量的一半
*/
while (QUEUE_EMPTY(&wq) ||
(QUEUE_HEAD(&wq) == &run_slow_work_message &&
QUEUE_NEXT(&run_slow_work_message) == &wq &&
slow_io_work_running >= slow_work_thread_threshold())) {
idle_threads += 1;
// worker 线程初始化完成或没有请求执行时进入阻塞状态,直到被新的请求唤醒
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}
// 唤醒并且达到执行请求的条件后取出队列头部的请求
q = QUEUE_HEAD(&wq);
// 如果头部请求是退出,则跳出循环,结束 worker 线程
if (q == &exit_message) {
// 继续唤醒其它 worker 去结束线程
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
break;
}
// 将这个请求节点从请求队列 wq 中移除
QUEUE_REMOVE(q);
QUEUE_INIT(q);
is_slow_work = 0;
// 如果这个请求是慢 I/O 的标志位
if (q == &run_slow_work_message) {
/* 控制慢 I/O 请求数量,超过则插到队列尾部,等待前面的请求执行完 */
if (slow_io_work_running >= slow_work_thread_threshold()) {
QUEUE_INSERT_TAIL(&wq, q);
continue;
}
/* 判断慢 I/O 请求队列中是否有请求,请求有可能被取消 */
if (QUEUE_EMPTY(&slow_io_pending_wq))
continue;
is_slow_work = 1;
slow_io_work_running++;
// 取出慢 I/O 请求队列中头部的请求
q = QUEUE_HEAD(&slow_io_pending_wq);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
// 如果慢 I/O 请求队列中还有请求,则将 run_slow_work_message 这个标志位重新插到请求队列 wq 的尾部
if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
if (idle_threads > 0)
uv_cond_signal(&cond); // 唤醒一个线程继续执行
}
}
uv_mutex_unlock(&mutex);
w = QUEUE_DATA(q, struct uv__work, wq);
// 上面处理了这多,终于在这里开始执行请求的函数了
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL;
// 为保证线程安全,请求执行完后不会立即回调请求,而是将完成的请求插到已完成的请求队列中,在uv loop 线程完成回调
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
// 通过 uv_async_send 同步 uv loop 线程:线程池完成了一个请求
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
uv_mutex_lock(&mutex);
if (is_slow_work) {
slow_io_work_running--;
}
}
}
请求在 worker 执行完后是如何同步 uv loop 所在的线程
在 uv_loop_init 时,线程池的 wq_async(uv_async_t) 句柄通过 uv_async_init 初始化并插入到 uv loop 的 async_handles 队列中,然后在 uv loop 线程中遍历 async_handles 队列并完成回调。
worker 线程 和 uv loop 线程通过 uv_async_send 进行同步,而uv_async_send 只做了一件事:向 async_wfd 句柄写了一个长度为 1 个字节的字符串(只有 \0
这个字符)。
uv_async_send 实现如下:
int uv_async_send(uv_async_t* handle) {
if (ACCESS_ONCE(int, handle->pending) != 0)
return 0;
// cmpxchgi 函数设置标志位,如果已经设置过则不会重复调用 uv__async_send
if (cmpxchgi(&handle->pending, 0, 1) == 0)
uv__async_send(handle->loop);
return 0;
}
static void uv__async_send(uv_loop_t* loop) {
const void* buf;
ssize_t len;
int fd;
int r;
buf = "";
len = 1;
fd = loop->async_wfd;
#if defined(__linux__)
if (fd == -1) {
static const uint64_t val = 1;
buf = &val;
len = sizeof(val);
fd = loop->async_io_watcher.fd; /* eventfd */
}
#endif
do
r = write(fd, buf, len); // 向 fd 写入内容
while (r == -1 && errno == EINTR);
if (r == len)
return;
if (r == -1)
if (errno == EAGAIN || errno == EWOULDBLOCK)
return;
abort();
}
对 async_wfd 写内容为什么能做到同步呢?实际上在 worker 线程对 async_wfd 写入时,uv loop 线程同时也在不断的循环去接收处理各种各样的事件或请求,其中就包括对 async_wfd 可读事件的监听。
uv loop 是在 uv_run 函数中执行的,它在 Node.js 启动时 被调用, uv_run 实现如下:
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
if (!r)
uv__update_time(loop);
while (r != 0 && loop->stop_flag == 0) {
// 更新计时器时间
uv__update_time(loop);
// 回调超时的计时器,setTimeout、setInterval 都是由这个函数回调
uv__run_timers(loop);
// 处理某些没有在 uv__io_poll 完成的回调
ran_pending = uv__run_pending(loop);
// 官方解释:Idle handle is needed only to stop the event loop from blocking in poll.
// 实际上 napi 中某些函数比如 napi_call_threadsafe_function 会往 idle 队列中插入回调,然后在这个阶段执行
uv__run_idle(loop);
// process._startProfilerIdleNotifier 的回调
uv__run_prepare(loop);
timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop); // 计算 uv__io_poll 超时时间,算法请参考 https://github.com/libuv/libuv/blob/v1.24.0/src/unix/core.c#L318
// 对 async_wfd 可读的监听在 uv__io_poll 这个函数中
// 第二个参数 timeout 为上面计算出来,用来设置 epoll_wait 等函数等待 I/O 事件的时间
uv__io_poll(loop, timeout);
// setImmediate 的回调
// ps: 个人觉得从实现上讲 setImmediate 和 nextTick 应该互换名字 :-)
uv__run_check(loop);
// 关闭句柄是个异步操作
// 一般结束 uv loop 时会先调用 uv_walk 遍历所有句柄并关闭它们,然后再执行一次 uv loop 通过这个函数来完成关闭,最后再调用 uv_loop_close,否则的话会出现内存泄露
uv__run_closing_handles(loop);
if (mode == UV_RUN_ONCE) {
/* UV_RUN_ONCE implies forward progress: at least one callback must have
* been invoked when it returns. uv__io_poll() can return without doing
* I/O (meaning: no callbacks) when its timeout expires - which means we
* have pending timers that satisfy the forward progress constraint.
*
* UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
* the check.
*/
uv__update_time(loop);
uv__run_timers(loop);
}
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}
/* The if statement lets gcc compile it to a conditional store. Avoids
* dirtying a cache line.
*/
if (loop->stop_flag != 0)
loop->stop_flag = 0;
return r;
}
可以看到 uv loop 里面其实就是在不断的循环去更新计时器、处理各种类型的回调、轮询 I/O 事件,Node.js 的异步便是通过 uv loop 完成的。
libuv 的异步采用的是 Reactor 模型进行多路复用,在 uv__io_poll 中去处理 I/O 相关的事件, uv__io_poll 在不同的平台下通过 epoll、kqueue 等不同的方式实现。所以当往 async_wfd 写入内容时,在 uv__io_poll 中将会轮询到 async_wfd 可读的事件,这个事件仅仅是用来通知 uv loop 线程: 非 uv loop 线程有回调需要在 uv loop 线程执行。
当轮询到 async_wfd 可读后,uv__io_poll 会回调对应的函数 uv__async_io,它主要做了下面 2 件事:
- 读取数据,确认是否有 uv_async_send 调用,数据内容并不关心。
- 遍历 async_handles 句柄队列 ,判断是否有事件,如果有的话执行它的回调。
实现如下:
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
char buf[1024];
ssize_t r;
QUEUE queue;
QUEUE* q;
uv_async_t* h;
assert(w == &loop->async_io_watcher);
// 这个 for 循环用来确认是否有 uv_async_send 调用
for (;;) {
r = read(w->fd, buf, sizeof(buf));
if (r == sizeof(buf))
continue;
if (r != -1)
break;
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
if (errno == EINTR)
continue;
abort();
}
// 交换 loop->async_handle 和 queue内容,避免在遍历 loop->async_handles 时有新的 async_handle 插入到队列
// loop->async_handles 队列中除了线程池的句柄还有其它的
QUEUE_MOVE(&loop->async_handles, &queue);
while (!QUEUE_EMPTY(&queue)) {
q = QUEUE_HEAD(&queue);
h = QUEUE_DATA(q, uv_async_t, queue);
QUEUE_REMOVE(q);
// 将 uv_async_t 重新插入到 loop->async_handles 中,uv_async_t 需要手动调用 uv__async_stop 才会从队列中移除
QUEUE_INSERT_TAIL(&loop->async_handles, q);
// 确认这个 async_handle 是否需要回调
if (cmpxchgi(&h->pending, 1, 0) == 0)
continue;
if (h->async_cb == NULL)
continue;
// 调用通过 uv_async_init 初始化 uv_async_t 时绑定的回调函数
// 线程池的 uv_async_t 是在 uv_loop_init 时初始化的,它绑定的回调是 uv__work_done
// 因此如果 h == loop->wq_async,这里 h->async_cb 实际是调用了 uv__work_done(h);
// 详情请参考 https://github.com/libuv/libuv/blob/v1.24.0/src/unix/loop.c#L88
h->async_cb(h);
}
}
调用线程池的 h->async_cb
后会回到线程池的 uv__work_done 函数:
void uv__work_done(uv_async_t* handle) {
struct uv__work* w;
uv_loop_t* loop;
QUEUE* q;
QUEUE wq;
int err;
loop = container_of(handle, uv_loop_t, wq_async);
uv_mutex_lock(&loop->wq_mutex);
// 清空已完成的 loop->wq 队列
QUEUE_MOVE(&loop->wq, &wq);
uv_mutex_unlock(&loop->wq_mutex);
while (!QUEUE_EMPTY(&wq)) {
q = QUEUE_HEAD(&wq);
QUEUE_REMOVE(q);
w = container_of(q, struct uv__work, wq);
// 如果在回调前调用了 uv_cancel 取消请求,则即使请求已经执行完,依旧算出错
err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
w->done(w, err);
}
}
最后通过 w->done(w, err)
回调 uv__fs_done,并由 uv__fs_done 回调 JS 函数:
static void uv__fs_done(struct uv__work* w, int status) {
uv_fs_t* req;
req = container_of(w, uv_fs_t, work_req);
uv__req_unregister(req->loop, req);
// 如果取消了则抛出异常
if (status == UV_ECANCELED) {
assert(req->result == 0);
req->result = UV_ECANCELED;
}
// 回调 JS
req->cb(req);
}
以上就是 libuv 是线程池从创建到执行多线程请求的过程。
fs.access 调用过程分析
再回到文章开头提到的代码,我们来分析它的调用过程。
const fs = require('fs')
const cb = function (err) {
console.log(`Is myfile exists: ${!err}`)
}
fs.access('myfile', cb)
假设线程池大小为 2 ,下面描述了执行 fs.access 时 3 个线程的状态(略过了 Node.js 启动和 JavaScript 和 Native 函数调用过程),时间轴从上到下:
空白
代表处于阻塞状态,-
代表线程尚未启动
uv loop thread | worker thread 1 | worker thread 2 |
---|---|---|
fs.access(‘myfile’, cb) | - | - |
JavaScript 通过 v8 调用 Native 函数 | - | - |
uv_fs_access | - | - |
uv__work_submit | - | - |
init_threads | worker | worker |
uv_sem_wait | uv_sem_post | uv_sem_post |
uv_cond_wait | uv_cond_wait | |
uv_cond_signal | ||
uv__io_poll | access | |
uv__io_poll | ||
uv__io_poll | uv_async_send | |
uv__io_poll | uv_cond_wait | |
uv__io_poll | ||
uv__async_io | ||
uv__work_done | ||
uv__fs_done | ||
Native 通过 v8 回调 JavaScript 函数 | ||
cb | ||
console.log(`Is myfile exists: ${exists}`) |
可以看到调用过程如下:
- 通过 Node.js 启动时对 JavaScript 函数与 Native 函数的绑定,fs.access 最终会进入到 Native 函数中,而 Native 函数会调用 libuv 的 uv_fs_access 函数来判断文件是否可以访问。(这里略过 JavaScript 如何通过 v8 调用 Native 函数)
- uv_fs_access 在 uv loop 线程向线程池提交了一个多线程请求。
- 由于线程池是惰性的,在执行请求前,先进行了初始化线程池的操作。
- 线程池初始化完成后唤醒了
worker thread 1
去执行请求,同时 uv loop 线程不断的轮询是否完成了请求。 worker thread 1
同步的调用 access 函数判断目标文件是否可读。- access 函数完成后,
worker thread 1
通过 uv_async_send 同步 uv loop 线程请求已完成,同时自身进入阻塞状态,等待新的请求将其唤醒。 - uv loop 线程发现请求执行完成后通过一系列回调回到 uv__fs_done。
- uv__fs_done 回调 JavaScript 函数打印日志。(这里略过 uv__fs_done 是如何通过 v8 回调到 JavaScript)
整个过程由于没有新的请求进来, worker thread 2
始终处于阻塞状态。
结束语
通过对 fs.access 的调用过程分析,我们了解了 libuv 是如何通过线程池进行异步调用的。另外也可以看到针对不同的平台,libuv 对 uv__io_poll 的实现是不同的,后面我们将介绍 uv__io_poll 实现异步 I/O 的方式。
先码后看, 看起来很吊