简答梳理Node.js启动子进程的方法(上)
发布于 2 天前 作者 miser 990 次浏览 来自 分享

Node.js 创建子进程的方法常用的有如下几种:

child_process

  • exec: 衍生一个 shell 然后在该 shell 中执行 command,并缓冲任何产生的输出,最大缓存1024*1024 个字节。
  • execFile: 函数类似exec,但默认情况下不会衍生 shell。 相反,指定的可执行文件file 会作为新进程直接地衍生,使其比exec 稍微更高效。和exec一样,它也有最大1024*1204个字节的显示缓存。
  • fork: 是 spawn的一个特例,专门用于衍生新的 Node.js 进程。 与spawn一样返回ChildProcess对象。 返回的ChildProcess将会内置一个额外的通信通道,允许消息在父进程和子进程之间来回传递。
  • spawn: 上诉的几个方法其实都是通过spawn实现的。

cluster

  • fork: 衍生出一个新的工作进程,这只能通过主进程调用。

翻翻源码看看他们怎么实现的

源码版本和之前的libuv & Node.js EventLoop (一)一样

// libuv
#define UV_VERSION_MAJOR 1
#define UV_VERSION_MINOR 33
#define UV_VERSION_PATCH 1

// V8
#define V8_MAJOR_VERSION 7
#define V8_MINOR_VERSION 8
#define V8_BUILD_NUMBER 279
#define V8_PATCH_LEVEL 17

// Node.js
#define NODE_MAJOR_VERSION 14
#define NODE_MINOR_VERSION 0
#define NODE_PATCH_VERSION 0

child_process源码位置

Node 
  - lib 
    - internal 
      - child_process.js 
    - child_process.js;

lib/child_process.js文件中,定义了execexecFileforkspawn等方法,它们最后都会调用在lib/internal/child_process.js文件中的spawn方法。

exec

// lib/child_process.js
function exec(command, options, callback) {
  const opts = normalizeExecArgs(command, options, callback);
  return module.exports.execFile(opts.file, opts.options, opts.callback);
}
function normalizeExecArgs(command, options, callback) {
  if (typeof options === "function") {
    callback = options;
    options = undefined;
  }

  // Make a shallow copy so we don't clobber the user's options object.
  options = { ...options };
  options.shell = typeof options.shell === "string" ? options.shell : true;

  return {
    file: command,
    options: options,
    callback: callback,
  };
}

从上面发现 exec 其实就是封装了参数,主要是开启shell参数,然后调用 execFile 方法。

execFile

