Storm构架及运行原理

https://blog.csdn.net/weiyongle1996/article/details/77142245?utm_source=gold_browser_extension

Storm简介

Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理一样,Storm可以实时处理数据。另外,Storm操作简单,可用于任意编程语言。Apache Storm采用Clojure开发。Storm有很多应用场景,包括实时数据分析、联机学习、持续计算、分布式RPCETL等。
Hadoop专注于批处理。这种模型对许多清醒(比如为网页建立索引)已经足够,但还存在其他一些使用模型,它们需要来自高度动态的来源的实时信息。为了解决这个问题,就得借助Nathan Marz推出的Storm(现已被Apache孵化),Storm不处理静态数据,但可处理连续的流数据。

Storm特点

  • 编程简单:开发人员只要关注应用逻辑,而且跟Hadoop类似,Storm提供的编程原语也很简单

  • 高性能,低延迟:可以应用于广告搜索引擎这种要求对广告主的操作进行实时响应的场景

  • 分布式:可以轻松应对数据量大,单机搞不定的场景

  • 可扩展:随着业务发展,数据量和计算量越来越大,系统可水平扩展

  • 容错:单节点挂了不影响应用

  • 消息不丢失:保证消息处理

Storm和Hadoop的比较

  • Storm用于实时计算,Hadoop用于离线计算

  • Storm处理的数据保存在内存中,源源不断;Hadoop处理的数据保存在文件系统中,一批一批

  • Storm的数据通过网络传输进来;Hadoop的数据保存在磁盘中

  • Storm与Hadoop的编程模型相似

结构 Hadoop Storm
主节点 JobTracker Nimbus
从节点 TaskTRacker Supervisor
应用程序 Job Topology
工作进程名称 Child Worker
计算模型 Map/Reduce Spout/Bolt

Storm工作流程

一个工作的Storm集群应该有一个Nimbus和一个或多个supervisor。另一个重要的节点是Apache Zookeeper,它将用于Nimbus和Supervisor之间的协调。

  • 最初,Nimbus将等待“Storm拓扑”提交得它

  • 一旦提交拓扑,它将处理拓扑并收集要执行的所有任务和任务将被执行的顺序

  • 然后,Nimbus将任务均匀分配给所有可用的supervisor

  • 在特定的时间间隔,所有supervisor将向Nimbus发送心跳以通知它们仍然活着

  • 当supervisor终止并且不发送心跳时,则Nimbus将任务分配给另一个supervisor

  • 一旦所有的任务都完成后,supervisor将等待新的任务进去

  • 同时,终止的Nimbus将由服务监控工具自动重新启动

  • 重新启动的网络将从停止的地方继续。同样,终止Supervisor也可以自动重新启动。由于网络管理程序和Supervisor都可以自动重新启动,并两者都像以前一样继续,因此Storm保证至少处理所有任务一次

  • 一旦处理了所有拓扑,则网络管理器等待新的拓扑到达,并且类似的,管理器等待新的任务

默认情况下,Storm集群有两种模式

  • 本地模式

此模式用于开发,测试和调试,因为它是查看所有拓扑组件协调工作的最简单方法。在这种模式下,我们可以调整参数,使我们能够看到我们的拓扑如何在不同的Storm配置环境中运行。在本地模式下,Storm拓扑在本地机器上在单个JVM中运行

  • 生产模式

在这种模式下,我们将拓扑提交到工作Storm集群,该集群有许多进程组成,通常运行在不同的机器上。如在Storm的工作流中所讨论的,工作集群将无限地进行运行,知道它被关闭

Storm工作实例

移动呼叫日志分析器

移动呼叫及其持续时间将作为对Storm的输入,Storm将处理和分组在相同呼叫者和接收者之间的呼叫及其呼叫总数。在场景中,我们需要收集呼叫日志详细信息。呼叫日志的信息包含:

  • 主叫号码

  • 接收号码(即被叫号码)

  • 持续时间

