并发数量控制怎么实现?
发布于 16 小时前 作者 zstxt1989 186 次浏览 来自 问答

一开始我想的太简单了

var tasks = 0;//当前并发数量
function foo(url){
	while(tasks > 2){//并发超过2,无限循环等前面的任务执行结束
		continue;
	}
	tasks++;
	http.get(url,function(){
		tasks--;//任务执行完毕,tasks-1
	})
}
var urls = [....]
for(var i in urls) {
	foo(urls[i]);
}

node是单线程的,while死循环导致异步任务没法进行,tasks–也就无法执行到。 请教一下有没有其他办法可以人为控制并发数量和异步任务时间间隔?

10 回复

深入浅出nodejs一书中,9页表格下面。 有说道:

由于js单线程的原因,如果有长时间运行的计算,导致后续IO无法发起。解决方案是,调整和分解这类任务成多个小小任务,使得计算能够即使释放,不会阻塞IO发起

看能否供参考?

要这么干的话使用settimeout将任务延后吧

并发数量可以看做是 worker 数目, 每个 worker 从 task queue 里面取任务

用bluebird的Promise.map可以设置concurrency

bluebird Promise.map确实可以控制并发,不想用bluebird的可以用这个 https://github.com/magicdawn/promise.map

promise-map 这个包名被另一个人拿去用 arr.map 去简单map一下了,相当于并发为Infinity. 于是就有了promise.map

promise.map 代码是取自 async.parallelLimit, 使用 callback 的方式的童鞋可以用这个 async.js

@bigtree9307 @magicdawn 谢谢两位的建议,其实我也往这方面考虑了,用的eventproxy来实现的。思路大致如下:

var EventProxy = require('eventproxy');
const most = 5;//并发数5
var urllist = [....];//待抓取url列表,100个
foo(start){
	var ep = new EventProxy();
	ep.after('ok',most,function(){
		foo(start+most);//一个批次任务完成,递归进行下一批任务
	});
	var q=0;
	for(var i=start;i<urllist.length;i++){
		if(q>=most){
			break;//最多添加most个任务
		}
		http.get(urllist[i],function(res){
			//....
			res.on('end',function(){
				ep.emit('ok');//一个任务完成,触发一次ok事件
			});
		});
		q++;
	}
}
foo(0);

发现 async.queue 非常合适用来控制并发,附上代码:

/**
 * Created by admin on 16/3/20.
 */
"use strict"
var http = require('http');
var cheerio = require('cheerio');
var URL = require('url');
var path = require('path');
var fs = require('fs');
var async = require('async');

var baseUrl = "http://cnodejs.org/";
var targetUrl = "http://cnodejs.org/";
var stime = new Date();

function sGet(url,callback){
  var chunks = [];
  http.get(url,(res)=>{
	if (res.statusCode != '200') {
	  callback({message:"抓取失败,状态码:"+res.statusCode,url:url});
	  return;
	}
	res.on('data',(chunk)=>{
	  chunks.push(chunk);
	});
	res.on('end',()=>{
	  callback(null,Buffer.concat(chunks).toString());
	});
  }).on('error',(e)=>{
	callback({message:"抓取失败",url:url,err:e});
  });
}

sGet(targetUrl,(err,data)=>{
  if (err) {
	console.log(err);
	return false;
  }
  var $ = cheerio.load(data);
  var anchors = $("#topic_list a.topic_title");
  console.log('共'+anchors.length+'个任务');

  const most=5;//并发数
	//创建队列并指定并发数
  var q=async.queue(function(url,callback){
	var filename = path.basename(url)+'.txt';
	sGet(url, (err, data)=> {
	  if (err) {
		callback(err);
		return false;
	  }
	  fs.writeFile('./html/' + filename, data, function (err) {
		if (err) {
		  throw err;
		}
		callback(null,filename);
	  });
	});
  },most);

  q.drain = function() {
	console.log('任务全部完成,共耗时:'+(new Date()-stime)+'ms');
  }

  anchors.each(function(){
	var url = URL.resolve(baseUrl,$(this).attr('href'));
	q.push(url,function(err,filename){
	  if (err) {
		console.log(err);
		return;
	  }
	  console.log("finished:"+filename);
	});
  });
});

async 有各种 limit 结尾的函数专门用来处理并发。如果你的场景比较类似队列的话,就用 async.queue

回到顶部