并发数量控制怎么实现?
一开始我想的太简单了
var tasks = 0;//当前并发数量
function foo(url){
while(tasks > 2){//并发超过2,无限循环等前面的任务执行结束
continue;
}
tasks++;
http.get(url,function(){
tasks--;//任务执行完毕,tasks-1
})
}
var urls = [....]
for(var i in urls) {
foo(urls[i]);
}
node是单线程的,while死循环导致异步任务没法进行,tasks–也就无法执行到。 请教一下有没有其他办法可以人为控制并发数量和异步任务时间间隔?
10 回复
深入浅出nodejs一书中,9页表格下面。 有说道:
由于js单线程的原因,如果有长时间运行的计算,导致后续IO无法发起。解决方案是,调整和分解这类任务成多个小小任务,使得计算能够即使释放,不会阻塞IO发起
看能否供参考?
@bigtree9307 @magicdawn 谢谢两位的建议,其实我也往这方面考虑了,用的eventproxy来实现的。思路大致如下:
var EventProxy = require('eventproxy');
const most = 5;//并发数5
var urllist = [....];//待抓取url列表,100个
foo(start){
var ep = new EventProxy();
ep.after('ok',most,function(){
foo(start+most);//一个批次任务完成,递归进行下一批任务
});
var q=0;
for(var i=start;i<urllist.length;i++){
if(q>=most){
break;//最多添加most个任务
}
http.get(urllist[i],function(res){
//....
res.on('end',function(){
ep.emit('ok');//一个任务完成,触发一次ok事件
});
});
q++;
}
}
foo(0);
发现 async.queue 非常合适用来控制并发,附上代码:
/**
* Created by admin on 16/3/20.
*/
"use strict"
var http = require('http');
var cheerio = require('cheerio');
var URL = require('url');
var path = require('path');
var fs = require('fs');
var async = require('async');
var baseUrl = "http://cnodejs.org/";
var targetUrl = "http://cnodejs.org/";
var stime = new Date();
function sGet(url,callback){
var chunks = [];
http.get(url,(res)=>{
if (res.statusCode != '200') {
callback({message:"抓取失败,状态码:"+res.statusCode,url:url});
return;
}
res.on('data',(chunk)=>{
chunks.push(chunk);
});
res.on('end',()=>{
callback(null,Buffer.concat(chunks).toString());
});
}).on('error',(e)=>{
callback({message:"抓取失败",url:url,err:e});
});
}
sGet(targetUrl,(err,data)=>{
if (err) {
console.log(err);
return false;
}
var $ = cheerio.load(data);
var anchors = $("#topic_list a.topic_title");
console.log('共'+anchors.length+'个任务');
const most=5;//并发数
//创建队列并指定并发数
var q=async.queue(function(url,callback){
var filename = path.basename(url)+'.txt';
sGet(url, (err, data)=> {
if (err) {
callback(err);
return false;
}
fs.writeFile('./html/' + filename, data, function (err) {
if (err) {
throw err;
}
callback(null,filename);
});
});
},most);
q.drain = function() {
console.log('任务全部完成,共耗时:'+(new Date()-stime)+'ms');
}
anchors.each(function(){
var url = URL.resolve(baseUrl,$(this).attr('href'));
q.push(url,function(err,filename){
if (err) {
console.log(err);
return;
}
console.log("finished:"+filename);
});
});
});