遇到内存泄露问题,求大神帮助
直接上代码吧。
var mqtt = require('mqtt');
var net = require('net');
var buffers = [];
var leng = 0;
var client = mqtt.connect("mqtt://192.168.0.76:8000");
client.on('connect',function(){
client.subscribe('other');
buffers = [];
leng = 0;
});
client.subscribe('other');
var wk = new net.Socket();
wk.connect(1234,"192.168.0.153",function() {
buffers = [];
leng = 0;
console.log("connect to workerman server");
});
wk.on('error',function(){
wk.connect(1234,"192.168.0.153",function() {
buffers = [];
leng = 0;
console.log("connect to workerman server");
});
});
client.on('message', function (topic, message,packet) {
var info = enHGdata(topic,message);
wk.write(info,function(){
});
});
wk.on('data',function(data){
var info = deHGdata(data);
for (var i in info){
if (info[i] == 17476){
client.publish('test', info[i*1+2])
}
}
});
>
function enHGdata(topic,msg){
var info =[];
var data = msg
data = new Buffer(data);
var order = new Buffer([0x44,0x44]);
var msglength = new Buffer(4);
msglength.writeUInt32BE(data.length, 0);
var topic = new Buffer(topic);
var topicleng = new Buffer(2);
topicleng.writeUInt16BE(topic.length, 0);
info.push(order);
info.push(topicleng);
info.push(msglength);
info.push(topic);
info.push(data);
var HGdata = Buffer.concat(info) ;
info = null;
data = null;
order = null;
msglength = null;
msg = null;
topic = null;
topicleng = null;
return HGdata;
}
>
function deHGdata(all){
var buffer = all;
var bufferdata;
if (leng == 0) {
leng = buffer.slice(2, 4).readUInt16BE(0) +buffer.slice(4, 8).readUInt32BE(0)+8;
}
if (buffers.length == 0){
if (buffer.length == leng){
var msg = deOnePackage(buffer);
buffer = null;
all = null;
console.log("deCode 1");
return msg;
}
else if (buffer.length > leng){
var msg = deMulPackage(buffer);
buffer = null;
all = null;
console.log("deCode 2");
return msg;
}
else
{
buffers.push(buffer);
buffer = null;
console.log("saveCode 1");
all = null;
}
}
else
{
if (leng == 0)
{
bufferdata = Buffer.concat(buffers);
if (bufferdata.length >= 8){
leng = bufferdata.slice(2, 4).readUInt32BE() +bufferdata.slice(4, 8).readUInt32BE()+8;
bufferdata = null;
}else{
buffers.push(buffer);
bufferdata = null;
buffer = null;
all = null;
}
}
else{
if (wk.bytesRead < leng){
buffers.push(buffer);
console.log("saveCode 2");
buffer = null;
all = null;
}
else if(wk.bytesRead == leng){
buffers.push(buffer);
bufferdata = Buffer.concat(buffers);
var msg = deOnePackage(bufferdata);
buffers = null;
buffers = [];
leng = 0;
wk.bytesRead = 0;
bufferdata = null;
buffer = null;
all = null;
console.log("deCode 3");
return msg;
}
else
{
buffers.push(buffer);
var buffer = Buffer.concat(buffers);
var msg = deMulPackage(buffer);
bufferdata = null;
buffer = null;
all = null;
leng = 0;
console.log("deCode 4");
return msg
}
}
}
}
function deOnePackage(buffer){
var data = [];
var order,topicleng,topic,info,leng;
order = buffer.slice(0, 2).readUInt16BE(0);
topicleng = buffer.slice(2, 4).readUInt16BE(0);
leng = buffer.slice(4, 8).readUInt32BE(0)+topicleng+8;
topic = buffer.slice(8, 8+topicleng).toString();
info = buffer.slice(8+topicleng,leng);
data.push(order);
data.push(topic);
data.push(info);
order = null;
topicleng = null;
leng = null;
topic = null;
info = null;
buffer = null;
return data;
}
>
function deMulPackage(all) {
var buffer = all;
var n =0;
var data = [];
var oneleng = all.slice(2, 4).readUInt16BE(0) + all.slice(4, 8).readUInt32BE(0) + 8;
do {
var order,topicleng,leng,topic,info;
order = buffer.slice(0, 2).readUInt16BE(0);
topicleng = buffer.slice(2, 4).readUInt16BE(0);
leng = buffer.slice(4, 8).readUInt32BE(0)+topicleng+8;
topic = buffer.slice(8, 8 + topicleng).toString();
info = buffer.slice(8 + topicleng,leng);
data.push(order);
data.push(topic);
data.push(info);
buffer = all.slice(n,leng+n);
var bufferleft = all.slice(leng+n,all.length);
n = n +oneleng;
oneleng = buffer.slice(2, 4).readUInt16BE(0) + buffer.slice(4, 8).readUInt32BE(0) + 8;
} while (bufferleft.length >= oneleng);
if (bufferleft.length != 0) {
buffers = null;
buffers = [];
buffers.push(buffer);
wk.bytesRead = bufferleft.length;
console.log("saveCode 3");
}
else{
buffers = null;
buffers = [];
}
buffer = null;
all = null;
oneleng = null;
order = null;
topicleng = null;
leng = null;
topic = null;
info = null;
bufferleft = null;
return data;
}
主要是做一个mqtt的消息转发器,将mqtt格式和公司内部的通信格式相互转化,但是在压力测试的时候(mqtt每秒约3000的消息发布,同时转化格式通过socket转发,socket服务器会回复同样的消息,接收后在通过mqtt用另一个主题发布)内存占用会一直上涨,空闲的情况下也不会释放,求大神指导!!!