Libuv学习——线程池
发布于 2 天前 作者 wanglei20116527 187 次浏览 来自 分享

1572258994631-6b1c3f68-eb01-4231-9852-034702388c13.png

前言

通过前两篇文章的学习,我们已经解了Libuv中的队列线程基础,为本文的学习打下基础,没有看过的同学建议先看下。下面将从生产者消费者模型和源码两个角度学习Libuv的线程池,为后面学习Libuv文件处理做铺垫。

生产者消费者模型

Node.js的文件操作支持同步调用和异步调用,根据Libuv官网的介绍,我们知道它没有跨平台的异步文件IO可以使用,所以它的异步文件IO是通过在线程池中执行同步文件IO实现的。那具体是怎么实现的呢?答案就是生产者消费者模型。Libuv的线程包括2部分,一个是主线程,一个是线程池。主线程的一部分工作是描述任务并将其提交给线程池,线程池进行处理。拿异步文件操作为例,主线程生成一个描述文件操作的对象,将其提交到任务队列;线程池从任务队列获取该对象进行处理。其中主线程是生产者,线程池中的线程是消费者,任务队列是生产者和消费者之间的桥梁,下面是一个简单的示意图:

1573392692932-48bf188f-5fcb-419f-8329-a686d281007e.png

Libuv在生产者消费者模型中多加了一步,线程池执行完任务后,将结果交给主线程,主线程拿到结果后,如果发现有回调函数需要执行,就执行。所以Libuv的线程模型如下:

1573392929503-0af9bc4a-8590-4348-bebd-4943dfd3ea20.png

源码分析

Libuv线程池的代码很容易找到,就在src目录下的threadpool.c文件中。

1573393406765-b459960c-fb44-4228-9f74-16f84b37c8f6.png

通过上面对生产者消费者模型的介绍,该代码大致分为4部分:任务队列、主线程提交任务到任务队列(提交任务)、线程池从任务队列获取任务并执行(消费任务)、线程池执行完任务通知主线程执行回调函数(回调处理)。

任务队列

任务队列就是一个队列而已。由于任务队列会被多个线程(主线程、线程池)同时访问,为了保证线程安全,需要互斥锁。另外任务队列如果为空,线程池中的线程需要挂起,等待主线程提交任务后唤起,所以还需要条件变量。任务队列、条件变量、互斥量的定义如下所示:

...
static uv_cond_t cond; // 条件变量
static uv_mutex_t mutex; // 互斥锁
...
static QUEUE wq; // 任务队列
...

提交任务

主线程将任务提交到任务队列是通过uv__work_submit来实现的,让我们来看下它的代码:

struct uv__work {
  void (*work)(struct uv__work *w);
  void (*done)(struct uv__work *w, int status);
  struct uv_loop_s* loop;
  void* wq[2]; // 用于将其关联到任务队列中
};

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     enum uv__work_kind kind,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once); // 初始化线程,无乱调用多少次,init_once只会执行一次
  w->loop = loop; // 事件循环
  w->work = work; // 线程池要执行的函数
  w->done = done; // 线程池执行结束后,通知主线程要执行的函数
  post(&w->wq, kind); // 将任务提交任务队列中
}

uv__work_submit有4个参数:第一个参数为Libuv的事件循环,这里我们先忽略,以后会有专门的文章介绍;第二个参数是线程池执行任务的通用模型,类型为uv__work,属性work表示线程池中要执行的函数,属性done表示线程池执行完,通知主线程要执行的函数;第三、四个参数分别对应work函数和done函数。该函数主要做了两件事情:一件是通过uv_once调用init_once来初始化线程池;另一件是对w进行赋值,然后通过post将其提交到任务队列。这里需要注意,通过nv_once可以保证uv__work_submit在调用多次的情况,init_once只执行一次,nv_once底层是通过pthread_once实现的。init_once会在下一节介绍,让我们先来看下post。

static void post(QUEUE* q, enum uv__work_kind kind) {
  // 获取锁
  uv_mutex_lock(&mutex);
  ...
  // 将任务添加到任务队列的最后
  QUEUE_INSERT_TAIL(&wq, q);
  
  // 如果线程池中有挂起的线程,就唤起挂起的线程,让其工作
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  // 释放锁
  uv_mutex_unlock(&mutex);
}

