这篇文章主要从
fs.write
入手,简单讲述node中写文件同步与异步的实现以及详细解释异步I/O的回调如何通过AsyncWrap
串起来nextTick
和MicroTasks
昨天有个朋友问我:
读源码什么都不懂,无从入手该怎么办?
我感觉透过现象来看本质是一个很好的入手方向。这篇文章就从我们熟悉的fs.write
和fs.writeSync
入手,透过这些简单的API,来看看node究竟在里面做了什么。
fs.write和fs.writeSync
相信有一些node基础的开发者或者之前读过我的文章的读者都会知道console.log
是基于process.stdout.write
实现的,意即console.log
是异步操作(可能有人会提出疑问:既然是异步,如何保证输出是正确的?请自己移步./lib/console.js查看)。所以如果我们想要调试node源码异步回调的时候,如果使用console.log
会造成递归,这种情况下一般都会使用fs.writeSync
。在js层面的源码中,fs.write
和fs.writeSync
在调用的时候其实只差了一个参数:
// ./lib/fs.js
fs.write = function(fd, buffer, offset, length, position, callback) {
function wrapper(err, written) {
// Retain a reference to buffer so that it can't be GC'ed too soon.
callback(err, written || 0, buffer);
}
// ...
const req = new FSReqWrap();
req.oncomplete = wrapper;
if (isUint8Array(buffer)) {
// ...
return binding.writeBuffer(fd, buffer, offset, length, position, req); //注意这里
}
// ...
return binding.writeString(fd, buffer, offset, length, req); //注意这里
};
fs.writeSync = function(fd, buffer, offset, length, position) {
validateUint32(fd, 'fd');
const ctx = {};
let result;
if (isUint8Array(buffer)) {
// ...
result = binding.writeBuffer(fd, buffer, offset, length, position,
undefined, ctx); //注意这里
} else {
// ...
result = binding.writeString(fd, buffer, offset, length,
undefined, ctx); //注意这里
}
handleErrorFromBinding(ctx);
return result;
};
通过对比可以发现,在调用writeBuffer
或者writeString
的时候,fs.write
多了一个req的参数,而fs.writeSync
拥有ctx参数。接下来我们去node_file.cc看看这两个到底区别在哪里。在这里我们以writeBuffer
为例:
static void WriteString(const FunctionCallbackInfo<Value>& args) {
// ...
FSReqBase* req_wrap = GetReqWrap(env, args[4]);
const bool is_async = req_wrap != nullptr;
// ...
if (is_async) { // write(fd, string, pos, enc, req)
CHECK_NE(req_wrap, nullptr);
len = StringBytes::StorageSize(env->isolate(), value, enc);
FSReqBase::FSReqBuffer& stack_buffer =
req_wrap->Init("write", len, enc);
// ...
int err = uv_fs_write(env->event_loop(), req_wrap->req(),
fd, &uvbuf, 1, pos, AfterInteger); //注意这里
req_wrap->Dispatched();
if (err < 0) {
// ...
} else {
req_wrap->SetReturnValue(args);
}
} else {
CHECK_EQ(argc, 6);
fs_req_wrap req_wrap;
// ...
uv_buf_t uvbuf = uv_buf_init(buf, len);
int bytesWritten = SyncCall(env, args[5], &req_wrap, "write",
uv_fs_write, fd, &uvbuf, 1, pos); //注意这里
args.GetReturnValue().Set(bytesWritten);
}
}
在这里我们可以很明显的开出来其中用一个if...else
把同步和异步的逻辑区分开了。async直接调用uv_fs_write
而sync则调用SyncCall()
。当时我看到这里的时候还有些许错愕,因为node_file.cc
中提供了AsyncCall()
方法,单独这个API没有使用,于是我翻到了这个pr,是为了防止内存泄漏,所以才进行的单独的处理,具体信息可以去pr中了解。
对比一下uv_fs_write
和下面的SyncCall
,可以发现uv_fs_write
多出了AfterInteger
这个参数,AfterInteger
定义如下:
void AfterInteger(uv_fs_t* req) {
FSReqBase* req_wrap = static_cast<FSReqBase*>(req->data);
FSReqAfterScope after(req_wrap, req);
if (after.Proceed())
req_wrap->Resolve(Integer::New(req_wrap->env()->isolate(), req->result));
}
其中after.Processd()
定义如下:
bool FSReqAfterScope::Proceed() {
if (req_->result < 0) {
Reject(req_);
return false;
}
return true;
}
根据函数中的if判断以及req_wrap->Resolve
很明显可以看出来这是一个回调函数,经过比对可以发现uv_fs_write
多出的AfterInteger
其实是一个函数。req_wrap
会在下面进行介绍,我们现在重点先关注一下uv_fs_write
。
uv_fs_write
视线转移到libuv中的fs.c文件中,其中定义了uv_fs_write
,这里只摘抄重点的部分进行解读:
#define INIT(subtype) \
do { \
if (req == NULL) \
return UV_EINVAL; \
UV_REQ_INIT(req, UV_FS); \
req->fs_type = UV_FS_ ## subtype; \
req->result = 0; \
req->ptr = NULL; \
req->loop = loop; \
req->path = NULL; \
req->new_path = NULL; \
req->bufs = NULL; \
req->cb = cb; \
} \
while (0)
#define POST \
do { \
if (cb != NULL) { \
uv__req_register(loop, req); \
uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done); \
return 0; \
} \
else { \
uv__fs_work(&req->work_req); \
return req->result; \
} \
} \
while (0)
int uv_fs_write(uv_loop_t* loop,
uv_fs_t* req,
uv_file file,
const uv_buf_t bufs[],
unsigned int nbufs,
int64_t off,
uv_fs_cb cb) {
INIT(WRITE);
// ...
POST;
}
根据定义可以发现,上文中的AfterInteger
其实是作为cb参数传入到了uv_fs_write
中。接下来注意一下POST
宏:
if (cb != NULL) { \
uv__req_register(loop, req); \
uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done); \
return 0; \
} \
else { \
uv__fs_work(&req->work_req); \
return req->result; \
}
在这里对cb进行了判断,如果无cb则直接调用uv__fs_work
,如果有cb则会把uv__fs_work
放到thread_pool中调用以形成异步I/O。libuv通过这种方式,实现了同步和异步。
异步的回调和同步的返回
同步fs.writeSync
的返回
首先我们先关注一下同步的fs.writeSync
的返回,视线回到node_file.cc中,关于同步的返回在这里:
int bytesWritten = SyncCall(env, args[5], &req_wrap, "write",
uv_fs_write, fd, &uvbuf, 1, pos);
args.GetReturnValue().Set(bytesWritten);
可以很容易的看到,fs.writeSync
的返回为写入的字节数。
异步fs.write
的回调
在刚才已经介绍过fs.write
会把AfterInteger
作为cb传入到uv_fs_write
中,在事件循环开始之后会在poll阶段执行AfterInteger
回调。而AfterIntenger
中最终执行的是:
req_wrap->Resolve(Integer::New(req_wrap->env()->isolate(), req->result));
Resolve代码如下:
void FSReqWrap::Resolve(Local<Value> value) {
Local<Value> argv[2] {
Null(env()->isolate()),
value
};
MakeCallback(env()->oncomplete_string(), arraysize(argv), argv);
}
可以看到最终执行的是MakeCallback
,如果读过我之前文章的读者,很容易联想到之前timers API的MakeCallback
。这两个MakeCallback
其实还是有区别的,区别就在于–FSReqWrap
继承自AsyncWrap
,而这个MakeCallback
的声明在async_wrap-inl.h:
inline v8::MaybeLocal<v8::Value> AsyncWrap::MakeCallback(
const v8::Local<v8::String> symbol,
int argc,
v8::Local<v8::Value>* argv) {
v8::Local<v8::Value> cb_v = object()->Get(symbol);
CHECK(cb_v->IsFunction());
return MakeCallback(cb_v.As<v8::Function>(), argc, argv);
}
最终调用的return MakeCallback(cb_v.As<v8::Function>(), argc, argv);
则在async_wrap.cc中:
MaybeLocal<Value> AsyncWrap::MakeCallback(const Local<Function> cb,
int argc,
Local<Value>* argv) {
EmitTraceEventBefore();
async_context context { get_async_id(), get_trigger_async_id() };
MaybeLocal<Value> ret = InternalMakeCallback(
env(), object(), cb, argc, argv, context);
EmitTraceEventAfter();
return ret;
}
InternalMakeCallback
不知道大家还有没有印象,之前的文章中曾经介绍过,在node源码粗读(9):nextTick、timers API、MicroTasks注册到执行全阶段解读的event-loop阶段
章节,有这样一句话:
也正是
InternalMakeCallback
和InternalCallbackScope::Close
使得libuv和v8紧紧的联系在了一起
没错,AsyncWrap::MakeCallback
最终调用的还是node::InternalMakeCallback
。殊途同归,最终使得整体形成了一个闭环(ps: InternalCallbackScope::Close
会继续调用nextTick以及RunMicrotasks)。
by 小菜
原文地址:https://github.com/xtx1130/blog/issues/23,欢迎star和watch,如果有疑问可以在文章下面或者issue中留言。如果文中描述的有错误的地方,还请大神斧正。