libuv线程池和主线程通信原理
发布于 9 小时前 作者 theanarkh 228 次浏览 来自 分享

代码仓库:https://github.com/theanarkh/read-libuv-code

一切要从libuv的初始化开始。

uv_default_loop();

该函数调用

uv_loop_init();

进行初始化。uv_loop_init有以下代码。

uv_async_init(loop, &loop->wq_async, uv__work_done);

int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
  int err;
  // 给libuv注册一个观察者io,读端
  err = uv__async_start(loop);
  if (err)
    return err;
  // 设置相关字段,给libuv插入一个async_handle,写端
  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
  handle->async_cb = async_cb;
  handle->pending = 0;

  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
  uv__handle_start(handle);

  return 0;
}

libuv的实现感觉有点乱。uv_async_init函数主要做了三件事情

  1. 通过eventfd或者管道生成线程间通信的两个fd(读端和写端);
  2. 执行uv__async_start,封装一个io观察者(包括读端fd和回调),然后追加到watcher_queue队列,在poll io阶段,libuv会注册到epoll里面。
  3. 执行QUEUE_INSERT_TAIL给libuv的async_handles队列追加一个handle(写端,线程池的线程完成任务后会使用写端写入数据,通知主线程)

下面我们看一下1,2两点的实现。

获取通信描述符、注册读端,保存写端描述符

static int uv__async_start(uv_loop_t* loop) {
  int pipefd[2];
  int err;
  // 只需要初始化一次
  if (loop->async_io_watcher.fd != -1)
    return 0;
  // 获取一个用于进程间通信的fd
  err = uv__async_eventfd();
  // 成功则保存起来,不支持则使用管道通信作为进程间通信
  if (err >= 0) {
    pipefd[0] = err;
    pipefd[1] = -1;
  }
  else if (err == UV_ENOSYS) {
  	// 不支持eventfd则使用匿名管道
    err = uv__make_pipe(pipefd, UV__F_NONBLOCK);
#if defined(__linux__)
    /* Save a file descriptor by opening one of the pipe descriptors as
     * read/write through the procfs.  That file descriptor can then
     * function as both ends of the pipe.
     */
    if (err == 0) {
      char buf[32];
      int fd;

      snprintf(buf, sizeof(buf), "/proc/self/fd/%d", pipefd[0]);
      // 通过fd就可以实现对管道的读写,高级用法
      fd = uv__open_cloexec(buf, O_RDWR);
      if (fd >= 0) {
        // 关掉旧的
        uv__close(pipefd[0]);
        uv__close(pipefd[1]);
        // 赋值新的
        pipefd[0] = fd;
        pipefd[1] = fd;
      }
    }
#endif
  }
  // 拿到了通信的读写两端
  if (err < 0)
    return err;
  // 初始化io观察者async_io_watcher
  uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
  // 注册io观察者到loop里,并注册需要监听的事件POLLIN,即可读
  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
  loop->async_wfd = pipefd[1];

  return 0;
}

我们接着看uv__io_init

void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
  // 初始化队列,回调,需要监听的fd
  QUEUE_INIT(&w->pending_queue);
  QUEUE_INIT(&w->watcher_queue);
  w->cb = cb;
  w->fd = fd;
  w->events = 0;
  w->pevents = 0;
}

代码很简单,就是设置一下async_io_watcher的fd和回调,在epoll_wait返回的时候用到。再看uv__io_start。

void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  // 记录当前的events,用于下次比较
  w->pevents |= events;
  maybe_resize(loop, w->fd + 1);

#if !defined(__sun)
  /* The event ports backend needs to rearm all file descriptors on each and
   * every tick of the event loop but the other backends allow us to
   * short-circuit here if the event mask is unchanged.
   */
  // event没变,则不需要再次处理
  if (w->events == w->pevents)
    return;
#endif
  // 如果队列为空则把w挂载到watcher_queue队列中
  if (QUEUE_EMPTY(&w->watcher_queue))
    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
  // 保存映射关系,poll io中需要用
  if (loop->watchers[w->fd] == NULL) {
    loop->watchers[w->fd] = w;
    loop->nfds++;
  }
}

uv__io_start主要是把loop->async_io_watcher插入watcher_queue队列,poll io阶段会把watcher_queue队列的节点逐个加到epoll中去。至此,完成了写端的设置。即拿到了通信的两个描述符,在loop里设置了读端感兴趣的回调和fd。我们回到uv__async_start函数,看到倒数第二句。

loop->async_wfd = pipefd[1];

这里保存了通信的写端描述符。

设置写端

 // 设置相关字段,给libuv插入一个async_handle,写端
  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
  handle->async_cb = async_cb;
  // 标记是否有任务完成了
  handle->pending = 0;

  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
  uv__handle_start(handle);

