之前文章讲到nodejs创建一个服务并且建立tcp连接的过程。接下来分析一下,在建立tcp连接后,nodejs是如何解析http协议的。我们首先看一下nodejs在建立tcp连接时执行net.js层的回调时做了什么操作。下面是核心代码。
// clientHandle代表一个和客户端建立tcp连接的实体
function onconnection(err, clientHandle) {
var handle = this;
var self = handle.owner;
debug('onconnection');
if (err) {
self.emit('error', errnoException(err, 'accept'));
return;
}
// 建立过多,关掉
if (self.maxConnections && self._connections >= self.maxConnections) {
clientHandle.close();
return;
}
var socket = new Socket({
handle: clientHandle,
allowHalfOpen: self.allowHalfOpen,
pauseOnCreate: self.pauseOnConnect
});
socket.readable = socket.writable = true;
}
...
建立tcp连接后nodejs新建了一个Socket实体。我们看一下new Socket的核心逻辑。
stream.Duplex.call(this, options);
this._handle = options.handle;
initSocketHandle(this);
// 触发底层注册一些函数
this.read(0);
function initSocketHandle(self) {
if (self._handle) {
self._handle.owner = self;
// 这个函数在底层有数据时会回调
self._handle.onread = onread;
self[async_id_symbol] = getNewAsyncId(self._handle);
}
}
另一个重点是read(0)这个函数的逻辑。
Socket.prototype.read = function(n) {
if (n === 0)
return stream.Readable.prototype.read.call(this, n);
this.read = stream.Readable.prototype.read;
this._consuming = true;
return this.read(n);
};
read函数最终会调用_read函数读数据。我们看一下ReadableStream里的read。
在read里会执行_read
this._read(state.highWaterMark);
而_read是由Socket函数实现的。因为Socket继承了ReadableStream。_read执行了一个很重要的操作。
this._handle.readStart();
_handle代表的是一个TCP对象,即tcp_wrapper.cc里创建的。所以我们去看tcp_wrapper的代码。但是没找到该函数。原来该函数在tcp_wrapper的子类stream_wrap里实现的。
int LibuvStreamWrap::ReadStart() {
return uv_read_start(stream(), [](uv_handle_t* handle,
size_t suggested_size,
uv_buf_t* buf) {
static_cast<LibuvStreamWrap*>(handle->data)->OnUvAlloc(suggested_size, buf);
}, [](uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
static_cast<LibuvStreamWrap*>(stream->data)->OnUvRead(nread, buf);
});
}
其实就是调用了libuv的uv_read_start函数。该函数在stream.c里。我们继续往下看。
stream->read_cb = read_cb;
stream->alloc_cb = alloc_cb;
// 注册读事件
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
主要是注册了事件和回调函数,该函数会在数据到来时被执行。到此,就告一段落了。现在就要等数据的到来。上篇文章我们分析过,数据到来时执行的函数是uv__stream_io。
if (events & (POLLIN | POLLERR | POLLHUP))
uv__read(stream)
有读事件到来的时候,uv__stream_io会调uv_read函数。
buf = uv_buf_init(NULL, 0);
stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
if (buf.base == NULL || buf.len == 0) {
/* User indicates it can't or won't handle the read. */
stream->read_cb(stream, UV_ENOBUFS, &buf);
return;
}
这两个函数就是刚才注册的。我们再次回到nodejs的c++代码。看一下这两个函数做了什么。
void LibuvStreamWrap::OnUvRead(...) {
EmitRead(nread, *buf);
}
EmitRead在stream_base-inl.h里定义,他又是一个子类。
inline void StreamResource::EmitRead(ssize_t nread, const uv_buf_t& buf) {
if (nread > 0)
bytes_read_ += static_cast<uint64_t>(nread);
listener_->OnStreamRead(nread, buf);
}
在stream_base.c定义
OnStreamRead() {
stream->CallJSOnreadMethod(nread, obj);
}
CallJSOnreadMethod() {
wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
}
在env.h里我们知道onread_string就是onread,所以这里就是执行js层的onread函数。该函数就是在new Socket的时候注册的。我们回到js的代码。
function onread() {
var ret = self.push(buffer);
}
push函数是在readableStream里定义的。他经过一系列处理触发ondata事件。
function addChunk(...) {
...
stream.emit('data', chunk);
...
}
那是谁监听了ondata事件呢,我们首先看一下nodejs在建立一个连接到再_http_server.js层做了什么处理。
function Server(requestListener) {
if (!(this instanceof Server)) return new Server(requestListener);
net.Server.call(this, { allowHalfOpen: true });
// 收到http请求时执行的回调
if (requestListener) {
this.on('request', requestListener);
}
this.httpAllowHalfOpen = false;
// 建立tcp连接的回调
this.on('connection', connectionListener);
this.timeout = 2 * 60 * 1000;
this.keepAliveTimeout = 5000;
this._pendingResponseData = 0;
this.maxHeadersCount = null;
}
connectionListener代码如下。
function connectionListener(socket) {
defaultTriggerAsyncIdScope(
getOrSetAsyncId(socket), connectionListenerInternal, this, socket
);
}
function connectionListenerInternal(server, socket) {
httpSocketSetup(socket);
if (socket.server === null)
socket.server = server;
if (server.timeout && typeof socket.setTimeout === 'function')
socket.setTimeout(server.timeout);
socket.on('timeout', socketOnTimeout);
var parser = parsers.alloc();
parser.reinitialize(HTTPParser.REQUEST);
parser.socket = socket;
socket.parser = parser;
parser.incoming = null;
// Propagate headers limit from server instance to parser
if (typeof server.maxHeadersCount === 'number') {
parser.maxHeaderPairs = server.maxHeadersCount << 1;
} else {
// Set default value because parser may be reused from FreeList
parser.maxHeaderPairs = 2000;
}
var state = {
onData: null,
onEnd: null,
onClose: null,
onDrain: null,
outgoing: [],
incoming: [],
outgoingData: 0,
keepAliveTimeoutSet: false
};
// 收到tcp连接中的数据时回调
state.onData = socketOnData.bind(undefined, server, socket, parser, state);
state.onEnd = socketOnEnd.bind(undefined, server, socket, parser, state);
state.onClose = socketOnClose.bind(undefined, socket, state);
state.onDrain = socketOnDrain.bind(undefined, socket, state);
socket.on('data', state.onData);
socket.on('error', socketOnError);
socket.on('end', state.onEnd);
socket.on('close', state.onClose);
socket.on('drain', state.onDrain);
parser.onIncoming = parserOnIncoming.bind(undefined, server, socket, state);
// We are consuming socket, so it won't get any actual data
socket.on('resume', onSocketResume);
socket.on('pause', onSocketPause);
// Override on to unconsume on `data`, `readable` listeners
socket.on = socketOnWrap;
// We only consume the socket if it has never been consumed before.
if (socket._handle) {
var external = socket._handle._externalStream;
if (!socket._handle._consumed && external) {
parser._consumed = true;
socket._handle._consumed = true;
parser.consume(external);
}
}
parser[kOnExecute] =
onParserExecute.bind(undefined, server, socket, parser, state);
socket._paused = false;
}
主要是注册了一系列的回调函数,这些函数在收到数据或者解析数据时会被执行。所以收到数据后执行的函数是socketOnData。该函数就是把数据传进http解析器然后进行解析。
function socketOnData(server, socket, parser, state, d) {
...
var ret = parser.execute(d);
onParserExecuteCommon(server, socket, parser, state, ret, d);
}
我们先看一下parser是个什么。parser是在_http_server.js的onconnection回调里,parsers.alloc()分配的。而parsers又是个啥呢?他在_http_common.js里定义。
var parsers = new FreeList('parsers', 1000, function() {
var parser = new HTTPParser(HTTPParser.REQUEST);
parser._headers = [];
parser._url = '';
parser._consumed = false;
parser.socket = null;
parser.incoming = null;
parser.outgoing = null;
// Only called in the slow case where slow means
// that the request headers were either fragmented
// across multiple TCP packets or too large to be
// processed in a single run. This method is also
// called to process trailing HTTP headers.
parser[kOnHeaders] = parserOnHeaders;
parser[kOnHeadersComplete] = parserOnHeadersComplete;
parser[kOnBody] = parserOnBody;
parser[kOnMessageComplete] = parserOnMessageComplete;
parser[kOnExecute] = null;
return parser;
});
class FreeList {
constructor(name, max, ctor) {
this.name = name;
this.ctor = ctor;
this.max = max;
this.list = [];
}
alloc() {
return this.list.length ?
this.list.pop() :
this.ctor.apply(this, arguments);
}
free(obj) {
if (this.list.length < this.max) {
this.list.push(obj);
return true;
}
return false;
}
}
他其实是管理http解析器的。重点是HTTPParser,他定义在node_http_parser.cc是对http解析器的封装。真正的解析器在http_parser.c。回到刚才的地方。nodejs收到数据后执行 parser.execute(d);execute函数对应的是node_http_parser里的Execute。该函数进行了重载。入口是下面这个函数。
static void Execute(const FunctionCallbackInfo<Value>& args) {
Local<Value> ret = parser->Execute(buffer_data, buffer_len);
}
Local<Value> Execute(char* data, size_t len) {
http_parser_execute(&parser_, &settings, data, len);
}
http_parser_execute函数定义在http_parser.c,该函数就是进行真正的http协议解析。它里面会有一些钩子函数。在解析的某个阶段会执行。例如解析完头部。
if (settings->on_headers_complete) {
switch (settings->on_headers_complete(parser)) {
...
}
}
具体的定义在node_http_parser.cc
const struct http_parser_settings Parser::settings = {
Proxy<Call, &Parser::on_message_begin>::Raw,
Proxy<DataCall, &Parser::on_url>::Raw,
Proxy<DataCall, &Parser::on_status>::Raw,
Proxy<DataCall, &Parser::on_header_field>::Raw,
Proxy<DataCall, &Parser::on_header_value>::Raw,
Proxy<Call, &Parser::on_headers_complete>::Raw,
Proxy<DataCall, &Parser::on_body>::Raw,
Proxy<Call, &Parser::on_message_complete>::Raw,
nullptr, // on_chunk_header
nullptr // on_chunk_complete
};
这里我们以on_header_complete钩子来分析。
const uint32_t kOnHeadersComplete = 1
int on_headers_complete() {
Local<Value> cb = obj->Get(kOnHeadersComplete);
MakeCallback(cb.As<Function>(), arraysize(argv), argv);
}
最后会执行kOnHeadersComplete这个函数。我们看到这个kOnHeadersComplete 等于1,其实这个是在js层复赋值的。在_http_common.js中的开头。
const kOnHeadersComplete = HTTPParser.kOnHeadersComplete | 0;
然后在新建一个http解析器的函数注册了该函数。
parser[kOnHeadersComplete] = parserOnHeadersComplete;
所以当解析头部结束就会执行parserOnHeadersComplete。
function parserOnHeadersComplete(...) {
parser.incoming = new IncomingMessage(parser.socket);
...
return parser.onIncoming(parser.incoming, shouldKeepAlive);
}
新建了一个IncomingMessage对象,然后执行_http_server.js注册的回调onIncoming 。该回调函数也是再建立tcp连接时注册的。
function parserOnIncoming() {
var res = new ServerResponse(req);
...
server.emit('request', req, res);
}
生成一个ServerResponse对象,然后触发request事件。该函数是在我们执行http.createServer时传进行的函数。
function Server(requestListener) {
...
// 收到http请求时执行的回调
if (requestListener) {
this.on('request', requestListener);
}
}
最后在我们的回调里就拿到了这两个对象。但是这时候只是解析完了头部,request对象里还拿不到body的数据。我们需要自己获取。
var str = "";
req.on('data', (data) => {
str += data;
});
req.on('end',() => {})