// lib/child_process.js
function execFile(file /* , args, options, callback */) {
  let args = [];
  let callback;
  let options;

  // ...

  options = {
    encoding: "utf8",
    timeout: 0,
    maxBuffer: MAX_BUFFER,
    killSignal: "SIGTERM",
    cwd: null,
    env: null,
    shell: false, // execFile 默认是不开启shell
    ...options,
  };

  // 通过spawn方法创建子进程
  const child = spawn(file, args, {
    cwd: options.cwd,
    env: options.env,
    gid: options.gid,
    uid: options.uid,
    shell: options.shell,
    windowsHide: !!options.windowsHide,
    windowsVerbatimArguments: !!options.windowsVerbatimArguments,
  });

  var encoding;
  const _stdout = []; // 输出的内容
  const _stderr = []; // 出错的内容
  // ...
  var stdoutLen = 0; // 输出内容的长度
  var stderrLen = 0; // 出错内容的长度
  var killed = false; // 是否已经被杀死
  var exited = false; // 是否已经退出
  var timeoutId; // 是否有定时器

  var ex = null; // 出错的上下文对象

  var cmd = file; // 命令或文件

  // 退出回调方法
  function exithandler(code, signal) {
    if (exited) return;
    exited = true;

    if (timeoutId) {
      clearTimeout(timeoutId);
      timeoutId = null;
    }

    if (!callback) return;

    // merge chunks
    var stdout;
    var stderr;
    if (encoding || (child.stdout && child.stdout.readableEncoding)) {
      stdout = _stdout.join("");
    } else {
      stdout = Buffer.concat(_stdout); // 如果不传入 encoding 参数,默认是Buffer拼接输出
    }
    if (encoding || (child.stderr && child.stderr.readableEncoding)) {
      stderr = _stderr.join("");
    } else {
      stderr = Buffer.concat(_stderr); // 如果不传入 encoding 参数,默认是Buffer拼接输出
    }

    if (!ex && code === 0 && signal === null) {
      callback(null, stdout, stderr); // 没有错误,执行回调
      return;
    }

    if (args.length !== 0) cmd += ` ${args.join(" ")}`;

    if (!ex) {
      // eslint-disable-next-line no-restricted-syntax
      ex = new Error("Command failed: " + cmd + "\n" + stderr);
      ex.killed = child.killed || killed;
      ex.code = code < 0 ? getSystemErrorName(code) : code;
      ex.signal = signal;
    }

    ex.cmd = cmd;
    callback(ex, stdout, stderr); // 有错误,执行回调
  }

  function errorhandler(e) {
    ex = e;

    // child.stdout 和 child.stderr 都销毁
    if (child.stdout) child.stdout.destroy();

    if (child.stderr) child.stderr.destroy();

    exithandler();
  }

  function kill() {
    if (child.stdout) child.stdout.destroy();

    if (child.stderr) child.stderr.destroy();

    killed = true;
    try {
      child.kill(options.killSignal);
    } catch (e) {
      ex = e;
      exithandler();
    }
  }

  // 如果定义了子进程的超时时间,就定时销毁它
  if (options.timeout > 0) {
    timeoutId = setTimeout(function delayedKill() {
      kill();
      timeoutId = null;
    }, options.timeout);
  }

  if (child.stdout) {
    if (encoding) child.stdout.setEncoding(encoding);

    child.stdout.on("data", function onChildStdout(chunk) {
      const encoding = child.stdout.readableEncoding;
      const length = encoding
        ? Buffer.byteLength(chunk, encoding)
        : chunk.length;
      stdoutLen += length; // 不断累加输出的长度

      if (stdoutLen > options.maxBuffer) {
        // 超过的话就报错
        const truncatedLen = options.maxBuffer - (stdoutLen - length);
        _stdout.push(chunk.slice(0, truncatedLen));

        ex = new ERR_CHILD_PROCESS_STDIO_MAXBUFFER("stdout");
        kill();
      } else {
        _stdout.push(chunk); // 不断累积chunk
      }
    });
  }

  // 整体逻辑和上面的child.stdout基本一致
  if (child.stderr) {
    if (encoding) child.stderr.setEncoding(encoding);

    child.stderr.on("data", function onChildStderr(chunk) {
      const encoding = child.stderr.readableEncoding;
      const length = encoding
        ? Buffer.byteLength(chunk, encoding)
        : chunk.length;
      stderrLen += length;

      if (stderrLen > options.maxBuffer) {
        const truncatedLen = options.maxBuffer - (stderrLen - length);
        _stderr.push(chunk.slice(0, truncatedLen));

        ex = new ERR_CHILD_PROCESS_STDIO_MAXBUFFER("stderr");
        kill();
      } else {
        _stderr.push(chunk);
      }
    });
  }

  child.addListener("close", exithandler);
  child.addListener("error", errorhandler);

  return child;
}

execexecFile最大的一个区别就是参数shell默认是否开启,其它基本都是相同的。另外,它们对输出的内容有大小限制是在child.stderr.on('data')child.stdout.on('data')获取数据时候被限制。

如果不做类似的规定,_stderr_stdout无限被输出,那么内存会不断的膨胀导致性能问题,甚至程序奔溃。(这是我猜的原因)

另外说到底,最后还是对spawn方法的封装了调用。

fork

// lib/child_process.js
function fork(modulePath /* , args, options */) {
  // Get options and args arguments.
  var execArgv;
  var options = {};
  var args = [];

  // ...

  // fork方法的stdio参数,必须带有一个ipc参数
  if (typeof options.stdio === "string") {
    options.stdio = stdioStringToArray(options.stdio, "ipc");
  } else if (!Array.isArray(options.stdio)) {
    // Use a separate fd=3 for the IPC channel. Inherit stdin, stdout,
    // and stderr from the parent if silent isn't set.
    options.stdio = stdioStringToArray(
      options.silent ? "pipe" : "inherit",
      "ipc"
    );
  } else if (!options.stdio.includes("ipc")) {
    throw new ERR_CHILD_PROCESS_IPC_REQUIRED("options.stdio");
  }

  options.execPath = options.execPath || process.execPath; // 默认用父进程的Node执行文件
  options.shell = false; // 不开启 shell

  return spawn(options.execPath, args, options);
}

