新人求问：async/await（常被称作“ES7 async”，实际属于 ES2017）如何做并发限制，比如把并发数限制为 5？
刚学 Node.js 不久，用 async/await 写了一个采集爬虫，想要限制并发，但不知道该如何处理，希望大家给个思路。最好能自定义并发数（设置一个并发值）。下面是代码：最后面的 for 循环每次只执行一个任务，执行完才走下一个。
require('traceur');
var request = require('superagent-charset');
var cheerio = require('cheerio');
var uid = require('uid');
var fs = require('fs');
/**
 * Issue an HTTP GET request and resolve with the superagent response.
 *
 * @param {string} url - address to fetch.
 * @param {number|string} charset - the sentinel 1 means "no charset handling"
 *   (used for binary downloads such as images); any other value is passed to
 *   superagent-charset's `.charset()` (callers use 'gbk' and then read
 *   `res.text`, presumably decoded from that charset — confirm in the plugin docs).
 * @returns {Promise} resolves with the response; rejects with the request error.
 */
var doRquest = function (url, charset) {
  return new Promise(function (resolve, reject) {
    var req = request.get(url);
    // Number(...) === 1 matches exactly what the old `charset == 1` matched for
    // primitives (1, '1', true), so existing callers keep the same behaviour.
    if (Number(charset) !== 1) {
      req = req.charset(charset);
    }
    req.end(function (err, res) {
      if (err) {
        // Return so we do not fall through to resolve() after rejecting.
        reject(err);
        return;
      }
      resolve(res);
    });
  });
};
/**
 * Write `buf` to the file `newName` and log a success message.
 *
 * @param {Buffer|string} buf - data to write.
 * @param {string} newName - destination path.
 * @returns {Promise<boolean>} resolves true once the file is written;
 *   rejects with the fs error otherwise.
 */
var writeFile = function (buf, newName) {
  return new Promise(function (resolve, reject) {
    fs.writeFile(newName, buf, function (err) {
      if (err) {
        // Stop here: previously execution fell through and printed the
        // "download succeeded" message even though the write had failed.
        reject(err);
        return;
      }
      console.log(newName + "下载成功");
      resolve(true);
    });
  });
};
/**
 * Download one image and save it to disk.
 *
 * Any failure (network or filesystem) is logged and swallowed, so one bad
 * image does not abort the rest of the crawl.
 *
 * @param {string} path - image URL.
 * @param {string} newName - destination file path.
 * @returns {Promise<number>} always resolves to 1.
 */
var downPic = async function (path, newName) {
  console.log(path);
  try {
    const bufRes = await doRquest(path, 1);
    // writeFile resolves true or rejects, so a failed save ends up in the
    // catch below; no separate falsy-result check is needed.
    await writeFile(bufRes.body, newName);
  } catch (e) {
    // Keep crawling, but include the cause so the failure can be diagnosed.
    console.log(path + '错误了', e && e.message ? e.message : e);
  }
  return 1;
};
/**
 * Scrape one detail page for `.puzibody img` images and download each one.
 *
 * @param {string} picUrl - detail page URL (requested with charset 'gbk').
 * @param {number} [concurrency=1] - maximum number of downloads in flight for
 *   this page. The default of 1 keeps the original one-at-a-time behaviour.
 * @returns {Promise<void>} errors are logged, not rethrown.
 */
var getPic = async function (picUrl, concurrency = 1) {
  console.log('采集' + picUrl);
  const linkArr = [];
  try {
    const picRes = await doRquest(picUrl, 'gbk');
    const $2 = cheerio.load(picRes.text);
    $2('.puzibody img').each(function (idx, element) {
      let src = $2(element).attr('src');
      // Skip <img> tags without a src instead of fetching ".../undefined".
      if (!src) {
        return;
      }
      src = "http://www.xiayiqu.com" + src;
      // Last 4 characters (e.g. ".jpg") are reused as the file extension.
      const newName = "file/" + uid(10) + src.slice(-4);
      linkArr.push({ url: src, name: newName });
    });

    // Worker pool: `limit` workers share one cursor, and each takes the next
    // link as soon as its previous download finishes, so at most `limit`
    // downloads run at the same time.
    let cursor = 0;
    const limit = Math.max(1, Math.min(Math.floor(concurrency) || 1, linkArr.length));
    const worker = async function () {
      while (cursor < linkArr.length) {
        const link = linkArr[cursor++];
        await downPic(link.url, link.name);
      }
    };
    const workers = [];
    for (let i = 0; i < limit; i++) {
      workers.push(worker());
    }
    await Promise.all(workers);
  } catch (e) {
    console.log(picUrl + '错误了', e && e.message ? e.message : e);
  }
};
/**
 * Collect detail-page links from the list pages list_1 … list_<pageCount>.
 *
 * Pages are fetched one after another. A page that fails is logged and
 * skipped, and the links gathered so far are still returned.
 *
 * @param {number} [pageCount=5] - number of list pages to scan.
 * @returns {Promise<string[]>} detail-page URLs prefixed with the site origin.
 */
