storm

免费、开源、分布式、实时计算系统。 吞吐量高。 每秒每节点百万元组。

Spout //水龙头

Cloujr

JStorm

  1. storm hadoop

  1. 实时流处理 批处理
  2. 无状态 有状态
  3. 使用zk协同的主 zk的主从架构。
  4. 从架构
  5. 每秒处理数万消息 HDFS MR数分钟、数小时
  6. 不会主动停止 终有完成的时候。

storm优点

  1. 1.跨语言
  2. 2.可伸缩的
  3. 3.低延迟,秒级/分钟级
  4. 4.容错。

核心概念

  1. 1.Tuple
  2. 主要的数据结构,有序元素的列表。
  3. 2.Stream
  4. Tuple的序列。
  5. 3.Spouts
  6. 数据流源头。可以读取kafka队列消息。可以自定义。
  7. 4.Bolts
  8. 转接头.
  9. 逻辑处理单元。spout的数据传递个boltbolt计算,完成后产生新的数据。
  10. IBolt是接口。

Topology

  1. Spout + bolt连接在一起形成一个top,形成有向图,定点就是计算,边是数据流。

task

  1. Bolt中每个Spout或者bolt都是一个task.

Storm架构

  1. 1.Nimbus(灵气)
  2. master节点。
  3. 核心组件,运行top
  4. 分析top并收集运行task。分发tasksupervisor.
  5. 监控top
  6. 无状态,依靠zk监控top的运行状况。
  7. 2.Supervisor(监察)
  8. 每个supervisornworker进程,负责代理taskworker
  9. worker在孵化执行线程最终运行task
  10. storm使用内部消息系统在nimbussupervisor之间进行通信。
  11. 接受nimbus指令,管理worker进程完成task派发。
  12. 3.worker
  13. 执行特定的taskworker本身不执行任务,而是孵化executors
  14. executors执行task
  15. 4.Executor
  16. 本质上有worker进程孵化出来的一个线程而已。
  17. executor运行task都属于同一spout或者bolt.
  18. 5.task
  19. 执行实际上的任务处理。或者是Spout或者是bolt.

storm工作流程

  1. 1.nimbus等待提交的top
  2. 2.提交top后,nimbus收集task
  3. 3.nimbus分发task给所有可用的supervisor
  4. 4.supervisor周期性发送心跳给nimbus表示自己还活着。
  5. 5.如果supervisor挂掉,不会发送心跳给nimubsnimbustask发送给其他的supervisor
  6. 6.nimubs挂掉,super会继续执行自己task
  7. 7.task完成后,supervisor等待新的task
  8. 8.同时,挂掉的nimbus可以通过监控工具软件自动重启。

安装storm集群

  1. [s201 ~ s204]
  2. 1.jdk
  3. 2.tar
  4. 3.环境变量
  5. 4.验证安装
  6. $>source /etc/profile
  7. $>./storm version
  8. 5.分发安装文件到其他节点。
  9. 6.配置
  10. [storm/conf/storm.yaml]
  11. ## Nimbus 和 Supervisor 后台进程都需要一个用于存放一些状态数据(比如 jar 包、配置文件等等)的目录
  12. storm.local.dir: "/home/centos/storm"
  13. ## Storm 关联的 ZooKeeper 集群的地址列表
  14. storm.zookeeper.servers:
  15. - "s202"
  16. - "s203"
  17. storm.zookeeper.port: 2181
  18. ### nimbus.* configs are for the master
  19. ##用于配置主控节点的地址,可以配置多个。从Storm1.0开始,支持Nimbus的HA。
  20. nimbus.seeds : ["s201"]
  21. ### ui.* configs are for the master
  22. ui.host: 0.0.0.0
  23. ui.port: 8080
  24. ##配置每个 Supervisor 机器能够运行的工作进程(worker)数。
  25. 每个 worker 都需要一个单独的端口来接收消息,
  26. ##这个配置项就定义了 worker 可以使用的端口列表。
  27. 如果你在这里定义了 5 个端口,那么 Storm 就会在该机器上分配最多 5 worker
  28. ##如果定义 3 个端口,那 Storm 至多只会运行三个 worker。
  29. supervisor.slots.ports:
  30. - 6700
  31. - 6701
  32. - 6702
  33. - 6703
  34. 7.分发
  35. 8.启动进程
  36. a)启动s201 nimbus进程
  37. $>storm nimbus &
  38. b)启动s202 ~ s204 supervisor进程
  39. $>storm supervisor &
  40. c)启动s201ui进程
  41. $>storm ui &
  42. 9.通过webui查看
  43. http://s201:8080/

