代码仓库:https://github.com/theanarkh/read-libuv-code
一切要从libuv的初始化开始。
uv_default_loop();
该函数调用
uv_loop_init();
进行初始化。uv_loop_init有以下代码。
uv_async_init(loop, &loop->wq_async, uv__work_done);
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
int err;
// 给libuv注册一个观察者io,读端
err = uv__async_start(loop);
if (err)
return err;
// 设置相关字段,给libuv插入一个async_handle,写端
uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
handle->async_cb = async_cb;
handle->pending = 0;
QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
uv__handle_start(handle);
return 0;
}
libuv的实现感觉有点乱。uv_async_init函数主要做了三件事情
- 通过eventfd或者管道生成线程间通信的两个fd(读端和写端);
- 执行uv__async_start,封装一个io观察者(包括读端fd和回调),然后追加到watcher_queue队列,在poll io阶段,libuv会注册到epoll里面。
- 执行QUEUE_INSERT_TAIL给libuv的async_handles队列追加一个handle(写端,线程池的线程完成任务后会使用写端写入数据,通知主线程)
下面我们看一下1,2两点的实现。
获取通信描述符、注册读端,保存写端描述符
static int uv__async_start(uv_loop_t* loop) {
int pipefd[2];
int err;
// 只需要初始化一次
if (loop->async_io_watcher.fd != -1)
return 0;
// 获取一个用于进程间通信的fd
err = uv__async_eventfd();
// 成功则保存起来,不支持则使用管道通信作为进程间通信
if (err >= 0) {
pipefd[0] = err;
pipefd[1] = -1;
}
else if (err == UV_ENOSYS) {
// 不支持eventfd则使用匿名管道
err = uv__make_pipe(pipefd, UV__F_NONBLOCK);
#if defined(__linux__)
/* Save a file descriptor by opening one of the pipe descriptors as
* read/write through the procfs. That file descriptor can then
* function as both ends of the pipe.
*/
if (err == 0) {
char buf[32];
int fd;
snprintf(buf, sizeof(buf), "/proc/self/fd/%d", pipefd[0]);
// 通过fd就可以实现对管道的读写,高级用法
fd = uv__open_cloexec(buf, O_RDWR);
if (fd >= 0) {
// 关掉旧的
uv__close(pipefd[0]);
uv__close(pipefd[1]);
// 赋值新的
pipefd[0] = fd;
pipefd[1] = fd;
}
}
#endif
}
// 拿到了通信的读写两端
if (err < 0)
return err;
// 初始化io观察者async_io_watcher
uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
// 注册io观察者到loop里,并注册需要监听的事件POLLIN,即可读
uv__io_start(loop, &loop->async_io_watcher, POLLIN);
loop->async_wfd = pipefd[1];
return 0;
}
我们接着看uv__io_init
void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
// 初始化队列,回调,需要监听的fd
QUEUE_INIT(&w->pending_queue);
QUEUE_INIT(&w->watcher_queue);
w->cb = cb;
w->fd = fd;
w->events = 0;
w->pevents = 0;
}
代码很简单,就是设置一下async_io_watcher的fd和回调,在epoll_wait返回的时候用到。再看uv__io_start。
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
// 记录当前的events,用于下次比较
w->pevents |= events;
maybe_resize(loop, w->fd + 1);
#if !defined(__sun)
/* The event ports backend needs to rearm all file descriptors on each and
* every tick of the event loop but the other backends allow us to
* short-circuit here if the event mask is unchanged.
*/
// event没变,则不需要再次处理
if (w->events == w->pevents)
return;
#endif
// 如果队列为空则把w挂载到watcher_queue队列中
if (QUEUE_EMPTY(&w->watcher_queue))
QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
// 保存映射关系,poll io中需要用
if (loop->watchers[w->fd] == NULL) {
loop->watchers[w->fd] = w;
loop->nfds++;
}
}
uv__io_start主要是把loop->async_io_watcher插入watcher_queue队列,poll io阶段会把watcher_queue队列的节点逐个加到epoll中去。至此,完成了写端的设置。即拿到了通信的两个描述符,在loop里设置了读端感兴趣的回调和fd。我们回到uv__async_start函数,看到倒数第二句。
loop->async_wfd = pipefd[1];
这里保存了通信的写端描述符。
设置写端
// 设置相关字段,给libuv插入一个async_handle,写端
uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
handle->async_cb = async_cb;
// 标记是否有任务完成了
handle->pending = 0;
QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
uv__handle_start(handle);
在1,2两点中,libuv在uv__async_start中通过loop->async_wfd = pipefd[1];保存了写端描述符。这里只需要把写端对应的handle插入到loop里就行。通过一系列操作。有点懵。总结一下。几个重要的字段。
- 首先获得两个通信描述符,读端保存在loop->async_io_watcher结构体里,写端保存在loop->async_wfd 字段;
- 然后挂载loop->async_io_watcher到loop->watcher_queue队列,主要为了在poll io中注册读事件,读事件回调是uv__async_io。
- 把loop->wq_async结构体插入loop->async_handles队列。loop->wq_async是写端相关的handle,线程池会设置这个handle的pending为1表示有任务已经完成,然后再往管道写端写入标记,主线程在epoll_wait的时候返回这个fd,并指向对应的回调(uv__async_io)。uv__async_io会遍历async_handles队列,pending等于1的话会执行对应的回调。对于wq_async节点,回调是uv__work_done。
就这样完成了线程池和主线程的通信。下面我们看看使用的例子。 这里以文件操作为例子,因为nodejs中文件读写是以线程池实现的。这里直接从uv_fs_open开始(因为js层到c++层主要是一些封装。最后会调到uv_fs_open)。直接看一下uv_fs_open的代码。
// 下面代码是宏展开后的效果
int uv_fs_open(uv_loop_t* loop,
uv_fs_t* req,
const char* path,
int flags,
int mode,
uv_fs_cb cb) {
do {
if (req == NULL)
return UV_EINVAL;
UV_REQ_INIT(req, UV_FS);
req->fs_type = UV_FS_ ## subtype;
req->result = 0;
req->ptr = NULL;
req->loop = loop;
req->path = NULL;
req->new_path = NULL;
req->bufs = NULL;
req->cb = cb;
}
while (0)
do {
assert(path != NULL);
if (cb == NULL) {
req->path = path;
} else {
req->path = uv__strdup(path);
if (req->path == NULL)
return UV_ENOMEM;
}
}
while (0)
req->flags = flags;
req->mode = mode;
do {
if (cb != NULL) {
uv__req_register(loop, req);
/* 异步*/
uv__work_submit(loop,
&req->work_req,
UV__WORK_FAST_IO,
**uv__fs_work**,
**uv__fs_done**);
return 0;
}
else {
/* 同步 */
uv__fs_work(&req->work_req);
return req->result;
}
}
while (0)
里面很多do while,是因为他们是宏展开而来。我们从上往下看,没有太多的逻辑,函数的最后一个参数cb是nodejs的c++层设置的,c++层会再回调js层。然后open(大部分的文件操作)分为同步和异步两种模式(即fs.open和openSync)。同步直接导致nodejs阻塞,不涉及到线程池,这里只看异步模式。所以我们从uv__work_submit函数开始看。
// 给线程池提交一个任务
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
enum uv__work_kind kind,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w, int status)) {
// 保证已经初始化线程,并只执行一次
uv_once(&once, init_once);
w->loop = loop;
// 可能引起阻塞的函数
w->work = work;
// work执行完后执行的回调
w->done = done;
post(&w->wq, kind);
}
submit函数主要是设置了一些字段,然后调post函数。接着看post函数。
// 把任务插入队列等待线程处理
static void post(QUEUE* q, enum uv__work_kind kind) {
uv_mutex_lock(&mutex);
// 类型是慢IO
if (kind == UV__WORK_SLOW_IO) {
/* Insert into a separate queue. */
// 插入慢IO对应的队列
QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
/*
有慢IO任务的时候,需要给主队列wq插入一个消息节点run_slow_work_message,
说明有慢IO任务,所以如果run_slow_work_message是空,说明还没有插入主队列。
需要进行q = &run_slow_work_message;赋值,然后把run_slow_work_message插入
主队列
*/
if (!QUEUE_EMPTY(&run_slow_work_message)) {
/* Running slow I/O tasks is already scheduled => Nothing to do here.
The worker that runs said other task will schedule this one as well. */
uv_mutex_unlock(&mutex);
return;
}
q = &run_slow_work_message;
}
// 把节点插入主队列,可能是慢IO消息节点或者一般任务
QUEUE_INSERT_TAIL(&wq, q);
// 有空闲线程则唤醒他
if (idle_threads > 0)
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
}
post函数以互斥的方式给线程池的队列加一个任务节点。任务提交成功。然后线程池里的线程就会不断地从任务队列了执行任务。这里提一下线程池的初始化。
static void init_threads(void) {
// 创建多个线程
for (i = 0; i < nthreads; i++)
if (uv_thread_create(threads + i, worker, &sem))
...
}
所以worker函数是处理任务的。下面是work函数的主要逻辑。
// q是慢IO或者一般任务
w = QUEUE_DATA(q, struct uv__work, wq);
// 执行业务回调,该函数一般会阻塞
w->work(w);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
// 执行完任务,插入到loop的wq队列,在uv__work_done的时候会执行该队列的节点
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
// 通知loop的wq_async节点
uv_async_send(&w->loop->wq_async)
w->work(w)就是执行真正任务的地方。从一开始的uv_fs_open函数那里,我们知道work是uv__fs_work函数。该函数就会打开一个文件(类似地,读一个文件,写一个文件),操作完成后会把关联的结构体w加到w->loop->wq中,wq的作用我们一会会看到。最后执行uv_async_send(&w->loop->wq_async)通知主线程。w->loop->wq_async就是我们前面说到的写端对应的handle。我们看看uv_async_send做了什么。
int uv_async_send(uv_async_t* handle) {
/* Do a cheap read first. */
if (ACCESS_ONCE(int, handle->pending) != 0)
return 0;
// 如pending是0,则设置为1,返回0,如果是1则返回1,所以如果多次调用该函数是会被合并的
if (cmpxchgi(&handle->pending, 0, 1) == 0)
uv__async_send(handle->loop);
return 0;
}
static void uv__async_send(uv_loop_t* loop) {
const void* buf;
ssize_t len;
int fd;
int r;
buf = "";
len = 1;
fd = loop->async_wfd;
#if defined(__linux__)
// 说明用的是eventfd而不是管道
if (fd == -1) {
static const uint64_t val = 1;
buf = &val;
len = sizeof(val);
// 见uv__async_start
fd = loop->async_io_watcher.fd; /* eventfd */
}
#endif
// 通知读端
do
r = write(fd, buf, len);
while (r == -1 && errno == EINTR);
if (r == len)
return;
if (r == -1)
if (errno == EAGAIN || errno == EWOULDBLOCK)
return;
abort();
}
重点是write函数,这个fd就是我们前面讲到的管道的写端。此时,往管道的写端写入数据。对于一个任务,线程池的工作就完成了。有写则必然有读。读的逻辑是在uv__io_poll中实现的。uv__io_poll函数即libuv中poll io阶段执行的函数。具体逻辑可以看这篇文章nodejs源码解析之事件循环。 在uv__io_poll中会发现管道可读,然后执行对应的回调,前面我们分析过,回调函数是uv__async_io。那我们去看看这个函数的主要逻辑。
// 把async_handles队列里的所有节点都移到queue变量中
QUEUE_MOVE(&loop->async_handles, &queue);
while (!QUEUE_EMPTY(&queue)) {
// 逐个取出节点
q = QUEUE_HEAD(&queue);
// 根据结构体字段获取结构体首地址
h = QUEUE_DATA(q, uv_async_t, queue);
// 从队列中移除该节点
QUEUE_REMOVE(q);
// 重新插入async_handles队列,等待下次事件
QUEUE_INSERT_TAIL(&loop->async_handles, q);
/*
将第一个参数和第二个参数进行比较,如果相等,
则将第三参数写入第一个参数,返回第二个参数的值,
如果不相等,则返回第一个参数的值。
*/
//判断哪些async被触发了。pending在uv_async_send里设置成1,如果pending等于1,则清0,返回1.如果pending等于0,则返回0
if (cmpxchgi(&h->pending, 1, 0) == 0)
continue;
if (h->async_cb == NULL)
continue;
// 执行上层回调
h->async_cb(h);
}
取出loop->async_handles队列的节点,判断pending字段是否为1,为1则执行回调。刚才在uv_async_send的时候,我们看到已经设置pending字段为1了。所以这时候执行回调,回调函数是uv__work_done。继续往下看uv__work_done的主要逻辑。
// 把loop->wq队列的节点全部移到wp变量中,wq的队列在线程处理函数work里进行设置
QUEUE_MOVE(&loop->wq, &wq);
uv_mutex_unlock(&loop->wq_mutex);
while (!QUEUE_EMPTY(&wq)) {
q = QUEUE_HEAD(&wq);
QUEUE_REMOVE(q);
w = container_of(q, struct uv__work, wq);
err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
// 执行回调
w->done(w, err);
}
前面有提过线程池的线程完成任务后会把对应的结构体插入wq队列,所以wq队列里的节点就是任务已经完成的了(执行uv__work_cancel取消任务也会往这个队列里插入节点,可以理解取消也是任务完成的一种状态)。这里把wq队列的节点取出来,执行done函数。done函数是在调用uv__work_submit向线程池提交任务时设置的,对于uv_fs_open是 uv__fs_done。
static void uv__fs_done(struct uv__work* w, int status) {
uv_fs_t* req;
req = container_of(w, uv_fs_t, work_req);
uv__req_unregister(req->loop, req);
if (status == UV_ECANCELED) {
assert(req->result == 0);
req->result = UV_ECANCELED;
}
req->cb(req);
}
没有太多逻辑,直接执行回调,顺便提一下,nodejs里则是执行c++层函数AfterInteger(代码在node_file.cc的Open函数)。
void AfterInteger(uv_fs_t* req) {
FSReqWrap* req_wrap = static_cast<FSReqWrap*>(req->data);
FSReqAfterScope after(req_wrap, req);
if (after.Proceed())
req_wrap->Resolve(Integer::New(req_wrap->env()->isolate(), req->result));
}
void FSReqWrap::Resolve(Local<Value> value) {
Local<Value> argv[2] {
Null(env()->isolate()),
value
};
MakeCallback(env()->oncomplete_string(), arraysize(argv), argv);
}
执行resolve,然后执行js层的oncomplete回调,即用户执行open函数时传入的函数。