学习node.js stream工作原理过程中,在stream.readable中看到如下一段代码:
// If the user pushes more data while we're writing to dest then we'll end up
// in ondata again. However, we only want to increase awaitDrain once because
// dest will only emit one 'drain' event for the multiple writes.
// => Introduce a guard on increasing awaitDrain.
var increasedAwaitDrain = false;
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
increasedAwaitDrain = false;
var ret = dest.write(chunk);
if (false === ret && !increasedAwaitDrain) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
if (((state.pipesCount === 1 && state.pipes === dest) ||
(state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
!cleanedUp) {
debug('false write response, pause', src._readableState.awaitDrain);
src._readableState.awaitDrain++;
increasedAwaitDrain = true;
}
src.pause();
}
}
虽然看了increasedAwaitDrain变量上的注释,但还是不能理解这个变量的作用,Node.js是单线程的,那么对于下面的这三行同步代码:
increasedAwaitDrain = false;
var ret = dest.write(chunk); // dest.write中有异步逻辑,但是同步返回
if (false === ret && !increasedAwaitDrain)
- 第一行
increasedAwaitDrain
赋值成false - 第二行执行同步代码
var ret = dest.write(chunk);
- 第三行在if中用到了
increasedAwaitDrain
这个变量
我理解的是这个变量一定是false,而按照increasedAwaitDrain
定义上方的注释,第三行代码中increasedAwaitDrain
可能为true?这有点违背我的知识和经验…什么情况下if中increasedAwaitDrain的值为true???
百思不得解,望高手帮忙解答,感激不尽!!!
awaitDrain 是控制 flow 的,不是 0 就是 1, 在触发 drain 的时候会自减一次,
function pipeOnDrain(src) {
return function() {
var state = src._readableState;
debug('pipeOnDrain', state.awaitDrain);
if (state.awaitDrain)
state.awaitDrain--;
if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
state.flowing = true;
flow(src);
}
};
}
正常是流程是 emit(‘data’) 触发一次 ondata, 如果 ret = dest.write(chunk) 返回 false (也就是 state.length < state.highWaterMark) awaitDrain + 1. 如果这时候你再次 push 一次,也返回 false 了, awaitDrain 2, 那么对不起 readable 就断流了。所以要加一个 increasedAwaitDrain ,
var ret = state.length < state.highWaterMark;
// we must ensure that previous needDrain will not be reset to false.
if (!ret)
state.needDrain = true;
应该是这样。。另外个人觉得看 stream 很不值, 看完两天就忘掉
@yviscool 你讲的内容我理解,我不是对pipe这个完整流程有疑惑,而是我在问题中提到的那三行代码:
increasedAwaitDrain = false;
var ret = dest.write(chunk); // dest.write中有异步逻辑,但是同步返回
if (false === ret && !increasedAwaitDrain)
- 第一行
increasedAwaitDrain
赋值成false - 第二行执行同步代码
var ret = dest.write(chunk);
- 第三行在if中用到了
increasedAwaitDrain
这个变量
我理解的是这个变量一定是false,而按照increasedAwaitDrain
定义上方的注释,第三行代码中increasedAwaitDrain
可能为true?这有点违背我的知识和经验…
// 多谢答主的友情tips,其实我不是为了学习Node.js而去看这段代码,是因为在实践遇到pipe相关问题才去翻代码看的 ^ _ ^
@Shasharoman false === false && !false …
已经确认increasedAwaitDrain
在整段代码中就是一个无用存在,因为我看的是v8.11.1版本的代码,而最新版本的实现中已经没有这段代码
看来多翻翻Node.js源代码,保不定就可以给官方提PR了,哈哈