在1,2两点中,libuv在uv__async_start中通过loop->async_wfd = pipefd[1];保存了写端描述符。这里只需要把写端对应的handle插入到loop里就行。通过一系列操作。有点懵。总结一下。几个重要的字段。

  1. 首先获得两个通信描述符,读端保存在loop->async_io_watcher结构体里,写端保存在loop->async_wfd 字段;
  2. 然后挂载loop->async_io_watcherloop->watcher_queue队列,主要为了在poll io中注册读事件,读事件回调是uv__async_io。
  3. loop->wq_async结构体插入loop->async_handles队列。loop->wq_async是写端相关的handle,线程池会设置这个handle的pending为1表示有任务已经完成,然后再往管道写端写入标记,主线程在epoll_wait的时候返回这个fd,并指向对应的回调(uv__async_io)。uv__async_io会遍历async_handles队列,pending等于1的话会执行对应的回调。对于wq_async节点,回调是uv__work_done

就这样完成了线程池和主线程的通信。下面我们看看使用的例子。 这里以文件操作为例子,因为nodejs中文件读写是以线程池实现的。这里直接从uv_fs_open开始(因为js层到c++层主要是一些封装。最后会调到uv_fs_open)。直接看一下uv_fs_open的代码。

// 下面代码是宏展开后的效果
int uv_fs_open(uv_loop_t* loop,
               uv_fs_t* req,
               const char* path,
               int flags,
               int mode,
               uv_fs_cb cb) {
 do {                                                                        
    if (req == NULL)                                                          
      return UV_EINVAL;                                                       
    UV_REQ_INIT(req, UV_FS);                                                  
    req->fs_type = UV_FS_ ## subtype;                                         
    req->result = 0;                                                          
    req->ptr = NULL;                                                          
    req->loop = loop;                                                         
    req->path = NULL;                                                         
    req->new_path = NULL;                                                     
    req->bufs = NULL;                                                         
    req->cb = cb;                                                             
  }                                                                           
  while (0)
  do {                                                                       
    assert(path != NULL);                                                    
    if (cb == NULL) {                                                         
      req->path = path;                                                       
    } else {                                                                  
      req->path = uv__strdup(path);                                           
      if (req->path == NULL)                                                  
        return UV_ENOMEM;                                                     
    }                                                                         
  }                                                                           
  while (0)
  req->flags = flags;
  req->mode = mode;
   do {                                                                        
    if (cb != NULL) {                                                         
      uv__req_register(loop, req);  
      /* 异步*/                                           
      uv__work_submit(loop,                                                   
                      &req->work_req,                                         
                      UV__WORK_FAST_IO,                                       
                      **uv__fs_work**,                                            
                      **uv__fs_done**);                                           
      return 0;                                                               
    }                                                                         
    else {                 
       /* 同步 */                                                    
      uv__fs_work(&req->work_req);                                            
      return req->result;                                                     
    }                                                                         
  }                                                                           
  while (0)

里面很多do while,是因为他们是宏展开而来。我们从上往下看,没有太多的逻辑,函数的最后一个参数cb是nodejs的c++层设置的,c++层会再回调js层。然后open(大部分的文件操作)分为同步和异步两种模式(即fs.open和openSync)。同步直接导致nodejs阻塞,不涉及到线程池,这里只看异步模式。所以我们从uv__work_submit函数开始看。

// 给线程池提交一个任务
void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  // 保证已经初始化线程,并只执行一次
  uv_once(&once, init_once);
  w->loop = loop;
  // 可能引起阻塞的函数
  w->work = work;
  // work执行完后执行的回调
  w->done = done;
  post(&w->wq, kind);
}

submit函数主要是设置了一些字段,然后调post函数。接着看post函数。

// 把任务插入队列等待线程处理
static void post(QUEUE* q, enum uv__work_kind kind) {
  uv_mutex_lock(&mutex);
  // 类型是慢IO
  if (kind == UV__WORK_SLOW_IO) {
    /* Insert into a separate queue. */
    // 插入慢IO对应的队列
    QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
    /*
      有慢IO任务的时候,需要给主队列wq插入一个消息节点run_slow_work_message,
      说明有慢IO任务,所以如果run_slow_work_message是空,说明还没有插入主队列。
      需要进行q = &run_slow_work_message;赋值,然后把run_slow_work_message插入
      主队列 
    */
    if (!QUEUE_EMPTY(&run_slow_work_message)) {
      /* Running slow I/O tasks is already scheduled => Nothing to do here.
         The worker that runs said other task will schedule this one as well. */
      uv_mutex_unlock(&mutex);
      return;
    }
    q = &run_slow_work_message;
  }
  // 把节点插入主队列,可能是慢IO消息节点或者一般任务
  QUEUE_INSERT_TAIL(&wq, q);
  // 有空闲线程则唤醒他
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}

post函数以互斥的方式给线程池的队列加一个任务节点。任务提交成功。然后线程池里的线程就会不断地从任务队列了执行任务。这里提一下线程池的初始化。

static void init_threads(void) {
	// 创建多个线程
  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
    ...
}

所以worker函数是处理任务的。下面是work函数的主要逻辑。

 // q是慢IO或者一般任务
    w = QUEUE_DATA(q, struct uv__work, wq);
    // 执行业务回调,该函数一般会阻塞
    w->work(w);

    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    // 执行完任务,插入到loop的wq队列,在uv__work_done的时候会执行该队列的节点
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    // 通知loop的wq_async节点
    uv_async_send(&w->loop->wq_async)