从上面代码有个非常引人注意就是 12~22 行,fork 方法的 stdio 参数,必须带有一个 ipc 参数,这个ipc的作用将在后续深入挖掘后介绍。最后也是调用spawn创建子进程。

<br/>

spawn

// lib/child_process.js
function spawn(file, args, options) {
  const child = new ChildProcess();

  options = normalizeSpawnArguments(file, args, options);
  debug("spawn", options);
  child.spawn(options);

  return child;
}

lib/child_process.js里的 spawn 方法就简单的将传入的参数做整理,然后直接调用 ChildProcess 实例对象的 spawn。

ChildProcess

// `lib/internal/child_process.js`
function ChildProcess() {
  EventEmitter.call(this);

  // ...

  this._handle = new Process(); // new 出一个 ProcessWrap 对象 ( process_wrap.cc)
}

ChildProcess.prototype.spawn = function (options) {
  let i = 0;

  // 默认使用 pipe
  let stdio = options.stdio || "pipe";

  // 重置stdio变量,这个是个很重要的方法,后续将继续介绍
  stdio = getValidStdio(stdio, false);

  const ipc = stdio.ipc;
  const ipcFd = stdio.ipcFd;
  stdio = options.stdio = stdio.stdio;

  // ...

  // 调用ProcessWrap对应的spawn去创建新进程
  const err = this._handle.spawn(options);

  // 如果err存在,就会有相关代码处理创建子进程出错的场景
  // ...

  this.pid = this._handle.pid;

  for (i = 0; i < stdio.length; i++) {
    const stream = stdio[i];
    // ...
    if (stream.handle) {
      // When i === 0 - we're dealing with stdin
      // (which is the only one writable pipe).
      // 创建 socket
      stream.socket = createSocket(
        this.pid !== 0 ? stream.handle : null,
        i > 0
      );

      if (i > 0 && this.pid !== 0) {
        this._closesNeeded++;
        stream.socket.on("close", () => {
          maybeClose(this);
        });
      }
    }
  }
  this.stdin =
    stdio.length >= 1 && stdio[0].socket !== undefined ? stdio[0].socket : null;
  this.stdout =
    stdio.length >= 2 && stdio[1].socket !== undefined ? stdio[1].socket : null;
  this.stderr =
    stdio.length >= 3 && stdio[2].socket !== undefined ? stdio[2].socket : null;

  this.stdio = [];

  for (i = 0; i < stdio.length; i++)
    this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket);

  // Add .send() method and start listening for IPC data
  // setupChannel 方法很长,主要就是实现了数据的发送和接受
  if (ipc !== undefined) setupChannel(this, ipc);

  return err;
};

function stdioStringToArray(stdio, channel) {
  // 主要将stdio参数格式化为[xxx,xxx,xxx]形式的数组
  // 如果有channel,[xxx,xxx,xxx,channel]
}

function getValidStdio(stdio, sync) {
  var ipc;
  var ipcFd;

  stdio = stdio.reduce((acc, stdio, i) => {
    function cleanup() {
      for (var i = 0; i < acc.length; i++) {
        if ((acc[i].type === "pipe" || acc[i].type === "ipc") && acc[i].handle)
          acc[i].handle.close();
      }
    }

    if (stdio === "ignore") {
      acc.push({ type: "ignore" }); // 子进程的输出不需要在控制台显示
    } else if (stdio === "pipe" || (typeof stdio === "number" && stdio < 0)) {
      var a = {
        type: "pipe",
        readable: i === 0, // 0是stdin,需要读
        writable: i !== 0, // 1是stdout,2是stderr 需要写
      };
      // spawn 调用的时候,sync为false,为stdin、stdout、stderr创建一个SOCKET类型Pipe
      if (!sync) a.handle = new Pipe(PipeConstants.SOCKET);

      acc.push(a);
    } else if (stdio === "ipc") {
      // 创建一个IPC类型Pipe
      ipc = new Pipe(PipeConstants.IPC);
      ipcFd = i; // ipc位置

      acc.push({
        type: "pipe",
        handle: ipc,
        ipc: true,
      });
    } else if (stdio === "inherit") {
      acc.push({
        type: "inherit",
        fd: i,
      });
    }
    // ...还有很多代码,不在讨论范围
    return acc;
  }, []);

  return { stdio, ipc, ipcFd };
}

