1. 离线计算是什么?

  2. 流式计算是什么?

  3. 流式计算与离线计算的区别?

  4. Storm是什么?

  5. Storm与Hadoop的区别?

  6. Storm的应用场景及行业案例

  7. Storm的核心组件(重点掌握)

  8. Storm的编程模型(重点掌握)

  9. 流式计算的一般架构图(重点掌握)

1、离线计算是什么?

  1. 离线计算:批量(batch)获取数据、批量传输数据、**周期性**批量计算数据、数据展示<br /> 代表技术:Sqoop批量导入数据、HDFS批量存储数据、MapReduce批量计算数据、Hive批量计算数据、***任务调度<br />1hivesql<br />2、调度平台<br />3Hadoop集群运维<br />4、数据清洗(脚本语言)ETL<br />5、元数据管理<br />6、数据稽查 <br />7、数据仓库模型架构

2、流式计算是什么

流式计算:数据实时产生、数据实时传输、数据实时计算、实时展示<br />    代表技术:Flume实时获取数据、Kafka/**metaq**实时数据存储、**Storm/JStorm**实时数据计算、Redis实时**结果**缓存、持久化存储(mysql)。<br />    一句话总结:将源源不断产生的数据实时收集并实时计算,尽可能快的得到计算结果

3、实时计算概述

有别于传统的离线批处理操作(对很多数据的集合进行的操作),实时处理,说白就是针对一条一条的数据/记录进行操作,所有的这些操作进行一个汇总(截止到目前为止的所有的统计总和)。

4、离线计算与实时计算的区别

最大的区别:实时收集、实时计算、实时展示<br />Bounded:有界    离线计算面临的操作数据都是有界限的,无论是1G、1T、1P、1EB、1NB    数据的有界必然会导致计算的有界<br />UnBounded:_***_    实时计算面临的操作数据是源源不断的向水流一样,是没有界限的,    数据的***必然导致计算的***

5、Storm是什么?

Flume实时采集,低延迟<br />    Kafka消息队列,低延迟<br />    Storm实时计算,低延迟<br />    Redis实时存储,低延迟<br />ApacheStorm是Twitter开源的一个类似于Hadoop的实时数据处理框架,它原来是由BackType开发,后BackType被Twitter收购,将Storm作为Twitter的实时数据分析系统。<br />Storm能实现高频数据和大规模数据的实时处理。用来**实时处理数据**,特点:低延迟、高可用、分布式、可扩展、**数据不丢失**。提供简单容易理解的接口,便于开发。海量数据?数据类型很多,产生数据的终端很多,处理数据能力增强

5.1、Storm的设计思想

Storm是对流Stream的抽象,流是一个不间断的*的连续tuple,注意Storm在建模事件流时,把流中的事件抽象为tuple即元组。
Storm将流中元素抽象为Tuple,一个tuple就是一个值列表——valuelist,list中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,当然也可以是其他可序列化的类型。
Storm认为每个stream都有一个stream源,也就是原始元组的源头,所以它将这个源头称为Spout。
有了源头即spout也就是有了stream,那么该如何处理stream内的tuple呢。将流的状态转换称为Bolt,bolt可以消费任意数量的输入流,只要将流方向导向该bolt,同时它也可以发送新的流给其他bolt使用,这样一来,只要打开特定的spout(管口)再将spout中流出的tuple导向特定的bolt,又bolt对导入的流做处理后再导向其他bolt或者目的地。
Storm流式计算 - 图2

以上处理过程统称为Topology即拓扑。拓扑是storm中最高层次的一个抽象概念,它可以被提交到storm集群执行,一个拓扑就是一个流转换图,图中每个节点是一个spout或者bolt,图中的边表示bolt订阅了哪些流,当spout或者bolt发送元组到流时,它就发送元组到每个订阅了该流的bolt(这就意味着不需要我们手工拉管道,只要预先订阅,spout就会将流发到适当bolt上)。
拓扑的每个节点都要说明它所发出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。
Storm流式计算 - 图3

6、Storm与Hadoop的区别

  • Storm用于实时计算,Hadoop用于离线计算。

  • Storm处理的数据保存在内存中,源源不断;Hadoop处理的数据保存在文件系统中,一批一批。

  • Storm的数据通过网络传输进来;Hadoop的数据保存在磁盘中。

  • Storm与Hadoop的编程模型相似

Storm流式计算 - 图4
Job:任务名称
JobTracker:项目经理
TaskTracker:开发组长、产品经理
Child:负责开发的人员
Mapper/Reduce:开发人员中的两种角色,一种是服务器开发、一种是客户端开发

Topology:任务名称
Nimbus:项目经理
Supervisor:开组长、产品经理
Worker:开人员
Spout/Bolt:开人员中的两种角色,一种是服务器开发、一种是客户端开发

Storm和Hadoop比较

数据来源

HADOOP处理的是HDFS上TB级别的数据(历史数据),STORM是处理的是实时新增的某一笔数据(实时数据);

处理过程

HADOOP是分MAP阶段到REDUCE阶段,STORM是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源(SPOUT)或处理逻辑(BOLT);

是否结束

HADOOP最后是要结束的,STORM是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始;

处理速度

HADOOP是以处理HDFS上TB级别数据为目的,处理速度慢,STORM是只要处理新增的某一笔数据即可,可以做到很快;

适用场景

HADOOP是在要处理批量数据时用的,不讲究时效性,STORM是要处理某一新增数据时用的,要讲时效性。

7、Storm应用场景及行业案例

Storm用来实时计算源源不断产生的数据,如同流水线生产。

7.1、运用场景

  • 日志分析

  • 海量日志中分析出特定的数据,并将分析的结果存入外部存储器用来辅佐决策。

  • 管道系统

  • 将一个数据从一个系统传输到另外一个系统,比如将数据库同步到Hadoop

  • 消息转化器

  • 将接受到的消息按照某种格式进行转化,存储到另外一个系统如消息中间件

7.2、典型案列

  • 一淘-实时分析系统:实时分析用户的属性,并反馈给搜索引擎

  • 最初,用户属性分析是通过每天在云梯上定时运行的MR job来完成的。为了满足实时性的要求,希望能够实时分析用户的行为日志,将最新的用户属性反馈给搜索引擎,能够为用户展现最贴近其当前需求的结果。

  • 携程-网站性能监控:实时分析系统监控携程网的网站性能

  • 利用HTML5提供的performance标准获得可用的指标,并记录日志。Storm集群实时分析日志和入库。使用DRPC聚合成报表,通过历史数据对比等判断规则,触发预警事件。

  • 阿里妈妈-用户画像:实时计算用户的兴趣数据

  • 为了更加精准投放广告,阿里妈妈后台计算引擎需要维护每个用户的兴趣点(理想状态是,你对什么感兴趣,就向你投放哪类广告)。用户兴趣主要基于用户的历史行为、用户的实时查询、用户的实时点击、用户的地理信息而得,其中实时查询、实时点击等用户行为都是实时数据。考虑到系统的实时性,阿里妈妈使用Storm维护用户兴趣数据,并在此基础上进行受众定向的广告投放。

![](https://cdn.nlark.com/yuque/0/2018/png/173322/1545224198502-3e84609b-e02f-43fe-a858-4325dd42f5a9.png#width=747)

8、Storm核心组件(重要)

Storm流式计算 - 图5

  • Nimbus:负责资源分配和任务调度。

  • Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。—-通过配置文件设置当前supervisor上启动多少个worker。

  • Worker:运行具体处理组件逻辑的进程。Worker运行的任务类型只有两种,一种是Spout任务,一种是Bolt任务。

  • Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,不同spout/bolt的task可能会共享一个物理线程,该线程称为executor。

9、Storm编程模型(重要)

Storm流式计算 - 图6

  • Topology:Storm中运行的一个实时应用程序的名称。(拓扑)

  • Spout:在一个topology中获取源数据流的组件。

  • 通常情况下spout会从外部数据源中读取数据,然后转换为topology内部的源数据。

  • Bolt:接受数据然后执行处理的组件,用户可以在其中执行自己想要的操作。

  • Tuple:一次消息传递的基本单元,理解为一组消息就是一个Tuple。

  • Stream:表示数据的流向。

  • Spout:喷头、发射

Storm架构及编程模型的分析

Storm流式计算 - 图7

Topology分析及运行过程

Storm流式计算 - 图8
Storm流式计算 - 图9
一个task是最终完成数据处理的实体单元
executor会调用task方法(spout/bolt)的方法(nextTuple/execute),executor会执行组件(spout/bolt)1个或多个task实例

10、流式计算一般架构图(重要)

(网络系统—Flume集群—-kafka集群)通过网络IO进行数据传输
Storm流式计算 - 图10

  • 其中flume用来获取数据。

  • Kafka用来临时保存数据。

  • Strom用来计算数据。

  • Redis是个内存数据库,用来保存数据。

11、Storm配置文件

Storm流式计算 - 图1111.1、分发安装包

scp -r /root/apps/apache-storm-0.9.5 Mini1:/root/apps
然后分别在各机器上创建软连接
cd /apps
ln -s apache-storm-0.9.5 storm

11.2、启动集群

1、在nimbus.host所属的机器上启动 nimbus服务
cd /root/apps/storm/bin/
./storm nimbus &
2、在nimbus.host所属的机器上启动ui服务
cd /root/apps/storm/bin/
./storm ui &
3、在其它个点击上启动supervisor服务
cd /root/apps/storm/bin/
./storm supervisor &

12、Storm常用操作命令

有许多简单且有用的命令可以用来管理拓扑,它们可以提交、杀死、禁用、再平衡拓扑。

  • 提交任务命令格式:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】

bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount

  • 杀死任务命令格式:storm kill 【拓扑名称】 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)

storm kill topology-name -w 10

  • 停用任务命令格式:storm deactivte 【拓扑名称】

storm deactivte topology-name
我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。

  • 启用任务命令格式:storm activate【拓扑名称】
  storm activate topology-name
  • 重新部署任务命令格式:storm rebalance 【拓扑名称】
    storm rebalance topology-name<br />再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配工人,并重启拓扑。

13、Storm集群的进程及日志熟悉

先进入Storm的日志文件logs中
cd /root/apps/apache-storm-0.10.0/logs

13.1查看nimbus的日志信息

tail -100f /root/apps/apache-storm-0.10.0/logs/nimbus.log

13.2查看ui运行日志信息

在ui的服务器上,一般和nimbus一个服务器
tail -100f /root/apps/apache-storm-0.10.0/logs/ui.log

13.3查看supervisor运行日志信息

在supervisor服务上
tail -100f /root/apps/storm/logs/supervisor.log

13.4查看supervisor上worker运行日志信息(supervisor节点)

tail -100f /root/apps/storm/logs/wordcount-1-1545167242-worker-6702.log

14、分组策略Stream Grouping详解

Storm里面有7种类型的stream grouping

  • Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。

  • Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。

  • All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。

  • Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。

  • Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。

  • Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task id(OutputCollector.emit方法也会返回task的id)。

  • Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。

15、Storm单词实例(提交Topology到集群)

Storm流式计算 - 图12
MySpout:数据源,在已知的英文句子中,随机发送一条句子出去

package com.zhiyoulxj.storm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

public class MySpout extends BaseRichSpout {
/**
 * spout用来获取数据,发射给bolt
 */
    SpoutOutputCollector collector;
    //Storm框架调用这个open 方法初始化collector
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.collector=collector;
    }
    //相当于while(true) 发射流数据,最小单元是tuple
    @Override
    public void nextTuple() {
        collector.emit(new Values("luban qiang long gong lve"));
    }
    //声明输出类型给bolt
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("hello")); //"hello"代表一个单词
    }
}

