百思不得其解,关于transform流收到重复数据的问题
发布于 10 个月前 作者 lersh 1236 次浏览 来自 问答

我有个代码,定义了一个transform流来加密数据。如果将加密的数据直接存在本地,然后读取本地加密的文件,通过tcp发给客户端,客户端也定义了一个transform流来解密,这样是一切正常的。数据能正常解密。 但是如果是服务器端直接加密输出给客户端,客户端就会收到重复的数据,而且重复数据块每次都不一样。非常的诡异。 不知道是不是我对流的理解还有不足之处,希望能得到各位大牛的指点,谢谢。 代码如下:

服务器端代码:tcpserver.js

'use strict';
const net = require('net');
const crypto = require('crypto');
const stream = require('stream');

const password = '123qweASD';

var i = 1;

/**
 * Encrypts text by given key
 * @param Buffer text to encrypt
 * @param String masterkey
 * @returns String encrypted text, base64 encoded
 */
function encipherGCM(buff, masterkey) {
    try {
        // random initialization vector
        var iv = crypto.randomBytes(12);

        // random salt
        var salt = crypto.randomBytes(64);

        // derive key: 32 byte key length - in assumption the masterkey is a cryptographic and NOT a password there is no need for
        // a large number of iterations. It may can replaced by HKDF
        var key = crypto.pbkdf2Sync(new Buffer(masterkey), salt, 2145, 32, 'sha512');

        // AES 256 GCM Mode
        var cipher = crypto.createCipheriv('aes-256-gcm', key, iv);

        // encrypt the given buffer
        var encrypted = Buffer.concat([cipher.update(buff), cipher.final()]);

        // extract the auth tag
        var tag = cipher.getAuthTag();

        // generate output
        return Buffer.concat([salt, iv, tag, encrypted]);

    } catch (e) {
    }

    // error
    return null;
}

const Encodetransform = stream.Transform({
    highWaterMark: 128 * 1024,
    transform: function (buf, enc, next) {
        process.nextTick(() => {
            var encode_buff = encipherGCM(buf, password);
            var buff_len = new Buffer(4);
            buff_len.writeUInt32BE(encode_buff.length);
            var encode_buff_head = Buffer.concat([buff_len, encode_buff]);

            var md5 = crypto.createHash('md5');
            var md5_code = md5.update(buf).digest('hex');

            console.log(i++, 'buf', buf.length, 'encode_buff_head', encode_buff_head.length, 'md5', md5_code);
            next(null, encode_buff_head);
        });
    }
})


var server = net.createServer((clientSocket) => {
    i = 1;
    var rs = require('fs').createReadStream('./test.jpg')
    rs.pipe(Encodetransform).pipe(clientSocket);
});


server.listen(6000, '0.0.0.0', () => {
    console.log('Listening on 6000...');
});

客户端代码:client.js

'use strict'
const net = require('net');
const crypto = require('crypto');
const stream = require('stream');
const fs = require('fs');

const password = '123qweASD';

const ws = fs.createWriteStream('./test_Deciphered.jpg');

/**
 * Decrypts text by given key
 * @param Buffer base64 encoded input data
 * @param String masterkey
 * @returns Buffer decrypted (original) text
 */
function decipherGCM(data, masterkey) {

    var bData = data;

    // convert data to buffers
    var salt = bData.slice(0, 64);
    var iv = bData.slice(64, 76);
    var tag = bData.slice(76, 92);
    var buff = bData.slice(92);
    try {
        // derive key using; 32 byte key length
        var key = crypto.pbkdf2Sync(new Buffer(masterkey), salt, 2145, 32, 'sha512');

        // AES 256 GCM Mode
        var decipher = crypto.createDecipheriv('aes-256-gcm', key, iv);
        decipher.setAuthTag(tag);

        // encrypt the given buffer
        var decrypted = Buffer.concat([decipher.update(buff), decipher.final()]);

        return decrypted;

    } catch (e) {
    }

    // error
    return null;
}


var c = 0, i = 1;

