Storm构架及运行原理
https://blog.csdn.net/weiyongle1996/article/details/77142245?utm_source=gold_browser_extension
Storm简介
Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop批量处理一样,Storm可以实时处理数据。另外,Storm操作简单,可用于任意编程语言。Apache Storm采用Clojure开发。Storm有很多应用场景,包括实时数据分析、联机学习、持续计算、分布式RPC、ETL等。
Hadoop专注于批处理。这种模型对许多清醒(比如为网页建立索引)已经足够,但还存在其他一些使用模型,它们需要来自高度动态的来源的实时信息。为了解决这个问题,就得借助Nathan Marz推出的Storm(现已被Apache孵化),Storm不处理静态数据,但可处理连续的流数据。
Storm特点
编程简单:开发人员只要关注应用逻辑,而且跟Hadoop类似,Storm提供的编程原语也很简单
高性能,低延迟:可以应用于广告搜索引擎这种要求对广告主的操作进行实时响应的场景
分布式:可以轻松应对数据量大,单机搞不定的场景
可扩展:随着业务发展,数据量和计算量越来越大,系统可水平扩展
容错:单节点挂了不影响应用
消息不丢失:保证消息处理
Storm和Hadoop的比较
Storm用于实时计算,Hadoop用于离线计算
Storm处理的数据保存在内存中,源源不断;Hadoop处理的数据保存在文件系统中,一批一批
Storm的数据通过网络传输进来;Hadoop的数据保存在磁盘中
Storm与Hadoop的编程模型相似
| 结构 | Hadoop | Storm |
|---|---|---|
| 主节点 | JobTracker | Nimbus |
| 从节点 | TaskTRacker | Supervisor |
| 应用程序 | Job | Topology |
| 工作进程名称 | Child | Worker |
| 计算模型 | Map/Reduce | Spout/Bolt |
Storm工作流程
一个工作的Storm集群应该有一个Nimbus和一个或多个supervisor。另一个重要的节点是Apache Zookeeper,它将用于Nimbus和Supervisor之间的协调。
最初,Nimbus将等待“Storm拓扑”提交得它
一旦提交拓扑,它将处理拓扑并收集要执行的所有任务和任务将被执行的顺序
然后,Nimbus将任务均匀分配给所有可用的supervisor
在特定的时间间隔,所有supervisor将向Nimbus发送心跳以通知它们仍然活着
当supervisor终止并且不发送心跳时,则Nimbus将任务分配给另一个supervisor
一旦所有的任务都完成后,supervisor将等待新的任务进去
同时,终止的Nimbus将由服务监控工具自动重新启动
重新启动的网络将从停止的地方继续。同样,终止Supervisor也可以自动重新启动。由于网络管理程序和Supervisor都可以自动重新启动,并两者都像以前一样继续,因此Storm保证至少处理所有任务一次
一旦处理了所有拓扑,则网络管理器等待新的拓扑到达,并且类似的,管理器等待新的任务
默认情况下,Storm集群有两种模式
- 本地模式
此模式用于开发,测试和调试,因为它是查看所有拓扑组件协调工作的最简单方法。在这种模式下,我们可以调整参数,使我们能够看到我们的拓扑如何在不同的Storm配置环境中运行。在本地模式下,Storm拓扑在本地机器上在单个JVM中运行
- 生产模式
在这种模式下,我们将拓扑提交到工作Storm集群,该集群有许多进程组成,通常运行在不同的机器上。如在Storm的工作流中所讨论的,工作集群将无限地进行运行,知道它被关闭
Storm工作实例
移动呼叫日志分析器
移动呼叫及其持续时间将作为对Storm的输入,Storm将处理和分组在相同呼叫者和接收者之间的呼叫及其呼叫总数。在场景中,我们需要收集呼叫日志详细信息。呼叫日志的信息包含:
主叫号码
接收号码(即被叫号码)
持续时间
由于我们没有呼叫日志的实时信息,所有将生成假呼叫日志。假信息将使用Random类创建。完整的程序代码如下:
/*** 数据的输入端* @author Zhou**/public class CallLogReaderSpout implements IRichSpout{//context,collectorprivate SpoutOutputCollector collector;private TopologyContext context;//idxprivate int idx = 0;//存储手机号List<String> mobiles;//创建一个随机数生成器Random random = new Random();//确定处理了特定的元组@Overridepublic void ack(Object arg0) {// TODO Auto-generated method stub}@Overridepublic void activate() {// TODO Auto-generated method stub}//当前的Spout要关闭的时候执行的方法@Overridepublic void close() {// TODO Auto-generated method stub}@Overridepublic void deactivate() {// TODO Auto-generated method stub}//指定不处理的和不重新处理的特点元组@Overridepublic void fail(Object arg0) {// TODO Auto-generated method stub}//通过收集器发出生成的数据,换句话说就是获得下一个元组@Overridepublic void nextTuple() {if(this.idx < 2000) {//一次next产生100个通话记录int localIdx = 0;while(localIdx++<100 && this.idx++<2000) {//先用后加//产生呼入号码String caller = mobiles.get(random.nextInt(9));//产生被叫号码String callee = mobiles.get(random.nextInt(9));while(caller.equals(callee)) {callee = mobiles.get(random.nextInt(9));}//产生通话时长int duration = random.nextInt(60);collector.emit(new Values(caller,callee,duration));}}}/*** 类似于初始化的一个方法:为Spout提供一个执行环境,执行器将运行此方法进行初始化Spout* conf:storm配置* context:提供拓扑的spout位置,任务的ID,输入输出等信息* collector:发出由bolts处理的元组。收集了spout输出的元组*/@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {this.context = context;this.collector = collector;mobiles = new ArrayList<>();mobiles.add("101");mobiles.add("102");mobiles.add("103");mobiles.add("104");mobiles.add("105");mobiles.add("106");mobiles.add("107");mobiles.add("108");mobiles.add("109");}//声明数据的输出的元组模式, 其实就是定义一下字段@Overridepublic void declareOutputFields(OutputFieldsDeclarer declear) {declear.declare(new Fields("from","to","duration"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}}
/*** 创建呼叫日志:呼叫者和被叫者关联起来* @author Zhou**/public class CallLogCreatorBolt implements IRichBolt{private OutputCollector collector;//清理工作@Overridepublic void cleanup() {// TODO Auto-generated method stub}//处理输入元组@Overridepublic void execute(Tuple input) {//获取元组中呼叫者String from = input.getString(0);String to = input.getString(1);int duration = input.getInteger(2);//继续传递数据collector.emit(new Values(from+"---"+to,duration));}/*** 为bolt提供要执行的环境,初始化bolt*/@Overridepublic void prepare(Map conf, TopologyContext context, OutputCollector collector) {this.collector = collector;}//输出字段的定义@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("callLog","duration"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}}
/*** 统计通话次数* @author Zhou**/public class CallLogCounterBolt implements IRichBolt{private OutputCollector collector;private Map<String, Integer> countMap;@Overridepublic void cleanup() {for(Entry<String, Integer> entry : countMap.entrySet()) {System.out.println(entry.getKey()+":"+entry.getValue());}}@Overridepublic void execute(Tuple input) {String callLog = input.getString(0);int duration = input.getInteger(1);if(countMap.containsKey(callLog)) {countMap.put(callLog, countMap.get(callLog)+1);}else {countMap.put(callLog, 1);}//确认元组数据已经处理完毕collector.ack(input);}@Overridepublic void prepare(Map conf, TopologyContext context, OutputCollector collector) {this.collector = collector;countMap = new HashMap<>();}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("callLog"));}@Overridepublic Map<String, Object> getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}}
/*** 测试* @author Zhou**/public class CallLogApp {public static void main(String[] args) throws Exception {Config conf = new Config();conf.setDebug(true);//创建拓扑对象TopologyBuilder builder = new TopologyBuilder();builder.setSpout("call-log-spout", new CallLogReaderSpout());builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("call-log-spout");builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()).fieldsGrouping("call-log-creator-bolt",new Fields("callLog"));//本地执行LocalCluster cluster = new LocalCluster();cluster.submitTopology("callLogCounter", conf, builder.createTopology());Thread.sleep(10000);cluster.shutdown();/*** 集群执行* 在集群上部署topology,导出jar* 执行* [root@master ~]storm jar call_log.jar com.zhiyou100.storm.call_log.CallLogApp*///StormSubmitter.submitTopology("callLog", conf, builder.createTopology());}