ChildProcess并不能直接创建新的进程,需要底层 V8 的帮助,在构造函数里面直接 new ProcessWrap 赋给了 this._handle。

ChildProcess.prototype.spawn开始主要处理主要的stdio参数,明确父子进程通过哪些方式来获取数据信息,官方文档给出了一些示例,如果不清楚可以多做点实验。如果是pipe或者是ipc都会实例化一个 Pipe 对象,只是参数类型不同。

// pipe_wrap.cc
void PipeWrap::New(const FunctionCallbackInfo<Value>& args) {
  switch (type) {
    case SOCKET:
      provider = PROVIDER_PIPEWRAP;
      ipc = false;
      break;
    case IPC:
      provider = PROVIDER_PIPEWRAP;
      ipc = true;
      break;
  }
  new PipeWrap(env, args.This(), provider, ipc);
}
PipeWrap::PipeWrap(Environment* env,
                   Local<Object> object,
                   ProviderType provider,
                   bool ipc)
    : ConnectionWrap(env, object, provider) {
  int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
}
// deps/uv/src/unix/pipe.c
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
  uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
  handle->shutdown_req = NULL;
  handle->connect_req = NULL;
  handle->pipe_fname = NULL;
  handle->ipc = ipc;
  return 0;
}

它们唯一的区别就是uv_pipe_t的 ipc 参数是 true 还是 false,所有的事件都被老老实实的绑定在 libuv 上

回到ChildProcess.prototype.spawn中,已经重置了 stdio 参数后,到了真正创建子进程的地方了,this._handle.spawn(options);,通过process_wrap.cc里的 ProcessWrap.Spawn 去创建,整个创建方法也极长,主要是对传入的参数进行处理,然后再调用uv_spawn

// deps/uv/src/unix/process.c
int uv_spawn(uv_loop_t* loop,
             uv_process_t* process,
             const uv_process_options_t* options) {
#if defined(__APPLE__) && (TARGET_OS_TV || TARGET_OS_WATCH)
  /* fork is marked __WATCHOS_PROHIBITED __TVOS_PROHIBITED. */
  return UV_ENOSYS;
#else
  // ...

  uv_signal_start(&loop->child_watcher, uv__chld, SIGCHLD);

  /* Acquire write lock to prevent opening new fds in worker threads */
  uv_rwlock_wrlock(&loop->cloexec_lock);
  pid = fork();

  if (pid == -1) {
    err = UV__ERR(errno);
    uv_rwlock_wrunlock(&loop->cloexec_lock);
    uv__close(signal_pipe[0]);
    uv__close(signal_pipe[1]);
    goto error;
  }

  if (pid == 0) {
    uv__process_child_init(options, stdio_count, pipes, signal_pipe[1]);
    abort();
  }

  /* Release lock in parent process */
  uv_rwlock_wrunlock(&loop->cloexec_lock);
  uv__close(signal_pipe[1]);

  // ...
}

通过系统层面的fork函数创建子进程,由于 fork 的特殊性,一次调用返回二次,当返回 0 的时候回执行子进程的逻辑,回去通过uv__process_child_init初始化整个子进程的上下文信息。

再回到ChildProcess.prototype.spawn中,遍历stdio,如果成员有 handle 字段,就通过createSocket为其创建一个 socket 对象

function createSocket(pipe, readable) {
  return net.Socket({ handle: pipe, readable, writable: !readable });
}

无论是参数stdio是 pipe 还是 ipc 都会创建 socket,在父子进程通信的时候,父进程通过子进程暴露出来的 stdin、stdout 和 stderr 来展示子进程执行的信息,缺乏之间的数据互通性,这也是导致execexecFile可使用的场景有限,而fork会带一个 ipc 参数给 stdio 参数(可以回过去翻翻 fork 源码),所以可以执行父子进程的通信操作,比如 send 方法等,具体的实现可以看setupChannel

综上,Node.js 在child_process里创建进程的流程大致梳理了下,在 JavaScript 层面并没有什么复杂的,在 libuv 层面注册了很多相关的事件,有空可以研究研究。之后会写一篇关于 cluster.fork 的介绍,其实就是对 child_process.fork 更多的封装。

回到顶部