Node 下 Http Streaming 的跨浏览器实现
发布于 4年前 作者 qingdu 4631 次浏览 最后一次编辑是 3年前

最近考虑把整个前端架构使用http streaming方式实现

对这方面做了一下调研,并在node上实现了一个简单的原型


顺带提一下,

楼下pengchun同学所提到的node chat使用的是longpoll的模型

和http streaming同属于http comet的解决方案.

不过在具体http连接的处理上有所不同

long poll在数据通道每次接收到一个数据包后即关闭连接,并立即重新打开数据通道

http streaming的数据通道始终处于打开状态.

具体的介绍可以看这里 http://en.wikipedia.org/wiki/Comet_(programming)#Streaming


一些细节:

由于ie下在xhr readystate=3时无法取得responseText,

因此在ie下改为通过使用htmlfile控件调用iframe实现

另在输出正式数据前需先输出1k的噪音数据

以解决浏览器的阻塞问题


原型功能设计如下


pipe.png


具体代码如下


pipe.js: 主服务程序



var http = require('http'),
fs = require('fs'),
url = require('url'),
page = null;
// static files read & watch
/**
 * Load a set of static files into memory and keep the copies fresh.
 *
 * Every file is read asynchronously, so the returned map starts out empty
 * and is filled in as the reads complete. Each file is also polled with
 * fs.watchFile; when its mtime changes the file is read again and the map
 * entry is replaced.
 *
 * @param {string[]} files - paths of the files to load and watch
 * @returns {Object} map of file path -> Buffer with the latest contents
 * @throws the read error (from the read callback) when the FIRST load of a
 *         file fails; a failed reload is only logged.
 */
var readFile = function(files) {
  var buffers = {};
  // async read: no encoding is given, so fs hands back the raw Buffer and
  // no round trip through a 'binary' string is needed
  var fread = function(file) {
    fs.readFile(file, function(err, data) {
      if (err) {
        // A reload can fail while the file is being replaced; keep the
        // previous copy instead of taking the whole process down.
        if (Object.prototype.hasOwnProperty.call(buffers, file)) {
          console.error('reload failed', file, err.message);
          return;
        }
        throw err;
      }
      buffers[file] = data;
      console.log('load', file);
    });
  };
  // watch changes
  var watch = function(file) {
    fs.watchFile(file, {persistent: true, interval: 100}, function(curr, prev) {
      if (curr.mtime.getTime() !== prev.mtime.getTime()) {
        fread(file);
      }
    });
  };
  // run all files
  for (var i = 0; i < files.length; i++) {
    watch(files[i]);
    fread(files[i]);
  }
  return buffers;
};
// http query
/**
 * Fetch a URL with a plain HTTP GET and hand the complete response body to
 * the callback as a single Buffer.
 *
 * The body is gathered chunk by chunk and joined once the response ends,
 * so responses of any size (with or without a content-length header) are
 * returned intact.
 *
 * @param {string} u - absolute http:// URL to fetch
 * @param {function(Buffer)} cb - called once with the whole response body
 */
var httpQuery = function(u, cb) {
  console.log('http begin');
  // parse url
  var uinfo = url.parse(u);
  // create request; no explicit Host header is passed so that http builds
  // it itself, including the port when it is not the default one
  var req = http.request({
    host: uinfo.hostname,
    port: uinfo.port ? uinfo.port : 80,
    path: (uinfo.pathname || '/') + (uinfo.search ? uinfo.search : ''),
    method: 'GET',
    // one dedicated connection per query, nothing pooled
    agent: false
  });

  var chunks = [];
  var pointer = 0;
  // response start
  req.on('response', function(res) {
    // chunk received
    res.on('data', function(chunk) {
      chunks.push(chunk);
      pointer += chunk.length;
    });
    // response end
    res.on('end', function() {
      cb(Buffer.concat(chunks, pointer));
      console.log('proxy end', pointer);
    });
  });
  // Without a listener a refused or reset connection would surface as an
  // unhandled 'error' event and terminate the process. cb is not invoked
  // in that case because its signature has no way to report an error.
  req.on('error', function(err) {
    console.error('http error', u, err.message);
  });
  // send request
  req.end();
  console.log('http request sent');
};

