这篇文章主要从
net.createConnection
入手,详细讲解socket常用的connet、data、error、close等事件是如何实现的。
socket的创建
相信用过net.createConnection
都比较了解了。这个API会创建一个客户端的socket链接:
net.createConnection(options[, connectListener])
其中connectListener
将被添加为返回 socket 上的 ‘connect’ 事件上的监听器。
简单了解了API,我们直奔./lib/net.js来看一下是如何实现的:
// ./lib/net.js connect构造函数
function connect(...args) {
var normalized = normalizeArgs(args);
var options = normalized[0];
// ...
var socket = new Socket(options);
// ...
return Socket.prototype.connect.call(socket, normalized);
}
首先我们关注一下new Socket(options)
的实现:
const {
TCP,
TCPConnectWrap,
constants: TCPConstants
} = process.binding('tcp_wrap');
// ... socket构造函数
function Socket(options) {
// ...
this._hadError = false;
this._handle = null;
this._parent = null;
this._host = null;
this[kLastWriteQueueSize] = 0;
this[kTimeout] = null;
// ...
stream.Duplex.call(this, options);
this.allowHalfOpen = Boolean(allowHalfOpen);
if (options.handle) {
this._handle = options.handle; // private
this[async_id_symbol] = getNewAsyncId(this._handle);
} else if (options.fd !== undefined) {
const { fd } = options;
this._handle = createHandle(fd, false);
this._handle.open(fd);
this[async_id_symbol] = this._handle.getAsyncId();
// ...
}
initSocketHandle(this);
// ...
}
// ...
}
util.inherits(Socket, stream.Duplex);
// ... createHandle 函数
function createHandle(fd, is_server) {
const type = TTYWrap.guessHandleType(fd);
// ...
if (type === 'TCP') {
return new TCP(
is_server ? TCPConstants.SERVER : TCPConstants.SOCKET
);
}
throw new ERR_INVALID_FD_TYPE(type);
}
通过上面这一段代码我们可以得到两个关键的信息:
this._handle = createHandle(fd, false);
是TCP
的实例stream.Duplex.call(this, options);
socket的实例继承了stream.Duplex
的属性和方法 接下来是connect
:
Socket.prototype.connect = function(...args) {
if (pipe) {
// ...
} else {
lookupAndConnect(this, options);
}
}
/// connect调用的核心函数
function internalConnect(
self, address, port, addressType, localAddress, localPort) {
if (addressType === 6 || addressType === 4) {
const req = new TCPConnectWrap();
req.oncomplete = afterConnect;
req.address = address;
req.port = port;
req.localAddress = localAddress;
req.localPort = localPort;
if (addressType === 4)
err = self._handle.connect(req, address, port);
// ...
} else {
// ...
}
}
里面需要注意的地方:
self._handle.connect(req, address, port);
这一段是真正的实现socket连接的地方req.oncomplete = afterConnect;
这一段是连接成功之后的回调
this._handle和’connect’事件
在上一节我们知道this._handle
是TCP
的实例,进而我们长驱直入tcp_wrap.cc:
void TCPWrap::Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context) {
// ...
Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
Local<String> tcpString = FIXED_ONE_BYTE_STRING(env->isolate(), "TCP");
t->SetClassName(tcpString);
t->InstanceTemplate()->SetInternalFieldCount(1);
t->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "reading"),
Boolean::New(env->isolate(), false));
t->InstanceTemplate()->Set(env->owner_string(), Null(env->isolate()));
t->InstanceTemplate()->Set(env->onread_string(), Null(env->isolate()));
t->InstanceTemplate()->Set(env->onconnection_string(), Null(env->isolate()));
// ...
env->SetProtoMethod(t, "connect", Connect);
// ...
}
void TCPWrap::New(const FunctionCallbackInfo<Value>& args) {
// ...
Environment* env = Environment::GetCurrent(args);
int type_value = args[0].As<Int32>()->Value();
TCPWrap::SocketType type = static_cast<TCPWrap::SocketType>(type_value);
ProviderType provider;
switch (type) {
case SOCKET:
provider = PROVIDER_TCPWRAP;
break;
case SERVER:
provider = PROVIDER_TCPSERVERWRAP;
break;
default:
UNREACHABLE();
}
new TCPWrap(env, args.This(), provider);
}
TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
: ConnectionWrap(env, object, provider) {
int r = uv_tcp_init(env->event_loop(), &handle_);
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
// Suggestion: uv_tcp_init() returns void.
}
通过上面的代码,我们可以清晰地看到TCPWrap
向js层抛出的构造函数TCP
,重点注意下
env->SetProtoMethod(t, “connect”, Connect);
这里对TCP
的实例添加了connect
方法,意即this._handle.connect
调用的为TCPWrap::Connect
:
void TCPWrap::Connect(const FunctionCallbackInfo<Value>& args) {
// ...
if (err == 0) {
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
ConnectWrap* req_wrap =
new ConnectWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_TCPCONNECTWRAP);
err = req_wrap->Dispatch(uv_tcp_connect,
&wrap->handle_,
reinterpret_cast<const sockaddr*>(&addr),
AfterConnect);
// ...
}
args.GetReturnValue().Set(err);
}
通过这段代码可以清晰地看到,uv_tcp_connect
的cb为AfterConnect
,libuv中的逻辑就不做详细介绍了,有兴趣的可以观光一下tcp.c的uv__tcp_connect
函数,最终会走入到uv__io_poll
的轮回中。我们重点看一下AfterConnect
,代码在connection_wrap.cc中:
void ConnectionWrap<WrapType, UVType>::AfterConnect(uv_connect_t* req,
int status) {
// ...
req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
delete req_wrap;
}
可以看到最终调用的函数为oncomplete
,这时又回到了net.js
中:
const req = new TCPConnectWrap();
req.oncomplete = afterConnect;
而afterConnect
的定义中有这样一句:
function afterConnect(status, handle, req, readable, writable) {
// ...
if (status === 0) {
// ...
self.emit('connect');
self.emit('ready');
}
}
最终在这里emit出了connect
方法,即连接成功的回调,也就是net.createConnection(options[, connectListener])
中connectListener
函数最终被emit触发的地方。
stream和’data’事件
不知道读者还有没有记得,文中的第一节曾分析过:
stream.Duplex.call(this, options);
socket的实例继承了stream.Duplex
的属性和方法
没错,'data’事件就是在stream.Duplex
中emit的,在上一篇文章中我曾介绍过stream的写入过程,而在这里值stream的读取过程,刚才在分析TCPWrap的时候,在c++中构造js的TCP构造函数时候有这样一句:
t->InstanceTemplate()->Set(env->onread_string(), Null(env->isolate()));
由于socket继承了stream.Duplex
,整个的溯源过程就不详细查找了,最终onread
的触发在stream_base.cc中:
void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) {
Local<Value> argv[] = {
Integer::New(env->isolate(), nread),
buf
};
// ...
wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
}
而在js中的触发则在net.js
:
function onread(nread, buffer) {
// ...
if (nread > 0) {
// ...
var ret = self.push(buffer);
// ...
}
}
push方法则是从stream继承来的,视线转移到_stream_readable.js中:
Readable.prototype.push = function(chunk, encoding) {
// ...
return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
}
readableAddChunk
函数的作用是对流不断进行拼接并在过程中进行容错处理:
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
// ...
if (addToFront) {
if (state.endEmitted)
stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else
addChunk(stream, state, chunk, true);
} else if (state.ended) {
stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed) {
return false;
} else {
state.reading = false;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0)
addChunk(stream, state, chunk, false);
else
maybeReadMore(stream, state);
} else {
addChunk(stream, state, chunk, false);
}
}
在这里我只截取了关键代码,**可以看到在addChunk
的同时,如果出错会立刻emit出’error’事件。在上游net.js
中还有一些情况会emit 'error’事件,就不做详尽分析了。**接下来我们看下addChunk
:
function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
state.awaitDrain = 0;
stream.emit('data', chunk);
} else {
// update the buffer info.
state.length += state.objectMode ? 1 : chunk.length;
if (addToFront)
state.buffer.unshift(chunk);
else
state.buffer.push(chunk);
if (state.needReadable)
emitReadable(stream);
}
maybeReadMore(stream, state);
}
通过stream.emit('data', chunk);
会emit出’data’事件,并且把chunk传入到参数中。
总结
至此,socket的’connect’、‘data’、'error’事件便分析完毕了。有意思的是,这三个事件的来源不尽相同:
- 'connect’是通过
TCPWrap
类及一系列函数触发的emit - 'data’则是和
StreamBase
类相关 - 'error’则相当于全程的兜底,不管在哪里出了问题,总会emit出’error’事件。
by 小菜
原文地址:https://github.com/xtx1130/blog/issues/26 欢迎watch和star。如果文中有表述错误的地方还请大神斧正
加油