遇到内存泄露问题,求大神帮助
发布于 17小时前 作者 moon9sky 99 次浏览 最后一次编辑是 15小时前 来自 问答

直接上代码吧。

var mqtt = require('mqtt');
var net  = require('net');
var buffers   = [];
var leng = 0;
var client = mqtt.connect("mqtt://192.168.0.76:8000");
client.on('connect',function(){
    client.subscribe('other');
    buffers   = [];
    leng = 0;
});
client.subscribe('other');
var wk  = new net.Socket();
wk.connect(1234,"192.168.0.153",function() {
    buffers   = [];
    leng = 0;
    console.log("connect to workerman server");
});
wk.on('error',function(){
    wk.connect(1234,"192.168.0.153",function() {
        buffers   = [];
        leng = 0;
        console.log("connect to workerman server");
    });
});
client.on('message', function (topic, message,packet) {
    var info = enHGdata(topic,message);
    wk.write(info,function(){
    });
});
wk.on('data',function(data){
   var info = deHGdata(data);
   for (var i in info){
       if (info[i] == 17476){
           client.publish('test', info[i*1+2])
       }
   }
});
> 
function enHGdata(topic,msg){
    var info =[];
    var data      = msg
    data = new Buffer(data);
    var order   = new Buffer([0x44,0x44]);
    var msglength = new Buffer(4);
    msglength.writeUInt32BE(data.length, 0);
    var topic = new Buffer(topic);
    var topicleng = new Buffer(2);
    topicleng.writeUInt16BE(topic.length, 0);
    info.push(order);
    info.push(topicleng);
    info.push(msglength);
    info.push(topic);
    info.push(data);
    var HGdata = Buffer.concat(info) ;
    info = null;
    data = null;
    order = null;
    msglength = null;
    msg      = null;
    topic = null;
    topicleng = null;
    return HGdata;
}
> 
function deHGdata(all){
    var buffer    = all;
    var bufferdata;
    if (leng == 0) {
        leng = buffer.slice(2, 4).readUInt16BE(0) +buffer.slice(4, 8).readUInt32BE(0)+8;
    }
    if (buffers.length == 0){
        if (buffer.length == leng){
            var msg = deOnePackage(buffer);
            buffer = null;
            all = null;
            console.log("deCode 1");
            return msg;
        }
        else if (buffer.length > leng){
            var msg = deMulPackage(buffer);
            buffer = null;
            all = null;
            console.log("deCode 2");
            return msg;
        }
        else
        {
            buffers.push(buffer);
            buffer = null;
            console.log("saveCode 1");
            all = null;
        }
    }
    else
    {
       if (leng == 0)
       {
           bufferdata = Buffer.concat(buffers);
           if (bufferdata.length >= 8){
               leng = bufferdata.slice(2, 4).readUInt32BE() +bufferdata.slice(4, 8).readUInt32BE()+8;
               bufferdata = null;
           }else{
               buffers.push(buffer);
               bufferdata = null;
               buffer = null;
               all = null;
           }
       }
       else{
           if (wk.bytesRead < leng){
               buffers.push(buffer);
               console.log("saveCode 2");
               buffer = null;
               all = null;
           }
           else if(wk.bytesRead == leng){
               buffers.push(buffer);
               bufferdata = Buffer.concat(buffers);
               var msg = deOnePackage(bufferdata);
               buffers = null;
               buffers = [];
               leng     = 0;
               wk.bytesRead = 0;
               bufferdata = null;
               buffer = null;
               all = null;
               console.log("deCode 3");
               return msg;
           }
           else
           {
               buffers.push(buffer);
               var buffer = Buffer.concat(buffers);
               var msg = deMulPackage(buffer);
               bufferdata = null;
               buffer = null;
               all = null;
               leng     = 0;
               console.log("deCode 4");
               return msg
       }
    }
}
}
function deOnePackage(buffer){
    var data = [];
    var order,topicleng,topic,info,leng;
    order     = buffer.slice(0, 2).readUInt16BE(0);
    topicleng = buffer.slice(2, 4).readUInt16BE(0);
    leng      = buffer.slice(4, 8).readUInt32BE(0)+topicleng+8;
    topic     = buffer.slice(8, 8+topicleng).toString();
    info      = buffer.slice(8+topicleng,leng);
    data.push(order);
    data.push(topic);
    data.push(info);
    order = null;
    topicleng = null;
    leng = null;
    topic = null;
    info = null;
    buffer = null;
    return data;
}
> 
function deMulPackage(all) {
    var buffer = all;
    var n =0;
    var data = [];
    var oneleng = all.slice(2, 4).readUInt16BE(0) + all.slice(4, 8).readUInt32BE(0) + 8;
    do {
        var order,topicleng,leng,topic,info;
        order     = buffer.slice(0, 2).readUInt16BE(0);
        topicleng = buffer.slice(2, 4).readUInt16BE(0);
        leng      = buffer.slice(4, 8).readUInt32BE(0)+topicleng+8;
        topic     = buffer.slice(8, 8 + topicleng).toString();
        info      = buffer.slice(8 + topicleng,leng);
        data.push(order);
        data.push(topic);
        data.push(info);
        buffer  = all.slice(n,leng+n);
        var bufferleft = all.slice(leng+n,all.length);
        n = n +oneleng;
        oneleng = buffer.slice(2, 4).readUInt16BE(0) + buffer.slice(4, 8).readUInt32BE(0) + 8;
    } while (bufferleft.length >= oneleng);

    if (bufferleft.length != 0) {
        buffers = null;
        buffers = [];
        buffers.push(buffer);
        wk.bytesRead = bufferleft.length;
        console.log("saveCode 3");
    }
    else{
        buffers = null;
        buffers = [];
    }
    buffer = null;
    all = null;
    oneleng = null;
    order = null;
    topicleng = null;
    leng = null;
    topic = null;
    info = null;
    bufferleft = null;
    return data;
}

主要是做一个mqtt的消息转发器,将mqtt格式和公司内部的通信格式相互转化,但是在压力测试的时候(mqtt每秒约3000的消息发布,同时转化格式通过socket转发,socket服务器会回复同样的消息,接收后在通过mqtt用另一个主题发布)内存占用会一直上涨,空闲的情况下也不会释放,求大神指导!!!

回到顶部