const Transform = stream.Transform;
class DecryptStream extends Transform {
    constructor() {
        super();
        this.highWaterMark = 2;
        this._isRemain = false;//有没有遗留数据
        this._remainBuff = new Buffer(0);//上次遗留的数据
    }

    _transform(buf, enc, next) {
        process.nextTick(() => {
            var self = this;
            var currectBuffer;
            //console.log(buf.length);
            if (!this._isRemain) {//如果没有待处理数据
                currectBuffer = buf;
            }
            else {//如果有待处理数据
                this._remainBuff = Buffer.concat([this._remainBuff, buf]);//将待处理数据和这次的数据拼接
                currectBuffer = this._remainBuff;
            }
            while (currectBuffer.length > 0) {
                var buff_len = currectBuffer.slice(0, 4);
                var len = buff_len.readUInt32BE(0);
                if (currectBuffer.length < len + 4) {//如果当前数据块不完整
                    this._isRemain = true;
                    this._remainBuff = currectBuffer;//将这部分数据存入待处理数据
                    currectBuffer = new Buffer(0);
                }
                else {
                    var data = currectBuffer.slice(4, len + 4);//取出一块数据,slice第二个参数是索引值
                    var decrypted_data = decipherGCM(data, password);//解密
                    if (decrypted_data === null)
                        console.log('Decrypto Error!');
                    var md5 = crypto.createHash('md5');
                    var md5_code = md5.update(decrypted_data).digest('hex');
                    console.log(i++, 'decrypted_data', decrypted_data.length, 'md5', md5_code);

                    this.push(decrypted_data);//push出去
                    var next_data = currectBuffer.slice(len + 4);//获取剩下的数据
                    currectBuffer = next_data;
                }
            }
            next();
        });
    }
}

net.connect(6000, '127.0.0.1', function () {
    var server = this;
    var decryptStream = new DecryptStream();
    server.pipe(decryptStream).pipe(ws);
});
7 回复

如果用以下代码将加密收的数据存到本地

var rs = require('fs').createReadStream('./test.jpg');
var ws = require('fs').createWriteStream('./test_Ciphered.jpg');
rs.pipe(Encodetransform).pipe(ws);

然后直接读取本地文件,客户端收到数据后再解密就不会有问题

var rs = require('fs').createReadStream('./test_Ciphered.jpg')
rs.pipe(clientSocket);

想写个梯子? 快递拿好

我这边程序跑的正常,不过服务端有个逻辑你需要修改下,那就是Encodetransform流式一次性消费的,所以服务端的程序只能在第一次跑成功,第二次就会失败。改成这样就ok:

var server = net.createServer((clientSocket) => {
    try{
        console.log(212)
        i = 1;
        var rs = require('fs').createReadStream('./test.png')
        const Encodetransform = stream.Transform({
            highWaterMark: 128 * 1024,
            transform: function (buf, enc, next) {
                process.nextTick(() => {
                    var encode_buff = encipherGCM(buf, password);
                    var buff_len = new Buffer(4);
                    buff_len.writeUInt32BE(encode_buff.length);
                    var encode_buff_head = Buffer.concat([buff_len, encode_buff]);
        
                    var md5 = crypto.createHash('md5');
                    var md5_code = md5.update(buf).digest('hex');
        
                    console.log(i++, 'buf', buf.length, 'encode_buff_head', encode_buff_head.length, 'md5', md5_code);
                    next(null, encode_buff_head);
                });
            }
        })
        rs.pipe(Encodetransform).pipe(clientSocket);
    }catch(e){
        console.log(e);
    }
    
});

关于transform流的解释,可以参阅 深入node之Transform

@royalrover 谢谢回复,你可以试一下大一点的文件,比如2m以上的图片,或者把服务端放到远程vps上,就会出现重复数据的问题了

@lersh 估计是缓冲区文体的,文档里写了,合理使用drain

@i5ting 谢谢回复,不过我怎么在stream内部调用drain呢?

打了一堆log终于知道问题出在哪里了,是我自己不小心,在currectBuffer恰好等于len+4时,解密后没有重置this._isRemainthis._remainBuff

回到顶部