编程实现CallLog日志统计

  1. 0.pom.xml
  2. <?xml version="1.0" encoding="UTF-8"?>
  3. <project xmlns="http://maven.apache.org/POM/4.0.0"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  6. <modelVersion>4.0.0</modelVersion>
  7. <groupId>com.it18zhang</groupId>
  8. <artifactId>StormDemo</artifactId>
  9. <version>1.0-SNAPSHOT</version>
  10. <dependencies>
  11. <dependency>
  12. <groupId>org.apache.storm</groupId>
  13. <artifactId>storm-core</artifactId>
  14. <version>1.0.3</version>
  15. </dependency>
  16. </dependencies>
  17. </project>
  18. 1.创建Spout
  19. package com.it18zhang.stormdemo;
  20. import org.apache.storm.spout.SpoutOutputCollector;
  21. import org.apache.storm.task.TopologyContext;
  22. import org.apache.storm.topology.IRichSpout;
  23. import org.apache.storm.topology.OutputFieldsDeclarer;
  24. import org.apache.storm.tuple.Fields;
  25. import org.apache.storm.tuple.Values;
  26. import java.util.ArrayList;
  27. import java.util.List;
  28. import java.util.Map;
  29. import java.util.Random;
  30. /**
  31. * Spout类,负责产生数据流
  32. */
  33. public class CallLogSpout implements IRichSpout{
  34. //Spout输出收集器
  35. private SpoutOutputCollector collector;
  36. //是否完成
  37. private boolean completed = false;
  38. //上下文
  39. private TopologyContext context;
  40. //随机发生器
  41. private Random randomGenerator = new Random();
  42. //
  43. private Integer idx = 0;
  44. public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  45. this.context = context;
  46. this.collector = collector;
  47. }
  48. public void close() {
  49. }
  50. public void activate() {
  51. }
  52. public void deactivate() {
  53. }
  54. /**
  55. * 下一个元组
  56. */
  57. public void nextTuple() {
  58. if (this.idx <= 1000) {
  59. List<String> mobileNumbers = new ArrayList<String>();
  60. mobileNumbers.add("1234123401");
  61. mobileNumbers.add("1234123402");
  62. mobileNumbers.add("1234123403");
  63. mobileNumbers.add("1234123404");
  64. Integer localIdx = 0;
  65. while (localIdx++ < 100 && this.idx++ < 1000) {
  66. //取出主叫
  67. String caller = mobileNumbers.get(randomGenerator.nextInt(4));
  68. //取出被叫
  69. String callee = mobileNumbers.get(randomGenerator.nextInt(4));
  70. while (caller == callee) {
  71. //重新取出被叫
  72. callee = mobileNumbers.get(randomGenerator.nextInt(4));
  73. }
  74. //模拟通话时长
  75. Integer duration = randomGenerator.nextInt(60);
  76. //输出元组
  77. this.collector.emit(new Values(caller, callee, duration));
  78. }
  79. }
  80. }
  81. public void ack(Object msgId) {
  82. }
  83. public void fail(Object msgId) {
  84. }
  85. /**
  86. * 定义输出的字段名称
  87. */
  88. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  89. declarer.declare(new Fields("from", "to", "duration"));
  90. }
  91. public Map<String, Object> getComponentConfiguration() {
  92. return null;
  93. }
  94. }
  95. 2.创建CreatorBolt
  96. package com.it18zhang.stormdemo;
  97. import org.apache.storm.task.OutputCollector;
  98. import org.apache.storm.task.TopologyContext;
  99. import org.apache.storm.topology.IRichBolt;
  100. import org.apache.storm.topology.OutputFieldsDeclarer;
  101. import org.apache.storm.tuple.Fields;
  102. import org.apache.storm.tuple.Tuple;
  103. import org.apache.storm.tuple.Values;
  104. import java.util.Map;
  105. /**
  106. * 创建CallLog日志的Bolt
  107. */
  108. public class CallLogCreatorBolt implements IRichBolt {
  109. //
  110. private OutputCollector collector;
  111. public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  112. this.collector = collector ;
  113. }
  114. public void execute(Tuple tuple) {
  115. //处理通话记录
  116. String from = tuple.getString(0);
  117. String to = tuple.getString(1);
  118. Integer duration = tuple.getInteger(2);
  119. //产生新的tuple
  120. collector.emit(new Values(from + " - " + to, duration));
  121. }
  122. public void cleanup() {
  123. }
  124. /**
  125. * 设置输出字段的名称
  126. */
  127. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  128. declarer.declare(new Fields("call", "duration"));
  129. }
  130. public Map<String, Object> getComponentConfiguration() {
  131. return null;
  132. }
  133. }
  134. 3.创建CounterBolt
  135. package com.it18zhang.stormdemo;
  136. import org.apache.storm.task.IBolt;
  137. import org.apache.storm.task.OutputCollector;
  138. import org.apache.storm.task.TopologyContext;
  139. import org.apache.storm.topology.IRichBolt;
  140. import org.apache.storm.topology.OutputFieldsDeclarer;
  141. import org.apache.storm.tuple.Fields;
  142. import org.apache.storm.tuple.Tuple;
  143. import java.util.HashMap;
  144. import java.util.Map;
  145. /**
  146. * 通话记录计数器Bolt
  147. */
  148. public class CallLogCounterBolt implements IRichBolt{
  149. Map<String, Integer> counterMap;
  150. private OutputCollector collector;
  151. public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
  152. this.counterMap = new HashMap<String, Integer>();
  153. this.collector = collector;
  154. }
  155. public void execute(Tuple tuple) {
  156. String call = tuple.getString(0);
  157. Integer duration = tuple.getInteger(1);
  158. if (!counterMap.containsKey(call)) {
  159. counterMap.put(call, 1);
  160. } else {
  161. Integer c = counterMap.get(call) + 1;
  162. counterMap.put(call, c);
  163. }
  164. collector.ack(tuple);
  165. }
  166. public void cleanup() {
  167. for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
  168. System.out.println(entry.getKey() + " : " + entry.getValue());
  169. }
  170. }
  171. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  172. declarer.declare(new Fields("call"));
  173. }
  174. public Map<String, Object> getComponentConfiguration() {
  175. return null;
  176. }
  177. }
  178. 4.App
  179. package com.it18zhang.stormdemo;
  180. import org.apache.storm.Config;
  181. import org.apache.storm.LocalCluster;
  182. import org.apache.storm.topology.TopologyBuilder;
  183. import org.apache.storm.tuple.Fields;
  184. /**
  185. * App
  186. */
  187. public class App {
  188. public static void main(String[] args) throws InterruptedException {
  189. TopologyBuilder builder = new TopologyBuilder();
  190. //设置Spout
  191. builder.setSpout("spout", new CallLogSpout());
  192. //设置creator-Bolt
  193. builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
  194. //设置counter-Bolt
  195. builder.setBolt("counter-bolt", new CallLogCounterBolt()).fieldsGrouping("creator-bolt", new Fields("call"));
  196. Config conf = new Config();
  197. conf.setDebug(true);
  198. LocalCluster cluster = new LocalCluster();
  199. cluster.submitTopology("LogAnalyserStorm", conf, builder.createTopology());
  200. Thread.sleep(10000);
  201. //停止集群
  202. cluster.shutdown();
  203. }
  204. }
  205. 5.在生产环境的集群上部署storm top
  206. a)修改提交方式
  207. [App.java]
  208. public static void main(String[] args) throws Exception {
  209. TopologyBuilder builder = new TopologyBuilder();
  210. //设置Spout
  211. builder.setSpout("spout", new CallLogSpout());
  212. //设置creator-Bolt
  213. builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout");
  214. //设置counter-Bolt
  215. builder.setBolt("counter-bolt", new CallLogCounterBolt()).fieldsGrouping("creator-bolt", new Fields("call"));
  216. Config conf = new Config();
  217. conf.setDebug(true);
  218. /**
  219. * 本地模式storm
  220. */
  221. // LocalCluster cluster = new LocalCluster();
  222. // cluster.submitTopology("LogAnalyserStorm", conf, builder.createTopology());
  223. // Thread.sleep(10000);
  224. StormSubmitter.submitTopology("mytop", conf, builder.createTopology());
  225. }
  226. b)导入jar包.
  227. maven ...
  228. c)在centos上运行top
  229. $>storm jar xxx.jar com.it18zhang.stormdemo.App

使用storm流计算实现wordcount

  1. 1.WordCountSpout
  2. ...
  3. 2.SplitBolt
  4. String line = ...
  5. String[] str = line.split(" ");
  6. for(String s : str){
  7. collector.emit(new Values("word","1"))
  8. }
  9. 3.CounterBolt

设置top的并发程度和任务

  1. 配置并发度.
  2. 1.设置worker数据
  3. conf.setNumWorkers(1);
  4. 2.设置executors个数
  5. //设置Spout的并发暗示 (executor个数)
  6. builder.setSpout("wcspout", new WordCountSpout(),3);
  7. //设置bolt的并发暗示
  8. builder.setBolt("split-bolt", new SplitBolt(),4)
  9. 3.设置task个数
  10. 每个线程可以执行多个task.
  11. builder.setSpout("wcspout", new WordCountSpout(),3).setNumTasks(2);
  12. //
  13. builder.setBolt("split-bolt", new SplitBolt(),4).shuffleGrouping("wcspout").setNumTasks(3);
  14. 4.并发度 ==== 所有的task个数的总和。