MySplitBolt:负责将单行文本记录(句子)切分成单词

package com.zhiyoulxj.storm;
/**
 *  业务处理用来切割单词
 */
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;
public class MySplitBolt extends BaseRichBolt {
    OutputCollector collector;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        this.collector=collector;
    }
    /**
     *  核心业务处理方法,处理单词切割并且发射给下一个bolt
     *  while(true)
     */
    @Override
    public void execute(Tuple tuple) {
       String line = tuple.getString(0);
       String[] words = line.split(" ");
       for(String word:words){
           collector.emit(new Values(word,1));
       }
    }
    /**
     * 声明输出类型
     * @param outputFieldsDeclarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word","number"));
    }
}

MyCountBolt:l 负责对单词的频率进行累加

package com.zhiyoulxj.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class MyCountBolt extends BaseRichBolt {
    OutputCollector collector;
    Map<String,Integer> count = new HashMap<String,Integer>();
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        this.collector=collector;
    }
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer num = tuple.getInteger(1);
        if (count.containsKey(word)){
            Integer val = count.get(word);
            count.put(word,val+num);
        }else{
            count.put(word,num);
        }
        System.out.println("count"+count);

    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //没有输出就不写了
    }
}

MyTopology驱动类

  1. 本地模式

  2. 集群模式

package com.zhiyoulxj.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
 *  相当于MapReduce的驱动类
 */
