libuv之async.c源码解析
发布于 5 天前 作者 theanarkh 1245 次浏览 来自 分享

libuv的async.c实现了线程和主线程的通信。在uv_loop_init函数中对async进行初始化。

 uv_async_init(loop, &loop->wq_async, uv__work_done);
 
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
  int err;

  err = uv__async_start(loop);
  if (err)
    return err;

  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
  handle->async_cb = async_cb;
  handle->pending = 0;
  
  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
  uv__handle_start(handle);

  return 0;
}

初始化代码中uv__async_start函数是重点。该函数为异步通信创造了条件。


static int uv__async_start(uv_loop_t* loop) {
  int pipefd[2];
  int err;

  if (loop->async_io_watcher.fd != -1)
    return 0;
  // 获取一个用于进程间通信的fd
  err = uv__async_eventfd();
  // 成功则保存起来,不支持则使用管道通信作为进程间通信
  if (err >= 0) {
    pipefd[0] = err;
    pipefd[1] = -1;
  }
  else if (err == UV_ENOSYS) {
    err = uv__make_pipe(pipefd, UV__F_NONBLOCK);
#if defined(__linux__)
    /* Save a file descriptor by opening one of the pipe descriptors as
     * read/write through the procfs.  That file descriptor can then
     * function as both ends of the pipe.
     */
    if (err == 0) {
      char buf[32];
      int fd;

      snprintf(buf, sizeof(buf), "/proc/self/fd/%d", pipefd[0]);
      fd = uv__open_cloexec(buf, O_RDWR);
      if (fd >= 0) {
        uv__close(pipefd[0]);
        uv__close(pipefd[1]);
        pipefd[0] = fd;
        pipefd[1] = fd;
      }
    }
#endif
  }

  if (err < 0)
    return err;
  // 初始化io观察者async_io_watcher
  uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
  // 注册io观察者到loop里,并注册需要监听的事件POLLIN,读
  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
  loop->async_wfd = pipefd[1];

  return 0;
}


void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
  assert(cb != NULL);
  assert(fd >= -1);
  // 初始化队列,回调,需要监听的fd
  QUEUE_INIT(&w->pending_queue);
  QUEUE_INIT(&w->watcher_queue);
  w->cb = cb;
  w->fd = fd;
  w->events = 0;
  w->pevents = 0;

#if defined(UV_HAVE_KQUEUE)
  w->rcount = 0;
  w->wcount = 0;
#endif /* defined(UV_HAVE_KQUEUE) */
}


void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  assert(0 == (events & ~(POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI)));
  assert(0 != events);
  assert(w->fd >= 0);
  assert(w->fd < INT_MAX);

  w->pevents |= events;
  maybe_resize(loop, w->fd + 1);

#if !defined(__sun)
  /* The event ports backend needs to rearm all file descriptors on each and
   * every tick of the event loop but the other backends allow us to
   * short-circuit here if the event mask is unchanged.
   */
  if (w->events == w->pevents)
    return;
#endif
  // 如果队列为空则把w挂载到watcher_queue的watch_queue
  if (QUEUE_EMPTY(&w->watcher_queue))
    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
  // 保存映射关系
  if (loop->watchers[w->fd] == NULL) {
    loop->watchers[w->fd] = w;
    loop->nfds++;
  }
}

该函数新建了用于异步通信的文件描述符,然后把文件描述符和回调函数封装成一个io观察者。然后设置了感兴趣的事件。后续用epoll_wait进行监听,等待别的线程写数据。经过一系列操作后,内存视图如下。 在这里插入图片描述 uv__async_start函数有一个需求注意的地方是

if (loop->async_io_watcher.fd != -1)
    return 0;

当第二次执行该函数时,会直接返回,观察者对应的文件描述符只有一个。假设我们有以下代码。


int main() {
    loop = uv_default_loop();

    uv_work_t req;
    int size = 10240;
    req.data = (void*) &size;

    uv_async_init(loop, &async, print_progress);
    uv_queue_work(loop, &req, fake_download, after);

    return uv_run(loop, UV_RUN_DEFAULT);
}

我们新建了一个async放到loop里面,这时候不会增加一个IO观察者,只是增加了一个async节点,内存视图如下。 在这里插入图片描述 以上就完成了async的初始化和注册过程。接下来就是在poll IO阶段,有epoll_wait取监听我们注册的文件描述符,然后执行回调。由上面代码可知,回调函数是uv__async_io。

static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  char buf[1024];
  ssize_t r;
  QUEUE queue;
  QUEUE* q;
  uv_async_t* h;

  assert(w == &loop->async_io_watcher);

  for (;;) {
    // 判断通信内容
    r = read(w->fd, buf, sizeof(buf));

    if (r == sizeof(buf))
      continue;

    if (r != -1)
      break;

    if (errno == EAGAIN || errno == EWOULDBLOCK)
      break;

    if (errno == EINTR)
      continue;

    abort();
  }
  // 把async_handles队列里的所有节点都移到queue变量中
  QUEUE_MOVE(&loop->async_handles, &queue);
  while (!QUEUE_EMPTY(&queue)) {
    // 逐个取出节点
    q = QUEUE_HEAD(&queue);
    // 根据结构体字段获取结构体首地址
    h = QUEUE_DATA(q, uv_async_t, queue);
    // 从队列中移除该节点
    QUEUE_REMOVE(q);
    // 重新插入async_handles队列,等待下次事件
    QUEUE_INSERT_TAIL(&loop->async_handles, q);
    /*
      将第一个参数和第二个参数进行比较,如果相等,
      则将第三参数写入第一个参数,返回第二个参数的值,
      如果不相等,则返回第一个参数的值。
    */
    //判断哪些async被触发了。pending在uv_async_send里设置成1,如果pending等于1,则清0,返回1.如果pending等于0,则返回0
    if (cmpxchgi(&h->pending, 1, 0) == 0)
      continue;

    if (h->async_cb == NULL)
      continue;
    // 执行上层回调
    h->async_cb(h);
  }
}

uv__async_io变量变量loop的async_handles队列,通过pending字段判断该async是否被触发,pending字段可以有libuv设置,也可能由用户设置。如果被触发则执行上层回调,该回调在uv_async_init时传入,这里的回调分为两种,第一种是libuv本身的和用户的。如果async结构体是由用户创建的,则回调函数也是用户设置的。例如上面的print_progress函数。另外一种就是libuv本身创建的async。例如uv_loop_init时初始化的代码。

err = uv_async_init(loop, &loop->wq_async, uv__work_done);

对于这种情况,回调函数是libuv提供的uv__work_done。该async用于线程池的线程和主线程通信。每次线程执行完一个任务,就会通过这种方式通知主线程,然后执行uv__work_done回调,uv__work_done函数会继续执行用户设置的回调。这样就完成了异步通信。

回到顶部