这篇文章从
console.log
入手,对流的写入进行层层剖析。
console.log浅析
console.log
的实现很容易定位,直接转到console.js中:
// ./lib/console.js
// ...
Console.prototype.log = function log(...args) {
write(this._ignoreErrors,
this._stdout,
this[kFormatForStdout](args),
this._stdoutErrorHandler,
this[kGroupIndent]);
};
其中write
函数定义如下:
function write(ignoreErrors, stream, string, errorhandler, groupIndent) {
// ...
try {
// Add and later remove a noop error handler to catch synchronous errors.
stream.once('error', noop);
stream.write(string, errorhandler);
} catch (e) {
// console is a debugging utility, so it swallowing errors is not desirable
// even in edge cases such as low stack space.
if (isStackOverflowError(e))
throw e;
// Sorry, there's no proper way to pass along the error here.
} finally {
stream.removeListener('error', noop);
}
}
而stream
则是在创建实例的时候传递进去的:
module.exports = new Console({
stdout: process.stdout,
stderr: process.stderr
});
由此可见,console.log
实质是process.stdout.write
。
process.stdout的实现
接下来,我们直接溯源到process.stdout
,相关源码在stdio.js中:
// ./lib/internal/process/stdio.js
// ...
function getStdout() {
if (stdout) return stdout;
stdout = createWritableStdioStream(1);
stdout.destroySoon = stdout.destroy;
stdout._destroy = function(er, cb) {
// Avoid errors if we already emitted
er = er || new ERR_STDOUT_CLOSE();
cb(er);
};
if (stdout.isTTY) {
process.on('SIGWINCH', () => stdout._refreshSize());
}
return stdout;
}
// ...
Object.defineProperty(process, 'stdout', {
configurable: true,
enumerable: true,
get: getStdout
});
// ...
function createWritableStdioStream(fd) {
var stream;
const tty_wrap = process.binding('tty_wrap');
// Note stream._type is used for test-module-load-list.js
switch (tty_wrap.guessHandleType(fd)) {
case 'TTY':
var tty = require('tty');
stream = new tty.WriteStream(fd);
stream._type = 'tty';
break;
case 'FILE':
var fs = require('internal/fs');
stream = new fs.SyncWriteStream(fd, { autoClose: false });
stream._type = 'fs';
break;
case 'PIPE':
case 'TCP':
var net = require('net');
stream = new net.Socket({
fd: fd,
readable: false,
writable: true
});
stream._type = 'pipe';
break;
default:
// Probably an error on in uv_guess_handle()
throw new ERR_UNKNOWN_STREAM_TYPE();
}
由此可见,process.stdout
的getter就是getStdout
,而getStdout
调用了createWritableStdioStream
方法来创建的stdout
,在TTY(终端)场景下最终由tty.WriteStream
创建了一个stream
并返回给getStdout
。
net.Socket介绍
调用栈就不再一一溯源了,在这里简单介绍一下。
tty中,会调用:
// ./lib/tty.js
net.Socket.call(this, {
handle: tty,
readable: false,
writable: true
});
inherits(WriteStream, net.Socket);
进而在net.js中通过:
stream.Duplex.call(this, options);
最终生成了一个Duplex流(可读写流,网上资料很多不做过多介绍)。而Duplex流的write操作则来自_stream_writable.js:
// ./lib/_stream_duplex.js
function Duplex(options) {
// ...
Writable.call(this, options);
//...
if (options && options.readable === false)
this.readable = false;
if (options && options.writable === false)
this.writable = false;
}
至此,可以列出基本的观光路线了:
console.js -> stdio.js -> tty.js -> net.js -> _stream_duplex.js -> _stream_writable.js
接下来才真正开始进行源码的解读。
process.stdout.write在js层对数据的写入
我们重点放到_stream_writable.js
中,在创建Writable
类的时候会进行如下操作:
// ./lib/_stream_writable.js
function Writable(options) {
// ...
if (options) {
if (typeof options.write === 'function')
this._write = options.write;
if (typeof options.writev === 'function')
this._writev = options.writev;
if (typeof options.destroy === 'function')
this._destroy = options.destroy;
if (typeof options.final === 'function')
this._final = options.final;
}
// ...
}
重点注意一下this._write = options.write;
在这里我就不详细溯源了。需要注意的是,对于net.Socket,_write并不是通过options.write传入的,而是直接定义在Socket.prototype上,位于
net.js
中:
// ./lib/net.js
Socket.prototype._write = function(data, encoding, cb) {
// ...
var req = createWriteWrap(this._handle, afterWrite);
if (writev)
writevGeneric(this, req, data, cb);
else
writeGeneric(this, req, data, encoding, cb);
if (req.async)
this[kLastWriteQueueSize] = req.bytes;
};
writev
用于一次性写入多个数据块,在这里我们只关注writeGeneric(this, req, data, encoding, cb);
。
createWriteWrap
以及writeGeneric
来自模块stream_base_commons.js:
// ./lib/internal/stream_base_commons.js
const { WriteWrap } = process.binding('stream_wrap');
function createWriteWrap(handle, oncomplete) {
var req = new WriteWrap();
req.handle = handle;
req.oncomplete = oncomplete;
req.async = false;
return req;
}
function writeGeneric(self, req, data, encoding, cb) {
var err = handleWriteReq(req, data, encoding);
afterWriteDispatched(self, req, err, cb);
}
代码只有简单的两行,却是console.log
能最终打印到控制台的关键代码。
stream.write的底层实现
我们先详细解读一下WriteWrap
,WriteWrap
来自内部stream_wrap
,视线直接转移到stream_wrap.cc中,其中有对WriteWrap
的定义:
void LibuvStreamWrap::Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context) {
// ...
Local<FunctionTemplate> ww =
FunctionTemplate::New(env->isolate(), is_construct_call_callback);
ww->InstanceTemplate()->SetInternalFieldCount(StreamReq::kStreamReqField + 1);
Local<String> writeWrapString =
FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap");
ww->SetClassName(writeWrapString);
AsyncWrap::AddWrapMethods(env, ww);
target->Set(writeWrapString, ww->GetFunction());
env->set_write_wrap_template(ww->InstanceTemplate());
// ...
}
在这里稍微注意下AsyncWrap::AddWrapMethods
,这是AsyncWrap
专门针对异步操作增加getAsyncId
方法用的API。我们转到stream_wrap
的头文件stream_wrap.h中看看LibuvStreamWrap
是如何定义的:
// ./src/stream_wrap.h
class LibuvStreamWrap : public HandleWrap, public StreamBase {
public:
// ...
int GetFD() override;
bool IsAlive() override;
bool IsClosing() override;
bool IsIPCPipe() override;
// JavaScript functions
int ReadStart() override;
int ReadStop() override;
// Resource implementation
int DoShutdown(ShutdownWrap* req_wrap) override;
int DoTryWrite(uv_buf_t** bufs, size_t* count) override;
int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) override;
// ...
ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object) override;
WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object) override;
}
LibuvStreamWrap
继承于HandleWrap
和StreamBase
,这样就很好理解了。HandleWrap
包裹的是libuv handle,而StreamBase
则完全是关于stream的操作,StreamBase
提供基本的读写能力的同时,开放出来一些方法让子类去覆写,实现定制化的需求,例如DoTryWrite
是StreamBase
中在写入流的时候调用的方法,而这个方法是专门提供出来让子类去覆写的。LibuvStreamWrap
在构建的时候,通过覆写一些方法,实现对流定制化的操作。
stream.write的写入过程
接下来我们看一下流的写入过程,js方面调用的API就不过多介绍了,通过js可以定位到stream_base.cc中的函数:
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
//..
if (try_write) {
data_size = StringBytes::Write(env->isolate(),
stack_storage,
storage_size,
string,
enc);
buf = uv_buf_init(stack_storage, data_size);
uv_buf_t* bufs = &buf;
size_t count = 1;
err = DoTryWrite(&bufs, &count);
synchronously_written = count == 0 ? data_size : data_size - buf.len;
bytes_written_ += synchronously_written;
// ...
}
}
//..
}
很容易注意到,真正的写入过程是刚才所提到的DoTryWrite
,我们跟进一下DoTryWrite
,由于这个函数被覆写了,所以我们直接去stream_wrap.cc:
int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
int err;
size_t written;
uv_buf_t* vbufs = *bufs;
size_t vcount = *count;
err = uv_try_write(stream(), vbufs, vcount);
if (err == UV_ENOSYS || err == UV_EAGAIN)
return 0;
if (err < 0)
return err;
written = err;
for (; vcount > 0; vbufs++, vcount--) {
// Slice
if (vbufs[0].len > written) {
vbufs[0].base += written;
vbufs[0].len -= written;
written = 0;
break;
// Discard
} else {
written -= vbufs[0].len;
}
}
*bufs = vbufs;
*count = vcount;
return 0;
}
可以看到,其实最终写入调用的函数是libuv的uv_try_write
,uv_try_write
本质和uv_write
是一样的,只不过uv_try_write
会预先判断这次写入能不能立即完成,如果不能立即完成,则不会将写入请求排入queue中。可以简单看一下uv_try_write
代码,在stream.c中:
int uv_try_write(uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs) {
int r;
int has_pollout;
size_t written;
size_t req_size;
uv_write_t req;
/* Connecting or already writing some data */
if (stream->connect_req != NULL || stream->write_queue_size != 0)
return UV_EAGAIN;
has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);
r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
if (r != 0)
return r;
// ...
}
可以看到,最终还是使用r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
即uv_write
实现的流的写入操作。
console.log写入之后的回调
流的写入流程已经分析完了,接下来看一下写入之后做了什么。视线回到_stream_writable.js中:
function onwrite(stream, er) {
var state = stream._writableState;
var sync = state.sync;
var cb = state.writecb;
// ...
if (sync) {
process.nextTick(afterWrite, stream, state, finished, cb);
} else {
afterWrite(stream, state, finished, cb);
}
}
}
js的代码我就不一一溯源了,onwrite
函数可以从Writable.prototype.write
中追溯过来,由于state.sync
为true
所以会走入到process.nextTick(afterWrite, stream, state, finished, cb);
这块的逻辑。而nextTick
在创建TickObject
时,会有如下逻辑:
// ./lib/internal/process/next_tick.js
class TickObject {
constructor(callback, args, triggerAsyncId) {
// this must be set to null first to avoid function tracking
// on the hidden class, revisit in V8 versions after 6.2
this.callback = null;
this.callback = callback;
this.args = args;
const asyncId = newAsyncId();
this[async_id_symbol] = asyncId;
this[trigger_async_id_symbol] = triggerAsyncId;
if (initHooksExist()) {
emitInit(asyncId,
'TickObject',
triggerAsyncId,
this);
}
}
}
nextTick
在创建之初,便被AsyncWrap
包裹,并且以'TickObject'作为异步资源类型触发init钩子。所以在任何AsyncHooks回调中调用console.log
,都会再次创建TickObject并触发init钩子,从而导致AsyncHooks无限递归调用。
nextTick
说完了,那么大家可能还会好奇,console.log
通过nextTick
执行了什么回调,简单的从上面代码来看是afterWrite
,afterWrite
代码如下:
// ./lib/_stream_writable.js
function afterWrite(stream, state, finished, cb) {
if (!finished)
onwriteDrain(stream, state);
state.pendingcb--;
cb();
finishMaybe(stream, state);
}
可以看到最终调用的回调函数是cb,这个cb在哪里进行定义的呢?经过溯源,这个cb的定义出现在console.js
中:
var prop = {
writable: true,
enumerable: false,
configurable: true
};
prop.value = createWriteErrorHandler(stdout);
Object.defineProperty(this, '_stdoutErrorHandler', prop);
function createWriteErrorHandler(stream) {
return (err) => {
if (err !== null && !stream._writableState.errorEmitted) {
if (stream.listenerCount('error') === 0) {
stream.on('error', noop);
}
}
};
}
createWriteErrorHandler()
是这个cb的真面目。代码意图也很明显:只有在出错的时候才会触发其中的逻辑,并且在error事件没有被监听的情况下,增加错误监听函数noop
(空函数)。
结语
至此,整个console.log
的过程分析完毕,由于其中涉及到的部分很多,和整体逻辑关系不大的部分,例如tty_wrap.cc
等没有逐步分析,而且js的调用栈由于篇幅关系,也没有一一分析。本篇文章可以帮助读者把console.log
和stream
以及AsyncHooks
串联起来,其中省略一些溯源步骤还望读者见谅。
by 小菜
原文地址:https://github.com/xtx1130/blog/issues/24,欢迎watch和star,如果文中有讲解错误的地方,还请大神斧正。
可以回复我一下么