w->work(w)就是执行真正任务的地方。从一开始的uv_fs_open函数那里,我们知道work是uv__fs_work函数。该函数就会打开一个文件(类似地,读一个文件,写一个文件),操作完成后会把关联的结构体w加到w->loop->wq中,wq的作用我们一会会看到。最后执行uv_async_send(&w->loop->wq_async)通知主线程。w->loop->wq_async就是我们前面说到的写端对应的handle。我们看看uv_async_send做了什么。


int uv_async_send(uv_async_t* handle) {
  /* Do a cheap read first. */
  if (ACCESS_ONCE(int, handle->pending) != 0)
    return 0;
  // 如pending是0,则设置为1,返回0,如果是1则返回1,所以如果多次调用该函数是会被合并的
  if (cmpxchgi(&handle->pending, 0, 1) == 0)
    uv__async_send(handle->loop);

  return 0;
}

static void uv__async_send(uv_loop_t* loop) {
  const void* buf;
  ssize_t len;
  int fd;
  int r;

  buf = "";
  len = 1;
  fd = loop->async_wfd;

#if defined(__linux__)
  // 说明用的是eventfd而不是管道
  if (fd == -1) {
    static const uint64_t val = 1;
    buf = &val;
    len = sizeof(val);
    // 见uv__async_start
    fd = loop->async_io_watcher.fd;  /* eventfd */
  }
#endif
  // 通知读端
  do
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);

  if (r == len)
    return;

  if (r == -1)
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return;

  abort();
}

重点是write函数,这个fd就是我们前面讲到的管道的写端。此时,往管道的写端写入数据。对于一个任务,线程池的工作就完成了。有写则必然有读。读的逻辑是在uv__io_poll中实现的。uv__io_poll函数即libuv中poll io阶段执行的函数。具体逻辑可以看这篇文章nodejs源码解析之事件循环。 在uv__io_poll中会发现管道可读,然后执行对应的回调,前面我们分析过,回调函数是uv__async_io。那我们去看看这个函数的主要逻辑。

 // 把async_handles队列里的所有节点都移到queue变量中
  QUEUE_MOVE(&loop->async_handles, &queue);
  while (!QUEUE_EMPTY(&queue)) {
    // 逐个取出节点
    q = QUEUE_HEAD(&queue);
    // 根据结构体字段获取结构体首地址
    h = QUEUE_DATA(q, uv_async_t, queue);
    // 从队列中移除该节点
    QUEUE_REMOVE(q);
    // 重新插入async_handles队列,等待下次事件
    QUEUE_INSERT_TAIL(&loop->async_handles, q);
    /*
      将第一个参数和第二个参数进行比较,如果相等,
      则将第三参数写入第一个参数,返回第二个参数的值,
      如果不相等,则返回第一个参数的值。
    */
    //判断哪些async被触发了。pending在uv_async_send里设置成1,如果pending等于1,则清0,返回1.如果pending等于0,则返回0
    if (cmpxchgi(&h->pending, 1, 0) == 0)
      continue;

    if (h->async_cb == NULL)
      continue;
    // 执行上层回调
    h->async_cb(h);
  }

取出loop->async_handles队列的节点,判断pending字段是否为1,为1则执行回调。刚才在uv_async_send的时候,我们看到已经设置pending字段为1了。所以这时候执行回调,回调函数是uv__work_done。继续往下看uv__work_done的主要逻辑。

// 把loop->wq队列的节点全部移到wp变量中,wq的队列在线程处理函数work里进行设置
  QUEUE_MOVE(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    // 执行回调
    w->done(w, err);
  }

前面有提过线程池的线程完成任务后会把对应的结构体插入wq队列,所以wq队列里的节点就是任务已经完成的了(执行uv__work_cancel取消任务也会往这个队列里插入节点,可以理解取消也是任务完成的一种状态)。这里把wq队列的节点取出来,执行done函数。done函数是在调用uv__work_submit向线程池提交任务时设置的,对于uv_fs_open是 uv__fs_done。


static void uv__fs_done(struct uv__work* w, int status) {
  uv_fs_t* req;

  req = container_of(w, uv_fs_t, work_req);
  uv__req_unregister(req->loop, req);

  if (status == UV_ECANCELED) {
    assert(req->result == 0);
    req->result = UV_ECANCELED;
  }
  req->cb(req);
}

没有太多逻辑,直接执行回调,顺便提一下,nodejs里则是执行c++层函数AfterInteger(代码在node_file.cc的Open函数)。

void AfterInteger(uv_fs_t* req) {
  FSReqWrap* req_wrap = static_cast<FSReqWrap*>(req->data);
  FSReqAfterScope after(req_wrap, req);

  if (after.Proceed())
    req_wrap->Resolve(Integer::New(req_wrap->env()->isolate(), req->result));
}

void FSReqWrap::Resolve(Local<Value> value) {
  Local<Value> argv[2] {
    Null(env()->isolate()),
    value
  };
  MakeCallback(env()->oncomplete_string(), arraysize(argv), argv);
}

执行resolve,然后执行js层的oncomplete回调,即用户执行open函数时传入的函数。

回到顶部