使用 eventproxy、async.mapLimit、async.queue 来控制并发

nodejs高并发大流量的设计实现

原理:非阻塞事件驱动实现异步开发,通过事件驱动的I/O来操作完成跨平台数据密集型实时应用 传统的server 每个请求生成一个线程, nodejs是一个单线程的,使用libuv保持数万并发

使用 eventproxy 实现控制并并发

  1. var EventProxy = require('eventproxy');
  2. const most = 5;//并发数5
  3. var urllist = [....];//待抓取url列表,100个
  4. function foo(start){
  5. var ep = new EventProxy();
  6. ep.after('ok',most,function(){
  7. foo(start+most);//一个批次任务完成,递归进行下一批任务
  8. });
  9. var q=0;
  10. for(var i=start;i<urllist.length;i++){
  11. if(q>=most){
  12. break;//最多添加most个任务
  13. }
  14. http.get(urllist[i],function(res){
  15. //....
  16. res.on('end',function(){
  17. ep.emit('ok');//一个任务完成,触发一次ok事件
  18. });
  19. });
  20. q++;
  21. }
  22. }
  23. foo(0);

使用 async.mapLimit 控制并发

  1. var async = require('async');
  2. //模拟一组连接地址
  3. var urls = [];
  4. for(var i = 0; i < 30; i++) {
  5. urls.push('http://datasource_' + i);
  6. }
  7. console.log(urls);
  8. // 并发连接数的计数器
  9. var concurrencyCount = 0;
  10. // 并发抓取数据的过程
  11. var fetchUrl = function (url, callback) {
  12. // delay 的值在 2000 以内,是个随机的整数
  13. var delay = parseInt((Math.random() * 10000000) % 2000, 10);
  14. concurrencyCount++;
  15. console.log('现在的并发数是', concurrencyCount, ',正在抓取的是', url, ',耗时' + delay + '毫秒');
  16. setTimeout(function () {
  17. concurrencyCount--;
  18. //抓取成功,调用回调函数
  19. callback(null, url + ' html content');
  20. }, delay);
  21. };
  22. //使用 async.mapLimit 来 5 个并发抓取,并获取结果
  23. async.mapLimit(urls, 5, function (url, callback) {
  24. fetchUrl(url, callback);
  25. }, function (err, result) {
  26. //所有连接抓取成功,返回回调结果列表
  27. console.log('final:');
  28. console.log(result);
  29. });

使用 async.queue 控制并发

  1. "use strict"
  2. var http = require('http');
  3. var cheerio = require('cheerio');
  4. var URL = require('url');
  5. var path = require('path');
  6. var fs = require('fs');
  7. var async = require('async');
  8. var baseUrl = "http://cnodejs.org/";
  9. var targetUrl = "http://cnodejs.org/";
  10. var stime = new Date();
  11. function sGet(url,callback){
  12. var chunks = [];
  13. http.get(url,(res)=>{
  14. if (res.statusCode != '200') {
  15. callback({message:"抓取失败,状态码:"+res.statusCode,url:url});
  16. return;
  17. }
  18. res.on('data',(chunk)=>{
  19. chunks.push(chunk);
  20. });
  21. res.on('end',()=>{
  22. callback(null,Buffer.concat(chunks).toString());
  23. });
  24. }).on('error',(e)=>{
  25. callback({message:"抓取失败",url:url,err:e});
  26. });
  27. }
  28. sGet(targetUrl,(err,data)=>{
  29. if (err) {
  30. console.log(err);
  31. return false;
  32. }
  33. var $ = cheerio.load(data);
  34. var anchors = $("#topic_list a.topic_title");
  35. console.log('共'+anchors.length+'个任务');
  36. const most=5;//并发数
  37. //创建队列并指定并发数
  38. var q=async.queue(function(url,callback){
  39. var filename = path.basename(url)+'.txt';
  40. sGet(url, (err, data)=> {
  41. if (err) {
  42. callback(err);
  43. return false;
  44. }
  45. fs.writeFile('./html/' + filename, data, function (err) {
  46. if (err) {
  47. throw err;
  48. }
  49. callback(null,filename);
  50. });
  51. });
  52. },most);
  53. q.drain = function() {
  54. console.log('任务全部完成,共耗时:'+(new Date()-stime)+'ms');
  55. }
  56. anchors.each(function(){
  57. var url = URL.resolve(baseUrl,$(this).attr('href'));
  58. q.push(url,function(err,filename){
  59. if (err) {
  60. console.log(err);
  61. return;
  62. }
  63. console.log("finished:"+filename);
  64. });
  65. });
  66. });