由于我们没有呼叫日志的实时信息,所有将生成假呼叫日志。假信息将使用Random类创建。完整的程序代码如下:

  1. /**
  2. * 数据的输入端
  3. * @author Zhou
  4. *
  5. */
  6. public class CallLogReaderSpout implements IRichSpout{
  7. //context,collector
  8. private SpoutOutputCollector collector;
  9. private TopologyContext context;
  10. //idx
  11. private int idx = 0;
  12. //存储手机号
  13. List<String> mobiles;
  14. //创建一个随机数生成器
  15. Random random = new Random();
  16. //确定处理了特定的元组
  17. @Override
  18. public void ack(Object arg0) {
  19. // TODO Auto-generated method stub
  20. }
  21. @Override
  22. public void activate() {
  23. // TODO Auto-generated method stub
  24. }
  25. //当前的Spout要关闭的时候执行的方法
  26. @Override
  27. public void close() {
  28. // TODO Auto-generated method stub
  29. }
  30. @Override
  31. public void deactivate() {
  32. // TODO Auto-generated method stub
  33. }
  34. //指定不处理的和不重新处理的特点元组
  35. @Override
  36. public void fail(Object arg0) {
  37. // TODO Auto-generated method stub
  38. }
  39. //通过收集器发出生成的数据,换句话说就是获得下一个元组
  40. @Override
  41. public void nextTuple() {
  42. if(this.idx < 2000) {
  43. //一次next产生100个通话记录
  44. int localIdx = 0;
  45. while(localIdx++<100 && this.idx++<2000) {//先用后加
  46. //产生呼入号码
  47. String caller = mobiles.get(random.nextInt(9));
  48. //产生被叫号码
  49. String callee = mobiles.get(random.nextInt(9));
  50. while(caller.equals(callee)) {
  51. callee = mobiles.get(random.nextInt(9));
  52. }
  53. //产生通话时长
  54. int duration = random.nextInt(60);
  55. collector.emit(new Values(caller,callee,duration));
  56. }
  57. }
  58. }
  59. /**
  60. * 类似于初始化的一个方法:为Spout提供一个执行环境,执行器将运行此方法进行初始化Spout
  61. * conf:storm配置
  62. * context:提供拓扑的spout位置,任务的ID,输入输出等信息
  63. * collector:发出由bolts处理的元组。收集了spout输出的元组
  64. */
  65. @Override
  66. public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  67. this.context = context;
  68. this.collector = collector;
  69. mobiles = new ArrayList<>();
  70. mobiles.add("101");
  71. mobiles.add("102");
  72. mobiles.add("103");
  73. mobiles.add("104");
  74. mobiles.add("105");
  75. mobiles.add("106");
  76. mobiles.add("107");
  77. mobiles.add("108");
  78. mobiles.add("109");
  79. }
  80. //声明数据的输出的元组模式, 其实就是定义一下字段
  81. @Override
  82. public void declareOutputFields(OutputFieldsDeclarer declear) {
  83. declear.declare(new Fields("from","to","duration"));
  84. }
  85. @Override
  86. public Map<String, Object> getComponentConfiguration() {
  87. // TODO Auto-generated method stub
  88. return null;
  89. }
  90. }
  1. /**
  2. * 创建呼叫日志:呼叫者和被叫者关联起来
  3. * @author Zhou
  4. *
  5. */
  6. public class CallLogCreatorBolt implements IRichBolt{
  7. private OutputCollector collector;
  8. //清理工作
  9. @Override
  10. public void cleanup() {
  11. // TODO Auto-generated method stub
  12. }
  13. //处理输入元组
  14. @Override
  15. public void execute(Tuple input) {
  16. //获取元组中呼叫者
  17. String from = input.getString(0);
  18. String to = input.getString(1);
  19. int duration = input.getInteger(2);
  20. //继续传递数据
  21. collector.emit(new Values(from+"---"+to,duration));
  22. }
  23. /**
  24. * 为bolt提供要执行的环境,初始化bolt
  25. */
  26. @Override
  27. public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
  28. this.collector = collector;
  29. }
  30. //输出字段的定义
  31. @Override
  32. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  33. declarer.declare(new Fields("callLog","duration"));
  34. }
  35. @Override
  36. public Map<String, Object> getComponentConfiguration() {
  37. // TODO Auto-generated method stub
  38. return null;
  39. }
  40. }
  1. /**
  2. * 统计通话次数
  3. * @author Zhou
  4. *
  5. */
  6. public class CallLogCounterBolt implements IRichBolt{
  7. private OutputCollector collector;
  8. private Map<String, Integer> countMap;
  9. @Override
  10. public void cleanup() {
  11. for(Entry<String, Integer> entry : countMap.entrySet()) {
  12. System.out.println(entry.getKey()+":"+entry.getValue());
  13. }
  14. }
  15. @Override
  16. public void execute(Tuple input) {
  17. String callLog = input.getString(0);
  18. int duration = input.getInteger(1);
  19. if(countMap.containsKey(callLog)) {
  20. countMap.put(callLog, countMap.get(callLog)+1);
  21. }else {
  22. countMap.put(callLog, 1);
  23. }
  24. //确认元组数据已经处理完毕
  25. collector.ack(input);
  26. }
  27. @Override
  28. public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
  29. this.collector = collector;
  30. countMap = new HashMap<>();
  31. }
  32. @Override
  33. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  34. declarer.declare(new Fields("callLog"));
  35. }
  36. @Override
  37. public Map<String, Object> getComponentConfiguration() {
  38. // TODO Auto-generated method stub
  39. return null;
  40. }
  41. }
  1. /**
  2. * 测试
  3. * @author Zhou
  4. *
  5. */
  6. public class CallLogApp {
  7. public static void main(String[] args) throws Exception {
  8. Config conf = new Config();
  9. conf.setDebug(true);
  10. //创建拓扑对象
  11. TopologyBuilder builder = new TopologyBuilder();
  12. builder.setSpout("call-log-spout", new CallLogReaderSpout());
  13. builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("call-log-spout");
  14. builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()).fieldsGrouping("call-log-creator-bolt",new Fields("callLog"));
  15. //本地执行
  16. LocalCluster cluster = new LocalCluster();
  17. cluster.submitTopology("callLogCounter", conf, builder.createTopology());
  18. Thread.sleep(10000);
  19. cluster.shutdown();
  20. /**
  21. * 集群执行
  22. * 在集群上部署topology,导出jar
  23. * 执行
  24. * [root@master ~]storm jar call_log.jar com.zhiyou100.storm.call_log.CallLogApp
  25. */
  26. //StormSubmitter.submitTopology("callLog", conf, builder.createTopology());
  27. }