libuv的async.c实现了线程和主线程的通信。在uv_loop_init函数中对async进行初始化。
uv_async_init(loop, &loop->wq_async, uv__work_done);
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
int err;
err = uv__async_start(loop);
if (err)
return err;
uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
handle->async_cb = async_cb;
handle->pending = 0;
QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
uv__handle_start(handle);
return 0;
}
初始化代码中uv__async_start函数是重点。该函数为异步通信创造了条件。
static int uv__async_start(uv_loop_t* loop) {
int pipefd[2];
int err;
if (loop->async_io_watcher.fd != -1)
return 0;
// 获取一个用于进程间通信的fd
err = uv__async_eventfd();
// 成功则保存起来,不支持则使用管道通信作为进程间通信
if (err >= 0) {
pipefd[0] = err;
pipefd[1] = -1;
}
else if (err == UV_ENOSYS) {
err = uv__make_pipe(pipefd, UV__F_NONBLOCK);
#if defined(__linux__)
/* Save a file descriptor by opening one of the pipe descriptors as
* read/write through the procfs. That file descriptor can then
* function as both ends of the pipe.
*/
if (err == 0) {
char buf[32];
int fd;
snprintf(buf, sizeof(buf), "/proc/self/fd/%d", pipefd[0]);
fd = uv__open_cloexec(buf, O_RDWR);
if (fd >= 0) {
uv__close(pipefd[0]);
uv__close(pipefd[1]);
pipefd[0] = fd;
pipefd[1] = fd;
}
}
#endif
}
if (err < 0)
return err;
// 初始化io观察者async_io_watcher
uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
// 注册io观察者到loop里,并注册需要监听的事件POLLIN,读
uv__io_start(loop, &loop->async_io_watcher, POLLIN);
loop->async_wfd = pipefd[1];
return 0;
}
void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
assert(cb != NULL);
assert(fd >= -1);
// 初始化队列,回调,需要监听的fd
QUEUE_INIT(&w->pending_queue);
QUEUE_INIT(&w->watcher_queue);
w->cb = cb;
w->fd = fd;
w->events = 0;
w->pevents = 0;
#if defined(UV_HAVE_KQUEUE)
w->rcount = 0;
w->wcount = 0;
#endif /* defined(UV_HAVE_KQUEUE) */
}
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
assert(0 == (events & ~(POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI)));
assert(0 != events);
assert(w->fd >= 0);
assert(w->fd < INT_MAX);
w->pevents |= events;
maybe_resize(loop, w->fd + 1);
#if !defined(__sun)
/* The event ports backend needs to rearm all file descriptors on each and
* every tick of the event loop but the other backends allow us to
* short-circuit here if the event mask is unchanged.
*/
if (w->events == w->pevents)
return;
#endif
// 如果队列为空则把w挂载到watcher_queue的watch_queue
if (QUEUE_EMPTY(&w->watcher_queue))
QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
// 保存映射关系
if (loop->watchers[w->fd] == NULL) {
loop->watchers[w->fd] = w;
loop->nfds++;
}
}
该函数新建了用于异步通信的文件描述符,然后把文件描述符和回调函数封装成一个io观察者。然后设置了感兴趣的事件。后续用epoll_wait进行监听,等待别的线程写数据。经过一系列操作后,内存视图如下。 uv__async_start函数有一个需求注意的地方是
if (loop->async_io_watcher.fd != -1)
return 0;
当第二次执行该函数时,会直接返回,观察者对应的文件描述符只有一个。假设我们有以下代码。
int main() {
loop = uv_default_loop();
uv_work_t req;
int size = 10240;
req.data = (void*) &size;
uv_async_init(loop, &async, print_progress);
uv_queue_work(loop, &req, fake_download, after);
return uv_run(loop, UV_RUN_DEFAULT);
}
我们新建了一个async放到loop里面,这时候不会增加一个IO观察者,只是增加了一个async节点,内存视图如下。 以上就完成了async的初始化和注册过程。接下来就是在poll IO阶段,有epoll_wait取监听我们注册的文件描述符,然后执行回调。由上面代码可知,回调函数是uv__async_io。
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
char buf[1024];
ssize_t r;
QUEUE queue;
QUEUE* q;
uv_async_t* h;
assert(w == &loop->async_io_watcher);
for (;;) {
// 判断通信内容
r = read(w->fd, buf, sizeof(buf));
if (r == sizeof(buf))
continue;
if (r != -1)
break;
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
if (errno == EINTR)
continue;
abort();
}
// 把async_handles队列里的所有节点都移到queue变量中
QUEUE_MOVE(&loop->async_handles, &queue);
while (!QUEUE_EMPTY(&queue)) {
// 逐个取出节点
q = QUEUE_HEAD(&queue);
// 根据结构体字段获取结构体首地址
h = QUEUE_DATA(q, uv_async_t, queue);
// 从队列中移除该节点
QUEUE_REMOVE(q);
// 重新插入async_handles队列,等待下次事件
QUEUE_INSERT_TAIL(&loop->async_handles, q);
/*
将第一个参数和第二个参数进行比较,如果相等,
则将第三参数写入第一个参数,返回第二个参数的值,
如果不相等,则返回第一个参数的值。
*/
//判断哪些async被触发了。pending在uv_async_send里设置成1,如果pending等于1,则清0,返回1.如果pending等于0,则返回0
if (cmpxchgi(&h->pending, 1, 0) == 0)
continue;
if (h->async_cb == NULL)
continue;
// 执行上层回调
h->async_cb(h);
}
}
uv__async_io变量变量loop的async_handles队列,通过pending字段判断该async是否被触发,pending字段可以有libuv设置,也可能由用户设置。如果被触发则执行上层回调,该回调在uv_async_init时传入,这里的回调分为两种,第一种是libuv本身的和用户的。如果async结构体是由用户创建的,则回调函数也是用户设置的。例如上面的print_progress函数。另外一种就是libuv本身创建的async。例如uv_loop_init时初始化的代码。
err = uv_async_init(loop, &loop->wq_async, uv__work_done);
对于这种情况,回调函数是libuv提供的uv__work_done。该async用于线程池的线程和主线程通信。每次线程执行完一个任务,就会通过这种方式通知主线程,然后执行uv__work_done回调,uv__work_done函数会继续执行用户设置的回调。这样就完成了异步通信。