public class MyTopology {
    public static void main(String[] args)throws  Exception{
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout",new MySpout(),2);
        topologyBuilder.setBolt("split",new MySplitBolt(),2).shuffleGrouping("spout");
        topologyBuilder.setBolt("count",new MyCountBolt(),4).fieldsGrouping("split",new Fields("word"));
//        topologyBuilder.setBolt("count",new MyCountBolt(),4).shuffleGrouping("split");
        Config cfg = new Config();
        cfg.setNumWorkers(3);
        //本地模式
        //LocalCluster localCluster = new LocalCluster();
        //localCluster.submitTopology("word-count",cfg,topologyBuilder.createTopology());
        //集群模式
        StormSubmitter.submitTopology("word-count",cfg,topologyBuilder.createTopology());
    }
}

2、打包

1.在Idea中打包,选中maven——>package
Storm流式计算 - 图13
2.上传至linux中,执行
3.进入storm的bin目录
cd apps/storm/bin
./storm jar /Storm-0.0.1-SNAPSHOT.jar com.zhiyoulxj.storm.MyTopology wordcount1
Storm流式计算 - 图14
4.在cdh1:8080/index.xml上查看

16、Storm的总结

  1. 拓扑(Topology):打包好的实时应用计算任务,同hadoop的MapReduce任务
    2. 元组(Tuple):是stream中的最小传输单元,可以用来包装需要处理的业务
    3. 流(Stream):数据流是对storm中所传输的数据的一种抽象,它在时间上是由无边无界的tuple组成。
    4. Spout(“喷嘴”):获取数据源数据,Storm中的流的来源,从外部获取。比如从kafka读取元组数据到拓扑。
    5. Bolts:在拓扑中实现业务计算逻辑,每个bolt中的任务并发度对应一个线程。
    6. Tasks(任务):每个bolt和spout会以多个任务(task)的形式运行在集群中,一个task本质上是一个线程,默认情况下:executor=thread;。
    7. 流分组(StreamGrouping):定义了一个流在一个处理它的bolt内的多个任务(task)之间如何进行分组。
    8. 可靠性:Storm保证拓扑中的spout产生的tuple会被完整处理。
    9. Worker(工作进程):拓扑是以一个或多个worker进程方式运行,每个worker可以看做是一个物理虚拟机,执行拓扑的一部分,worker可能同时存在多个spout和bolt类型任务。
    10. Executor:是被一个worker进程单独启动的线程,每个executor只会运行bolttask或spottask
    11. Nimbus:Storm集群的master节点,负责分发用户写的代码,指派具体的supervisor节点上的worker节点,去运行Topology对应的spout或bolt的task。
    12. Supervisor:Storm集群的从节点,负责管理运行在Supervisor节点上的每一个Worker进程的启动和停止,接受nimbus分配的任务,启动自己的work(work的数量和端口数量一致)。

    17、Storm消息容错机制

    Storm流式计算 - 图15
    1. 集群节点宕机(集群角度)
    (1)Nimbus服务器
    单点故障问题 HA(高可用)
    (2)非Nimbus服务器
    故障时,该节点上的task任务会被nimbus重新分配到其他节点上
    2. 进程去世
    (1)Worker挂掉,supervisor会重新启动这个进程。假如启动的过程一直失败,并且无法向nimbus发送心跳,Nimbus把这个worker进程重新分配到其他机器上。
    (2)Supervisor
    Supervisor的状态在zookeeper中保存,假日进程死掉就会立即失败(遇到异常就自动毁灭)
    (3)Nimbus
    Nimbus的状态也在zookeeper中保存,进程死掉,自动毁灭
    3. 消息的完整性:通过Acker—-消息完整性的实现机制:保证消息肯定被处理一次,但是不保证会不会重发。假如消息发送的过程中,传递失败。这一组消息就得重发。Spout在发送消息的时候,同时带上一个messageId,这样消息(tuple)发送失败后,spout就知道哪一个tuple发送失败。
    Acker默认为每一个spout,bolt分别启动一个线程。