场景一:
现在的web应用可能都是一个这样的结构:
http服务(node) > 接口(业务逻辑) > 数据库
很多时候,瓶颈一般出现在业务层,或者数据层。更多的可能是某一个业务的处理,拉下整个系统的性能。
当用户或一些不怀好意的人,故意大量调用这些处理逻辑,好吧,你nodejs是非阻塞的,这一大波处理请求就一窝蜂冲到到业务层,很可能导致整个系统性能下降,或者瘫痪。
如果这个时候,node层就把这些耗资源的请求,排好队,控制好并发,甚至分用户排成队,配置好不同身份的用户并发量,不是很爽。
场景二:
应用中,可能会有一些无需即时处理的同一类业务,处理前都需要收集一次资源,处理业务,处理后再清理一次。高消耗工作主要在收集或是清理上。
如果我们将要处理的业务暂存到队列中,当队列数量到达一个值或是某个时间点时,我们一次性处理完队列中的任务。在消耗上,只做一次“收集”、“清理”的操作。
我的想法是,并不需要把整个业务的后续处理全都放到队列中去,而只是将高消耗的那一部分放入队列,利用Promise的异部处理机制来处理后续的操作。 在编写代码的时候你几乎可以忘记队列的存在,但是他就在那里默默的工作着,代码可读性和灵活性没有丝毫影响。
项目地址:
https://github.com/cnwhy/queue-fun
安装
npm install quque-fun
v0.2.1 发布了
- 这次修复大量bug,应该可以用到正式环境上了 欢迎大家 issues
- 队列增加批量添加方法
- 新增浏览器端支持文件(queue-fun.min.js)
还是贴个demo吧
var queuefun = require('queue-fun');
var Queue = queuefun.Queue(); //初始化Promise异步队列类
var q = queuefun.Q; //配合使用的Promise流程控制类,也可以使用q.js代替
//实列化一个最大并发为1的队列
var queue1 = new Queue(1);
//定义一个Promise风格的异步方法
function testfun(i){
var deferred = q.defer();
setTimeout(function(){
deferred.resolve(i)
},300)
return deferred.promise;
}
var log = function(a){ console.log(a); }
//向队列添加运行单元
queue1.push(testfun,[1]).then(console.log);
//插入普通方法会按Promises/A+规则反回promise
queue1.push(function(){return 2;}).then(console.log);
//插入优先执行项 (后进先出)
queue1.unshift(testfun,[0]).then(console.log);
//批量插入多个运行项 array
queue1.allArray([3,4],testfun,{'event_succ':log}).then(console.log)
//批量插入多个运行项 map
queue1.allMap({'a':5,'b':6,'c':7},testfun,{'event_succ':log}).then(console.log)
//执行队列
queue1.start();
/*
0
1
2
3
4
[ 3, 4 ]
5
6
7
{ a: 5, b: 6, c: 7 }
*/
我之前做过一个类似的q-parallel,用来做爬虫,在一个队列任务里面爬另外一个网站的数据,可以控制同时并发的任务数,我的需求比较简单,后来发现原来tj大神也写过一个类似的库 co-parallel,他用了es6的Generator 大神写得简单,就这么几行
var thread = require('co-thread');
module.exports = function *parallel(thunks, n){
var n = Math.min(n || 5, thunks.length);
var ret = [];
var index = 0;
function *next() {
var i = index++;
ret[i] = yield thunks[i];
if (index < thunks.length) yield next;
}
yield thread(next, n);
return ret;
};
@hcnode 我写这个的宗旨在于创建一个持续存在的公用队列,想用的时候随时用,不影响后续操作。不只是控制并发。
就拿爬虫打个比方: 采集整个站点,不可能把URL先猜好,放那里吧,只能是这样。
- 先爬一个入口页
- 存储并分析URL拿到后续需要爬取的URL(去外网链接,去重等)
- 再去分别爬取分析后的URL
- 回到第2步。
这样设计刚开始没问题,但只要运行不到10秒,系统就受不了。平均每个页面算10个有效链接,爬3层深10x10x10,可能系统就会同时request上千个页面,不是被禁就是自己假死/溢出,显然不合理。
这里就很有必要引入一个队列:
- 定义一个并发为10的队列
- 先爬一个入口页
- 存储并分析URL拿到后续需要爬取的URL(去外网链接,去重等)
- 再分别爬取分析后的URL的request(url)放入队列中。
简化的代码如下
var queuefun = require('queue-fun');
var Queue = queuefun.Queue(); //初始化Promise异步队列类
var q = queuefun.Q; //配合使用的Promise流程控制类,也可以使用q.js代替
var queue1 = new queue(10);
function getbody(url){
var deferred = q.defer();
request(url,function(err,res){
if(err) return deferred.reject(err)
deferred.resolve(res.body);
})
return deferred.promise;
}
//存储并分析URL拿到后续需要爬取的URL(去外网链接,去重等)
function saveAndGeturls(body){
//...
return arr;
}
//要爬取的URL都插入队列中
function toQueue(arr){
arr.forEach(function(v,i){
queue1.go(getbody,[v])
.then(function(body){
var arr = saveAndGeturls(body)
toQueue(arr);
})
})
}
toQueue(['http://baidu.com']) //从入口页开始
@cnwhy 是的,因为我并没有做队列的维护和管理,因为我的需求比较简单,呵呵,我觉得你这里也可以再优化一下,把执行的任务,将promise封装一下,调用者就只管defer的action:
queue1.go(function(url, defer){
request(url,function(err,res){
if(err) return deferred.reject(err)
deferred.resolve(res.body);
})
},[v])