离线计算是什么?
流式计算是什么?
流式计算与离线计算的区别?
Storm是什么?
Storm与Hadoop的区别?
Storm的应用场景及行业案例
Storm的核心组件(重点掌握)
Storm的编程模型(重点掌握)
流式计算的一般架构图(重点掌握)
1、离线计算是什么?
离线计算:批量(batch)获取数据、批量传输数据、**周期性**批量计算数据、数据展示<br /> 代表技术:Sqoop批量导入数据、HDFS批量存储数据、MapReduce批量计算数据、Hive批量计算数据、***任务调度<br />1,hivesql<br />2、调度平台<br />3、Hadoop集群运维<br />4、数据清洗(脚本语言)ETL<br />5、元数据管理<br />6、数据稽查 <br />7、数据仓库模型架构
2、流式计算是什么
流式计算:数据实时产生、数据实时传输、数据实时计算、实时展示<br /> 代表技术:Flume实时获取数据、Kafka/**metaq**实时数据存储、**Storm/JStorm**实时数据计算、Redis实时**结果**缓存、持久化存储(mysql)。<br /> 一句话总结:将源源不断产生的数据实时收集并实时计算,尽可能快的得到计算结果
3、实时计算概述
有别于传统的离线批处理操作(对很多数据的集合进行的操作),实时处理,说白就是针对一条一条的数据/记录进行操作,所有的这些操作进行一个汇总(截止到目前为止的所有的统计总和)。
4、离线计算与实时计算的区别
最大的区别:实时收集、实时计算、实时展示<br />Bounded:有界 离线计算面临的操作数据都是有界限的,无论是1G、1T、1P、1EB、1NB 数据的有界必然会导致计算的有界<br />UnBounded:_***_ 实时计算面临的操作数据是源源不断的向水流一样,是没有界限的, 数据的***必然导致计算的***
5、Storm是什么?
Flume实时采集,低延迟<br /> Kafka消息队列,低延迟<br /> Storm实时计算,低延迟<br /> Redis实时存储,低延迟<br />ApacheStorm是Twitter开源的一个类似于Hadoop的实时数据处理框架,它原来是由BackType开发,后BackType被Twitter收购,将Storm作为Twitter的实时数据分析系统。<br />Storm能实现高频数据和大规模数据的实时处理。用来**实时处理数据**,特点:低延迟、高可用、分布式、可扩展、**数据不丢失**。提供简单容易理解的接口,便于开发。海量数据?数据类型很多,产生数据的终端很多,处理数据能力增强
5.1、Storm的设计思想
Storm是对流Stream的抽象,流是一个不间断的*的连续tuple,注意Storm在建模事件流时,把流中的事件抽象为tuple即元组。
Storm将流中元素抽象为Tuple,一个tuple就是一个值列表——valuelist,list中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,当然也可以是其他可序列化的类型。
Storm认为每个stream都有一个stream源,也就是原始元组的源头,所以它将这个源头称为Spout。
有了源头即spout也就是有了stream,那么该如何处理stream内的tuple呢。将流的状态转换称为Bolt,bolt可以消费任意数量的输入流,只要将流方向导向该bolt,同时它也可以发送新的流给其他bolt使用,这样一来,只要打开特定的spout(管口)再将spout中流出的tuple导向特定的bolt,又bolt对导入的流做处理后再导向其他bolt或者目的地。
以上处理过程统称为Topology即拓扑。拓扑是storm中最高层次的一个抽象概念,它可以被提交到storm集群执行,一个拓扑就是一个流转换图,图中每个节点是一个spout或者bolt,图中的边表示bolt订阅了哪些流,当spout或者bolt发送元组到流时,它就发送元组到每个订阅了该流的bolt(这就意味着不需要我们手工拉管道,只要预先订阅,spout就会将流发到适当bolt上)。
拓扑的每个节点都要说明它所发出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。
6、Storm与Hadoop的区别
Storm用于实时计算,Hadoop用于离线计算。
Storm处理的数据保存在内存中,源源不断;Hadoop处理的数据保存在文件系统中,一批一批。
Storm的数据通过网络传输进来;Hadoop的数据保存在磁盘中。
Storm与Hadoop的编程模型相似

Job:任务名称
JobTracker:项目经理
TaskTracker:开发组长、产品经理
Child:负责开发的人员
Mapper/Reduce:开发人员中的两种角色,一种是服务器开发、一种是客户端开发
Topology:任务名称
Nimbus:项目经理
Supervisor:开组长、产品经理
Worker:开人员
Spout/Bolt:开人员中的两种角色,一种是服务器开发、一种是客户端开发
Storm和Hadoop比较
数据来源
HADOOP处理的是HDFS上TB级别的数据(历史数据),STORM是处理的是实时新增的某一笔数据(实时数据);
处理过程
HADOOP是分MAP阶段到REDUCE阶段,STORM是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源(SPOUT)或处理逻辑(BOLT);
是否结束
HADOOP最后是要结束的,STORM是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始;
处理速度
HADOOP是以处理HDFS上TB级别数据为目的,处理速度慢,STORM是只要处理新增的某一笔数据即可,可以做到很快;
适用场景
HADOOP是在要处理批量数据时用的,不讲究时效性,STORM是要处理某一新增数据时用的,要讲时效性。
7、Storm应用场景及行业案例
7.1、运用场景
日志分析
从海量日志中分析出特定的数据,并将分析的结果存入外部存储器用来辅佐决策。
管道系统
将一个数据从一个系统传输到另外一个系统,比如将数据库同步到Hadoop
消息转化器
将接受到的消息按照某种格式进行转化,存储到另外一个系统如消息中间件
7.2、典型案列
一淘-实时分析系统:实时分析用户的属性,并反馈给搜索引擎
最初,用户属性分析是通过每天在云梯上定时运行的MR job来完成的。为了满足实时性的要求,希望能够实时分析用户的行为日志,将最新的用户属性反馈给搜索引擎,能够为用户展现最贴近其当前需求的结果。
携程-网站性能监控:实时分析系统监控携程网的网站性能
利用HTML5提供的performance标准获得可用的指标,并记录日志。Storm集群实时分析日志和入库。使用DRPC聚合成报表,通过历史数据对比等判断规则,触发预警事件。
阿里妈妈-用户画像:实时计算用户的兴趣数据
为了更加精准投放广告,阿里妈妈后台计算引擎需要维护每个用户的兴趣点(理想状态是,你对什么感兴趣,就向你投放哪类广告)。用户兴趣主要基于用户的历史行为、用户的实时查询、用户的实时点击、用户的地理信息而得,其中实时查询、实时点击等用户行为都是实时数据。考虑到系统的实时性,阿里妈妈使用Storm维护用户兴趣数据,并在此基础上进行受众定向的广告投放。

8、Storm核心组件(重要)

Nimbus:负责资源分配和任务调度。
Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。—-通过配置文件设置当前supervisor上启动多少个worker。
Worker:运行具体处理组件逻辑的进程。Worker运行的任务类型只有两种,一种是Spout任务,一种是Bolt任务。
Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,不同spout/bolt的task可能会共享一个物理线程,该线程称为executor。
9、Storm编程模型(重要)

Topology:Storm中运行的一个实时应用程序的名称。(拓扑)
Spout:在一个topology中获取源数据流的组件。
通常情况下spout会从外部数据源中读取数据,然后转换为topology内部的源数据。
Bolt:接受数据然后执行处理的组件,用户可以在其中执行自己想要的操作。
Tuple:一次消息传递的基本单元,理解为一组消息就是一个Tuple。
Stream:表示数据的流向。
Spout:喷头、发射
Storm架构及编程模型的分析
Topology分析及运行过程


一个task是最终完成数据处理的实体单元
executor会调用task方法(spout/bolt)的方法(nextTuple/execute),executor会执行组件(spout/bolt)1个或多个task实例
10、流式计算一般架构图(重要)
(网络系统—Flume集群—-kafka集群)通过网络IO进行数据传输
其中flume用来获取数据。
Kafka用来临时保存数据。
Strom用来计算数据。
Redis是个内存数据库,用来保存数据。
11、Storm配置文件
11.1、分发安装包
scp -r /root/apps/apache-storm-0.9.5 Mini1:/root/apps
然后分别在各机器上创建软连接
cd /apps
ln -s apache-storm-0.9.5 storm
11.2、启动集群
1、在nimbus.host所属的机器上启动 nimbus服务
cd /root/apps/storm/bin/
./storm nimbus &
2、在nimbus.host所属的机器上启动ui服务
cd /root/apps/storm/bin/
./storm ui &
3、在其它个点击上启动supervisor服务
cd /root/apps/storm/bin/
./storm supervisor &
12、Storm常用操作命令
有许多简单且有用的命令可以用来管理拓扑,它们可以提交、杀死、禁用、再平衡拓扑。
- 提交任务命令格式:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount
- 杀死任务命令格式:storm kill 【拓扑名称】 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)
storm kill topology-name -w 10
- 停用任务命令格式:storm deactivte 【拓扑名称】
storm deactivte topology-name
我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。
- 启用任务命令格式:storm activate【拓扑名称】
storm activate topology-name
- 重新部署任务命令格式:storm rebalance 【拓扑名称】
storm rebalance topology-name<br />再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配工人,并重启拓扑。
13、Storm集群的进程及日志熟悉
先进入Storm的日志文件logs中
cd /root/apps/apache-storm-0.10.0/logs
13.1查看nimbus的日志信息
tail -100f /root/apps/apache-storm-0.10.0/logs/nimbus.log
13.2查看ui运行日志信息
在ui的服务器上,一般和nimbus一个服务器
tail -100f /root/apps/apache-storm-0.10.0/logs/ui.log
13.3查看supervisor运行日志信息
在supervisor服务上
tail -100f /root/apps/storm/logs/supervisor.log
13.4查看supervisor上worker运行日志信息(supervisor节点)
tail -100f /root/apps/storm/logs/wordcount-1-1545167242-worker-6702.log
14、分组策略Stream Grouping详解
Storm里面有7种类型的stream grouping
Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。
Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。
All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task id(OutputCollector.emit方法也会返回task的id)。
Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。
15、Storm单词实例(提交Topology到集群)

MySpout:数据源,在已知的英文句子中,随机发送一条句子出去
package com.zhiyoulxj.storm;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
public class MySpout extends BaseRichSpout {
/**
* spout用来获取数据,发射给bolt
*/
SpoutOutputCollector collector;
//Storm框架调用这个open 方法初始化collector
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
this.collector=collector;
}
//相当于while(true) 发射流数据,最小单元是tuple
@Override
public void nextTuple() {
collector.emit(new Values("luban qiang long gong lve"));
}
//声明输出类型给bolt
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("hello")); //"hello"代表一个单词
}
}
MySplitBolt:负责将单行文本记录(句子)切分成单词
package com.zhiyoulxj.storm;
/**
* 业务处理用来切割单词
*/
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
public class MySplitBolt extends BaseRichBolt {
OutputCollector collector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
this.collector=collector;
}
/**
* 核心业务处理方法,处理单词切割并且发射给下一个bolt
* while(true)
*/
@Override
public void execute(Tuple tuple) {
String line = tuple.getString(0);
String[] words = line.split(" ");
for(String word:words){
collector.emit(new Values(word,1));
}
}
/**
* 声明输出类型
* @param outputFieldsDeclarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word","number"));
}
}
MyCountBolt:l 负责对单词的频率进行累加
package com.zhiyoulxj.storm;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
public class MyCountBolt extends BaseRichBolt {
OutputCollector collector;
Map<String,Integer> count = new HashMap<String,Integer>();
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
this.collector=collector;
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getString(0);
Integer num = tuple.getInteger(1);
if (count.containsKey(word)){
Integer val = count.get(word);
count.put(word,val+num);
}else{
count.put(word,num);
}
System.out.println("count"+count);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
//没有输出就不写了
}
}
MyTopology驱动类
本地模式
集群模式
package com.zhiyoulxj.storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
* 相当于MapReduce的驱动类
*/
public class MyTopology {
public static void main(String[] args)throws Exception{
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout",new MySpout(),2);
topologyBuilder.setBolt("split",new MySplitBolt(),2).shuffleGrouping("spout");
topologyBuilder.setBolt("count",new MyCountBolt(),4).fieldsGrouping("split",new Fields("word"));
// topologyBuilder.setBolt("count",new MyCountBolt(),4).shuffleGrouping("split");
Config cfg = new Config();
cfg.setNumWorkers(3);
//本地模式
//LocalCluster localCluster = new LocalCluster();
//localCluster.submitTopology("word-count",cfg,topologyBuilder.createTopology());
//集群模式
StormSubmitter.submitTopology("word-count",cfg,topologyBuilder.createTopology());
}
}
2、打包
1.在Idea中打包,选中maven——>package
2.上传至linux中,执行
3.进入storm的bin目录
cd apps/storm/bin
./storm jar /Storm-0.0.1-SNAPSHOT.jar com.zhiyoulxj.storm.MyTopology wordcount1 
4.在cdh1:8080/index.xml上查看
16、Storm的总结
- 拓扑(Topology):打包好的实时应用计算任务,同hadoop的MapReduce任务
2. 元组(Tuple):是stream中的最小传输单元,可以用来包装需要处理的业务
3. 流(Stream):数据流是对storm中所传输的数据的一种抽象,它在时间上是由无边无界的tuple组成。
4. Spout(“喷嘴”):获取数据源数据,Storm中的流的来源,从外部获取。比如从kafka读取元组数据到拓扑。
5. Bolts:在拓扑中实现业务计算逻辑,每个bolt中的任务并发度对应一个线程。
6. Tasks(任务):每个bolt和spout会以多个任务(task)的形式运行在集群中,一个task本质上是一个线程,默认情况下:executor=thread;。
7. 流分组(StreamGrouping):定义了一个流在一个处理它的bolt内的多个任务(task)之间如何进行分组。
8. 可靠性:Storm保证拓扑中的spout产生的tuple会被完整处理。
9. Worker(工作进程):拓扑是以一个或多个worker进程方式运行,每个worker可以看做是一个物理虚拟机,执行拓扑的一部分,worker可能同时存在多个spout和bolt类型任务。
10. Executor:是被一个worker进程单独启动的线程,每个executor只会运行bolttask或spottask
11. Nimbus:Storm集群的master节点,负责分发用户写的代码,指派具体的supervisor节点上的worker节点,去运行Topology对应的spout或bolt的task。
12. Supervisor:Storm集群的从节点,负责管理运行在Supervisor节点上的每一个Worker进程的启动和停止,接受nimbus分配的任务,启动自己的work(work的数量和端口数量一致)。17、Storm消息容错机制

1. 集群节点宕机(集群角度)
(1)Nimbus服务器
单点故障问题 HA(高可用)
(2)非Nimbus服务器
故障时,该节点上的task任务会被nimbus重新分配到其他节点上
2. 进程去世
(1)Worker挂掉,supervisor会重新启动这个进程。假如启动的过程一直失败,并且无法向nimbus发送心跳,Nimbus把这个worker进程重新分配到其他机器上。
(2)Supervisor
Supervisor的状态在zookeeper中保存,假日进程死掉就会立即失败(遇到异常就自动毁灭)
(3)Nimbus
Nimbus的状态也在zookeeper中保存,进程死掉,自动毁灭
3. 消息的完整性:通过Acker—-消息完整性的实现机制:保证消息肯定被处理一次,但是不保证会不会重发。假如消息发送的过程中,传递失败。这一组消息就得重发。Spout在发送消息的时候,同时带上一个messageId,这样消息(tuple)发送失败后,spout就知道哪一个tuple发送失败。
Acker默认为每一个spout,bolt分别启动一个线程。