var getUrl = async function (pageCount = 5) {
  const linkArr = [];
  for (let i = 1; i <= pageCount; i++) {
    const url = 'http://www.xiayiqu.com/pu/list_' + i + '.html';
    console.log("采集" + url);
    try {
      const res = await doRquest(url, 'gbk');
      const $ = cheerio.load(res.text);
      $('.zuo li a').each(function (idx, element) {
        const link = $(element).attr('href');
        // Anchors without an href would otherwise become ".../undefined".
        if (!link) {
          return;
        }
        linkArr.push("http://www.xiayiqu.com" + link);
      });
    } catch (e) {
      console.log(url + '错误了', e && e.message ? e.message : e);
    }
  }
  return linkArr;
};
// Crawl everything.
/**
 * Crawl every detail page returned by getUrl().
 *
 * @param {number} [concurrency=1] - how many detail pages are crawled at the
 *   same time. 1 (the default) keeps the original sequential behaviour; for
 *   example, getAll(5) keeps up to five pages in flight.
 * @returns {Promise<boolean>} resolves true once every page has been processed.
 */
var getAll = async function (concurrency = 1) {
  const urls = await getUrl();
  console.log("总页面" + urls.length);

  // Worker pool: each worker takes the next URL from a shared cursor as soon
  // as it is free, so no more than `limit` getPic calls run at once.
  let cursor = 0;
  const limit = Math.max(1, Math.min(Math.floor(concurrency) || 1, urls.length));
  const worker = async function () {
    while (cursor < urls.length) {
      await getPic(urls[cursor++]);
    }
  };
  const workers = [];
  for (let i = 0; i < limit; i++) {
    workers.push(worker());
  }
  await Promise.all(workers);
  return true;
};
// Start crawling. Attach a handler so an unexpected failure is reported
// instead of surfacing as an unhandled promise rejection.
getAll().catch(function (e) {
  console.log('采集失败', e);
});
3 回复
现在的并发数是 1：for 循环里的每个 `await` 都要等上一个任务完成，才会开始下一个。
例如，把并发数设为 5：
/**
 * Crawl all detail pages, with at most 5 getPic calls in flight at once.
 *
 * NOTE: `Promise.map` is not a native Promise method; this assumes `Promise`
 * refers to bluebird (e.g. `var Promise = require('bluebird')`).
 *
 * @returns {Promise<boolean>} true once every page has been handled.
 */
var getAll = async function() {
  const pageUrls = await getUrl();
  console.log("总页面" + pageUrls.length);
  const crawlPage = (pageUrl) => getPic(pageUrl);
  const options = { concurrency: 5 };
  await Promise.map(pageUrls, crawlPage, options);
  return true;
}
`Promise.map` 不是原生 Promise 的方法，而是由 bluebird 提供的（需要先 `var Promise = require('bluebird');`）。相关笔记：https://github.com/magicdawn/magicdawn/issues/18#issuecomment-70646869
caolan/async（即 async.js）里有很多实现可以参考。例如，下面是参考 `async.parallelLimit` 写的 `Promise.map`：
// Install `map` on the global Promise constructor (so callers can write
// Promise.map) and also export the same function as this module's value.
module.exports = Promise.map = map;
/**
 * Map `arr` through `fn` with at most `concurrency` calls running at once
 * (modelled on async.parallelLimit).
 *
 * @param {Array} arr - items to process.
 * @param {Function} fn - called as fn.call(item, item, index); may return a
 *   promise, return a plain value, or throw.
 * @param {number|{concurrency: number}} [concurrency=1] - maximum number of
 *   in-flight calls, either as a number or as a bluebird-style options object.
 *   Missing, non-numeric or < 1 values fall back to 1.
 * @returns {Promise<Array>} results in input order (not completion order).
 *   Rejects with the first error, and no further calls are started after it.
 */
function map(arr, fn, concurrency) {
  // Accept map(arr, fn, 5) as well as map(arr, fn, { concurrency: 5 }); the
  // object form used to make `running < {…}` always false, so nothing ran and
  // the promise never settled.
  const requested = (concurrency !== null && typeof concurrency === 'object')
    ? concurrency.concurrency
    : concurrency;
  const limit = Math.max(1, Math.floor(Number(requested)) || 1);

  return new Promise(function (resolve, reject) {
    const results = new Array(arr.length);
    let started = 0;
    let completed = 0;
    let running = 0;
    let failed = false;

    // Start item `index`. Because `index` is a parameter, each task writes to
    // its own slot; the old shared `var index` sent results to the wrong slots.
    function launch(index) {
      running++;
      // Wrapping the call lets fn return a plain value or throw synchronously.
      new Promise(function (done) {
        done(fn.call(arr[index], arr[index], index)); // item, index
      })
        .then(function (result) {
          running--;
          completed++;
          results[index] = result;
          replenish();
        })
        .catch(function (err) {
          // Stop launching new work once the overall promise has rejected.
          failed = true;
          reject(err);
        });
    }

    // Resolve when every item is done; otherwise top up to `limit` running tasks.
    function replenish() {
      if (failed) {
        return;
      }
      if (completed >= arr.length) {
        resolve(results);
        return;
      }
      while (running < limit && started < arr.length) {
        launch(started++);
      }
    }

    replenish();
  });
}