写在前面
through2
经常被用于处理node
的stream
,假如使用过gulp
的话,对于这个包一定不会陌生,如:
gulp.task('rewrite', () => {
return gulp.src('./through/enter.txt')
.pipe(through2.obj(function(chunk, enc, callback) {
const { contents } = chunk;
for (var i = 0; i < contents.length; i++) {
if (contents[i] === 97) {
contents[i] = 122;
}
}
chunk.contents = contents;
this.push(chunk);
callback();
}))
.pipe(gulp.dest('./dist'));
});
这里将文件中所有的字符a
转换为字符z
,在写gulp
插件时一定会应用到这个包,下面就来窥探一下这个使用率非常高的包。
Transform stream
through2
的源码仅仅就100多行,本质上就是对于node
原生的transform
流进行的封装,先来看下Transform stream
。Transform
是一个双工流,既可读,也可写,但是与Duplex
还是有着一些区别,Duplex
的写和读可以说是没有任何的关联,是两个缓冲区和管道互补干扰,而Transform
将其输入和输出是存在相互关联的,中间做了处理。具体差别可以参考下面图片对比:
Duplex stream
:
Transform stream
:
Transform stream
的两个缓存区相互关联,对于每个缓冲区来说,highWaterMark
为阈值,超过阈值后,将会停止读或者写操作,如:
let i = 0;
const readable = Readable({
highWaterMark: 2,
read: function () {
var data = i < 26 ? String.fromCharCode(i++ + 97) : null;
console.log('push', data);
this.push(data);
}
});
const transform = Transform({
highWaterMark: 2,
transform: function (buf, enc, next) {
console.log('transform', buf.toString());
next(null, buf);
}
})
readable.pipe(transform);
stream
流向为:
由于阈值为2
,所以只能push
到f
,这时readable
的缓存区已满,transform
的读缓存区和写缓存区已经满了(由于transform
的两个缓存区的阈值为2
,所以写缓存区在写入b
之后就已经满了,后续不能继续写入),全部满之后,自然停止了读取,最终e,f
存在A
中,c,d
存在B
中,a,b
存在C
中,想要解决很简单,在添加一个流向就可以:
readable.pipe(transform).pipe(process.stdout);
through2源码
在了解Transform stream
之后,through2
的源码非常的简单,就是对于其的一层封装,暴露出三个api
(through2
,through2.obj
,through2.ctor
)而且三者接收的参数一致,因为都是由一个工厂方法创造出的:
function through2 (construct) {
return function (options, transform, flush) {
// 做了一些参数整理
if (typeof options == 'function') {
flush = transform
transform = options
options = {}
}
if (typeof transform != 'function')
transform = noop
if (typeof flush != 'function')
flush = null
return construct(options, transform, flush)
}
}
来看一下through2
对于Transform stream
的再加工,也就是源码中的DestroyableTransform
,与其名字一样,就是一个替我们实现好了destory
方法的Transform stream
:
DestroyableTransform.prototype.destroy = function(err) {
if (this._destroyed) return
this._destroyed = true
var self = this
// 触发destory后,close掉流
process.nextTick(function() {
if (err)
self.emit('error', err)
self.emit('close')
})
}
through2
与through2.obj
全部是创造出一个再加工后的Transform
,区别如下:
- 后者开启了对象模式(
objectMode
属性为true
),写入的参数不仅仅限制在string or uint8Array
- 后者降低了阈值(
highWaterMark
为16
,而不是默认的16kb
),这样做的原因,是为了和node
的默认保持一致,具体可以参见
through2.ctor
可以用来再次定制,其返回的是一个构造函数,用法可以参考下面:
const Tran = through.ctor(function(chunk, enc, callback) {
console.log('transform', chunk.toString());
callback(null, chunk);
});
const transform = new Tran();
写在最后
stream
在node
中有着非常广泛的应用,但是它使用起来却不是那么友好,throgh2
的出现可以减少使用上的麻烦,其原理也非常的简单;以上内容均为本人理解,如有错误还请指出,不胜感激~