代码很简单,先获取锁mutex,然后将任务提交到任务队列中。如果线程池中有挂起的线程,就通过条件变量cond唤起并放弃锁mutex。

消费任务

任务队列中的任务是通过线程池进行消费的,而线程池的初始化是在uv__work_submit调用init_once实现的,先看下如何初始化线程池吧:

static void init_once(void) {
  ...
  init_threads();
}

init_once调用了init_threads,那就看下init_threads。

...
#define MAX_THREADPOOL_SIZE 1024 // 线程池的最大数量
...
static uv_thread_t* threads; // 线程池
static uv_thread_t default_threads[4]; // 默认的线程池,线程数量为4
...


static void init_threads(void) {
  unsigned int i;
  const char* val;
  ...
      
  // 计算线程池中线程的数量,不能大于最大值MAX_THREADPOOL_SIZE
  nthreads = ARRAY_SIZE(default_threads);
    
  // 通过环境变量设置线程池的大小
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  
  // 保存线程池中最少有一个线程
  if (nthreads == 0)
    nthreads = 1;
    
  // 线程池中线程数量不能超过MAX_THREADPOOL_SIZE
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;
    
  // 初始化线程池
  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }
    
  // 创建条件变量
  if (uv_cond_init(&cond))
    abort();
  
  // 创建互斥量
  if (uv_mutex_init(&mutex))
    abort();

  // 初始化任务队列
  QUEUE_INIT(&wq);
  ...
      
  // 根据线程池的数量,初始化线程池中的每个线程,并执行worker函数
  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();
  
  ...
}

通过上面的代码可以知道init_threads先获取线程池的大小nthreads;然后初始化互斥量mutex、条件变量cond和任务队列wq;最后创建nthreads个线程,每个线程执行worker函数。worker函数的核心就是消费任务队列中的任务,让我们详细的看下它:

static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;
  
  ...
  arg = NULL;
    
  // 获取互斥锁
  uv_mutex_lock(&mutex);
    
  // 通过无限循环,保证线程一直执行
  for (;;) {
    
    // 如果任务队列为空,通过等待条件变量cond挂起,并释放锁mutex
    // 主线程提交任务通过uv_cond_signal唤起,并重新获取锁mutex
    while (QUEUE_EMPTY(&wq) || ...) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }
      
    // 从任务队列中获取第一个任务
    q = QUEUE_HEAD(&wq);
    ...
        
    // 将该任务从任务队列中删除
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);
      
    ...
        
    // 操作完任务队列,释放锁mutex
    uv_mutex_unlock(&mutex);

    // 获取uv__work对象,并执行work
    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    // 获取loop的互斥锁wq_mutex
    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;
    
    // 将执行完work函数的任务挂到loop->wq队列中
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
      
    // 通过uv_async_send通知主线程,当然有任务执行完了,主线程可以执行任务中的done函数。
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    // 获取锁,执行任务队列中的下一个任务
    ...
    uv_mutex_lock(&mutex);
    ...
  }
}

worker的本质就是从任务队列中获取任务,然后执行work函数。执行完后,将该任务提交到事件循环loop的wp队列中,通过uv_async_send告知主线程执行任务中的done函数。

回调处理

上面我们介绍了worker函数在执行完任务后会通过uv_async_send告知主线程执行回调函数,那这块是怎么实现的呢?这里涉及到了事件循环,这里我们就简单的介绍一下,后面会有详细的文章介绍它。事件循环loop在初始化的时候会调用uv_async_init,该函数的第三个参数是一个函数,当其他线程调用uv_async_send时,该函数就会执行。具体代码如下:

uv_async_init(loop, &loop->wq_async, uv__work_done);

void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  QUEUE* q;
  QUEUE wq;
  int err;

  loop = container_of(handle, uv_loop_t, wq_async);
  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_MOVE(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
    w->done(w, err);
  }
}

uv__work_done很简单,获取loop中的wq队列,获取队列中的每个任务并调用done函数。

总结

本文首先介绍了生产者消费者模型,然后通过任务队列、提交任务、消费任务、回调处理讲解了Libuv线程池,为下一篇讲解Libuv文件处理做铺垫,如果你对Libuv系列感兴趣的话,欢迎关注我们。

WechatIMG355.jpeg

回到顶部