// main server
/**
 * Read the numeric client id that follows a route prefix.
 *
 * @param {string} reqUrl - request url, e.g. "/ajax/1300000000000"
 * @param {number} prefixLength - length of the route prefix, slashes included
 * @returns {number} the id, or NaN when no decimal number follows the prefix
 */
var parseClientId = function(reqUrl, prefixLength) {
  // parseInt stops at the first non-digit, so a trailing "/ajax" or
  // "/iframe" segment is ignored without assuming a fixed id width
  return parseInt(reqUrl.substr(prefixLength), 10);
};

/**
 * HTTP front end of the demo. Routes:
 *   /              the client page (page["pipe.html"])
 *   /time          the current server time as text
 *   /iframe/<id>   opens a streaming pipe in iframe mode for client <id>
 *   /ajax/<id>     opens a streaming pipe in ajax mode for client <id>
 *   /req/<id>/...  answers at once, then fetches /time and pushes the
 *                  result down the pipe of client <id>
 * Anything else gets 404; a pipe route without a numeric id gets 400.
 */
var server = http.createServer(function(req, res) {
  var clientid;
  // answer a request whose id segment is not a number
  var badRequest = function() {
    res.writeHead(400, {"Content-Type" : "text/plain"});
    res.end('bad client id');
  };
  // main page
  if (req.url === '/') {
    res.writeHead(200);
    res.end(page["pipe.html"]);
  // time serve
  } else if (req.url === '/time') {
    res.writeHead(200);
    res.end(new Date().toString());
  // iframe recv
  } else if (req.url.match(/^\/iframe\//)) {
    clientid = parseClientId(req.url, 8);
    if (isNaN(clientid)) {
      badRequest();
      return;
    }
    pipeClient.add(clientid, res, pipeClient.iframe);
    console.log('iframe connect', clientid);
  // ajax recv
  } else if (req.url.match(/^\/ajax\//)) {
    clientid = parseClientId(req.url, 6);
    if (isNaN(clientid)) {
      badRequest();
      return;
    }
    pipeClient.add(clientid, res, pipeClient.ajax);
    console.log('ajax connect', clientid);
  // request listen
  } else if (req.url.match(/^\/req\//)) {
    // url parse
    clientid = parseClientId(req.url, 5);
    if (isNaN(clientid)) {
      badRequest();
      return;
    }
    // the request itself carries no payload; the answer travels over the pipe
    res.writeHead(200, {
      'Cache-Control': 'no-cache, must-revalidate'
    });
    res.end();
    // get page
    httpQuery("http://localhost:8000/time", function(data) {
      console.log(data.toString());
      pipeClient.write(clientid, data);
      console.log("write", clientid, data.length);
    });
  // error pages
  } else {
    res.writeHead(404, {"Content-Type" : "text/html"});
    res.end();
  }
});

/**
 * Registry of open streaming responses ("pipes"), keyed by client id.
 *
 * A pipe is an HTTP response that is kept open so data can be pushed to the
 * browser later. Two wire formats are supported:
 *   - iframe: every message is wrapped as <script>s('...');</script>
 *   - ajax:   every message is prefixed with its byte length as hex,
 *             space-padded to 8 characters
 * A pipe is closed after `timeout` ms without a write, or as soon as the
 * response reports that it has closed.
 */
var pipeClient = {
  timeout : 30000,
  client : {},
  prefix : "<script>s('",
  suffix : "');</script>",
  iframe : 'iframe',
  ajax : 'ajax',
  noise : null,
  noiseSize : 1024,
  page : null,
  /**
   * Build the block of spaces sent ahead of real data (sized by
   * noiseSize) and load the header page used by iframe pipes.
   */
  init : function() {
    this.noise = Buffer.alloc(this.noiseSize, 32);
    this.page = readFile(['iframe.html']);
  },
  /**
   * Start the idle timer that closes pipe `id`.
   * @returns the timer handle
   */
  startTimer : function(id) {
    var self = this;
    return setTimeout(function() {
      self.close(id);
    }, this.timeout);
  },
  /**
   * Register `res` as the pipe of client `id` and send the preamble.
   * @param {number|string} id - client id
   * @param {Object} res - http response to keep open
   * @param {string} type - this.ajax or this.iframe
   */
  add : function(id, res, type) {
    var self = this;
    // A reconnect under the same id replaces the old pipe; closing it first
    // keeps its timer from later shutting down the new one.
    if (this.client[id]) {
      this.close(id);
    }
    if (type == this.ajax) {
      res.writeHead(200, {
        'Cache-Control': 'no-cache, must-revalidate'
      });
      res.write(this.noise);
    } else {
      res.writeHead(200, {
        "Content-Type" : "multipart/x-mixed-replace",
        'Cache-Control': 'no-cache, must-revalidate'
      });
      res.write(this.page['iframe.html']);
      res.write(this.noise);
    }
    this.client[id] = {
      res : res,
      type : type,
      tm : this.startTimer(id)
    };
    // Drop the entry when the response closes, unless the id has already
    // been taken over by a newer response.
    res.on('close', function() {
      var entry = self.client[id];
      if (entry && entry.res === res) {
        self.close(id);
      }
    });
  },
  /**
   * End and forget the pipe of client `id`. Does nothing for an unknown id,
   * so it is safe to call more than once.
   */
  close : function(id) {
    var entry = this.client[id];
    if (!entry) {
      return;
    }
    console.log("client close", id)
    clearTimeout(entry.tm);
    delete this.client[id];
    entry.res.end();
  },
  /**
   * Push `data` to client `id` and restart its idle timer. A write to an id
   * with no open pipe is logged and dropped.
   * @param {number|string} id - client id
   * @param {Buffer} data - payload
   */
  write : function(id, data) {
    var entry = this.client[id];
    if (!entry) {
      console.log("write skipped, no client", id);
      return;
    }
    clearTimeout(entry.tm);
    entry.tm = this.startTimer(id);
    entry.res.write(this.format(data, entry.type));
  },
  /**
   * Escape text for use inside the single-quoted JS string of the iframe
   * wrapper, so quotes, line breaks or markup in the payload can neither
   * end the string nor the surrounding <script> element.
   */
  escapeForScript : function(text) {
    return text
      .replace(/\\/g, '\\\\')
      .replace(/'/g, "\\'")
      .replace(/\r/g, '\\r')
      .replace(/\n/g, '\\n')
      .replace(/\u2028/g, '\\u2028')
      .replace(/\u2029/g, '\\u2029')
      .replace(/</g, '\\x3c');
  },
  /**
   * Frame `data` for the given pipe type.
   * @param {Buffer} data - payload
   * @param {string} type - this.ajax or this.iframe
   * @returns {Buffer} bytes to write to the response
   */
  format : function(data, type) {
    var buf;
    // with iframe
    if (type == this.iframe) {
      buf = Buffer.concat([
        Buffer.from(this.prefix, 'binary'),
        Buffer.from(this.escapeForScript(data.toString('utf8')), 'utf8'),
        Buffer.from(this.suffix)
      ]);
    // with ajax
    } else {
      // set length: hex byte count, space padded to 8 characters
      var header = data.length.toString(16);
      while (header.length < 8) {
        header += ' ';
      }
      // set data
      buf = Buffer.concat([Buffer.from(header, 'binary'), data]);
    }
    console.log(buf.toString());
    return buf;
  }
}
// Bootstrap: prepare the pipe registry, start loading the main page, then
// open the listening socket on port 8000.
pipeClient.init();
page = readFile(['pipe.html']);

// Listening is deferred by half a second; presumably this gives the file
// reads started above time to finish before the first request arrives.
var startListening = function() {
  server.listen(8000);
};
setTimeout(startListening, 500);


pipe.html: 客户端程序



<!DOCTYPE html>
<html lang="zh_CN">
<head>
<meta charset="utf-8" />
<link href='http://fonts.googleapis.com/css?family=Droid+Serif' rel='stylesheet' type='text/css'>
<title>Comet Pipe Demo</title>
<script type="text/javascript" src="http://www.google.com/jsapi"></script>
<script type="text/javascript">
google.load("jquery", '1.4');
google.setOnLoadCallback(function(){
$(document.body).ready(function(){


/**
 * Comet: opens a long-lived streaming connection to the server and hands
 * every message pushed by the server to a callback.
 *
 * Transport is chosen from $.browser.msie: an iframe hosted in an
 * "htmlfile" ActiveX document on IE, a streaming XMLHttpRequest elsewhere.
 *
 * Options (all optional):
 *   domain        value assigned to document.domain inside the htmlfile
 *   req           url prefix used by send()            (default '/req')
 *   iframe        url prefix of the iframe stream      (default '/iframe')
 *   ajax          url prefix of the ajax stream        (default '/ajax')
 *   cb            function(data) called for each message
 *   cl            object used as `this` when calling cb
 *   ajaxFlagLen   width in characters of the hex length header in front of
 *                 each ajax frame (default 8)
 *   ajaxNoiseLen  number of padding characters the server sends before the
 *                 first frame (default 1024)
 *
 * @returns {Object} handle with pageid, type, recvUrl, reqUrl, conn, div and
 *          send(), which asks the server to push data down the stream
 */
var Comet = function(options) {
  // options init
  var opt = {
    domain : document.domain,
    req : '/req',
    iframe : '/iframe',
    ajax : '/ajax',
    timeout : 300,
    cb : null,
    cl : null,
    ajaxFlagLen : 8,
    ajaxNoiseLen : 1024
  };
  for (var k in options) {
    if (Object.prototype.hasOwnProperty.call(options, k)) {
      opt[k] = options[k];
    }
  }
  // hand one message to the user callback, if there is one
  var deliver = function(data) {
    if (typeof opt.cb != 'function') {
      return;
    }
    if (opt.cl) {
      opt.cb.call(opt.cl, data);
    } else {
      opt.cb(data);
    }
  };
  // Number of leading characters of str that encode to byteCount bytes of
  // UTF-8, or -1 when str is still too short. The frame header counts
  // bytes while responseText is indexed in UTF-16 code units.
  var charsForBytes = function(str, byteCount) {
    var bytes = 0;
    var i = 0;
    while (bytes < byteCount) {
      if (i >= str.length) {
        return -1;
      }
      var code = str.charCodeAt(i);
      if (code < 0x80) {
        bytes += 1;
        i += 1;
      } else if (code < 0x800) {
        bytes += 2;
        i += 1;
      } else if (code >= 0xd800 && code <= 0xdbff) {
        // surrogate pair: one 4-byte character spread over two code units
        if (i + 1 >= str.length) {
          return -1;
        }
        bytes += 4;
        i += 2;
      } else {
        bytes += 3;
        i += 1;
      }
    }
    return i;
  };
  // some arguments init
  var pageid = new Date().getTime();
  var type = $.browser.msie ? 'iframe' : 'ajax';
  // public props
  var me = {
    pageid : pageid,
    type : type,
    recvUrl : opt[type] + '/' + pageid,
    reqUrl : opt.req + '/' + pageid + '/' + type,
    conn : null,
    div : null,
    send : function(){
      $.get(this.reqUrl);
    }
  };
  // init iframe receiver
  if (me.type == 'iframe') {
    // create htmlfile
    me.conn = new ActiveXObject("htmlfile");
    me.conn.open();
    // set domain; the closing tag is written as <\/script> so it cannot end
    // the inline script element this code itself lives in
    me.conn.write("<html><script>document.domain = '" + opt.domain + "';<\/script></html>");
    me.conn.close();
    me.div = me.conn.createElement("div");
    me.conn.appendChild(me.div);
    // entry point called from the streamed iframe page
    me.conn.parentWindow.send = function (data) {
      deliver(data);
    };
    me.div.innerHTML = "<iframe id='comet' src='" + me.recvUrl + "'></iframe>";
  // init ajax receiver
  } else {
    me.conn = new XMLHttpRequest();

    me.conn.open("GET", me.recvUrl, true);
    var offset = 0;     // characters of responseText already taken
    var pending = '';   // received but not yet parsed characters
    var size = -1;      // byte length of the frame being read, -1 = need header
    me.conn.onreadystatechange = function(){
      if (this.readyState != 3 && this.readyState != 4) {
        return;
      }
      var text = this.responseText;
      // skip the leading noise by position, so data that arrives in the
      // same chunk as the noise is not thrown away with it
      if (offset < opt.ajaxNoiseLen) {
        if (text.length < opt.ajaxNoiseLen) {
          return;
        }
        offset = opt.ajaxNoiseLen;
      }
      pending += text.substr(offset);
      offset = text.length;
      // a chunk may hold part of a frame, exactly one, or several
      for (;;) {
        if (size < 0) {
          if (pending.length < opt.ajaxFlagLen) {
            break;
          }
          size = parseInt(pending.substr(0, opt.ajaxFlagLen), 16);
          pending = pending.substr(opt.ajaxFlagLen);
          if (isNaN(size)) {
            // unreadable header: drop what is buffered and wait for more
            pending = '';
            size = -1;
            break;
          }
        }
        var end = charsForBytes(pending, size);
        if (end < 0) {
          break;
        }
        var message = pending.substr(0, end);
        pending = pending.substr(end);
        size = -1;
        // one done
        deliver(message);
      }
    };
    me.conn.send(null);
  }
  return me;
};

/**
 * Show a message at the top of the #msg container as a new <h4>, so the
 * newest message comes first.
 *
 * @param {string} msg - text to display; it is shown literally, markup in
 *        it is not interpreted
 */
function showMessage(msg){
  var dom = document.createElement('h4');
  // msg comes off the wire: add it as a text node rather than through
  // innerHTML so it cannot inject markup or script into the page
  dom.appendChild(document.createTextNode(msg));
  var prt = document.getElementById('msg');
  // insertBefore with a null reference node appends, which covers the
  // empty-container case too
  prt.insertBefore(dom, prt.firstChild);
}
// Deferred page setup: render the trigger button that matches this
// browser's transport, open the comet channel, and make the button ask the
// server for a push.
window.setTimeout(function(){
  var buttonHtml = $.browser.msie
    ? '<input type="submit" id="go_iframe" value="Get time with iframe" />'
    : '<input type="submit" id="go_ajax" value="Get time with ajax" />';
  $('#btn').html(buttonHtml);

  // every pushed message is shown on the page
  var c = new Comet({
    cb : function(data) {
      showMessage(data);
    }
  });

  // only one of the two buttons exists; both get the same handler
  var requestPush = function(){
    c.send();
  };
  $('#go_iframe').click(requestPush);
  $('#go_ajax').click(requestPush);
}, 0);


});
});
</script>
</head>
<body>
<div id="btn"></div>
<div id="msg"></div>
</body>
</html>

iframe.html: ie下iframe模式运行的输出头



<!DOCTYPE html>
<html lang="zh_CN">
<head><meta charset="utf-8" /></head>
<body>
<script>
// Called by every <script> chunk streamed into this page: relays the
// payload to the send() function exposed on the parent window.
function s(msg) {
  var host = window.parent;
  host.send(msg);
}
</script>

6 回复

30秒后由服务器端向客户端传输 数据的通道就关闭了?

对, 这里是开发时为了方便,避免浏览器的http并发上限阻塞用的
实际系统中可以去掉这块
或者在client中增加重连的功能

原来就在这里。。。先收藏

pipe通道在一段时间没有数据返回将会中断,服务器端再向它发送数据就会无效了。监听response的error事件又无法捕获到错误事件,这样会导致坏死的client链接越来越多。需要一种机制来处理这个问题。

“另在输出正式数据前需先输出1k的噪音数据” 这个是可以去掉的.只需设置 'Content-Type': 'text/html;charset=utf-8' 即可.注意其中charset声明是关键,不然浏览器不会实时渲染.测试案例可参考这个:https://gist.github.com/2419203

回到顶部