Linux/shell

常用查询进程pid stop.sh

  1. #!/bin/bash
  2. appName=java -cp xxx.jar com.xxx...
  3. pid = $(ps -ef|grep ${appName}|grep java | grep -v grep|awk '{print $2}')
  4. echo $pid
  5. #-z 字符串 字符串的长度为零 则为真
  6. #-n 字符串 字符串的长度不为零 则为真
  7. if[ -z $pid ];then
  8. echo '无进程 停止失败'
  9. else
  10. kill -9 $pid
  11. echo 'kill ok'
  12. fi
  13. # 常用后台启动
  14. nohup java -cp xxx.jar com... >/dev/null 2>&1 &

shell中引号区别

单引不取,双引取值,外面为准,反引执行
单引常量,双引变量,外面为准,反引执行
例如:

  1. #!/bin/bash
  2. do_date=$1
  3. echo '$do_date'
  4. echo "$do_date"
  5. echo "'$do_date'"
  6. echo '"$do_date"'
  7. echo `date`
  8. ----------------------结果
  9. $do_date
  10. 2019-02-10
  11. '2019-02-10'
  12. "$do_date"
  13. 2019 05 02 星期四 21:02:08 CST

Hadoop

HDFS读流程

image.png
1 客户端发送读取文件路径给NameNode,请求下载文件;
2 NameNode获取文件的元数据(主要是block存放位置)返回给客户端
3 客户端根据元数据找对应DataNode逐个获取文件block,并在客户端本地进行数据的追加合并,从而获得整个文件。

HDFS写流程

image.png
1 客户端向NameNode请求上传文件。
2 NameNode会检查文件、父目录是否存在,然后给出响应可以上传文件。
3 客户端会先对文件进行切分。比如一个block块是128m,一个300m的文件就会分成3个Block,两个128M,一个44M。切分之后,然后向NameNode请求,第一个Block块应该上传到那些DataNode服务器。
4 NameNode返回存放数据的DataNode节点。
5 客户端请求一台DataNode上传Blokc(RPC调用,建立pipeline),第一个DataNode接收请求后会继续调用第二个DataNode,然后第二个调用第三个DataNode,将整个pipline建立完成,逐级应答返回客户端。
7 客户端开始传输Block(先从磁盘读取数据存储到一个本地内存缓存),以Packet为单位(一个Packet为64kb),写入数据的时候DataNode会进行数据校验,并不是通过Packet为单位校验,而是以chunk为单位校验(512byte),第一个DataNode收到第一个Packet就会传给第二台,第二台传给第三台;每一台每传一个Packet就会放入一个应答队列等待应答
8 当一个Block传输完成时,客户端再次请求NameNode,第二个Block应该上传的服务器(循环3-7步骤)。

HDFS小文件问题处理

首先,hdfs存在小文件问题主要有两个原因:NameNode 内存管理和 MapReduce 性能。
第一个,NameNode内存管理,因为每个hdfs的文件云信息保存在NameNode的内存中咩咩个对象大约占用150字节,如果小文件过多,会占用大量内存。10亿个文件需要约140g的内存,严重限制了集群规模的扩展。(nn重启,大量读磁盘,延迟;监听检查存储位置,消耗网络带宽)
第二个有用大量小文件会降低MapReduce处理性能。一是,大量随机磁盘io,比不上一次大文件的顺序读取;二是,一个文件一个map,初始化、启动、执行浪费大量资源。严重影响性能
例如:10000个10m文件:1w个map和800个128m文件:800个map的处理。

如何解决:?
第一个是,采用har归档方式,将小文件归档。
第二个是,避免产生小文件,从源头杜绝。
如果小文件不可避免,常用的是定期合并小文件为大文件。(oozie 定时脚本)

Shuffle 过程

从map输出到reduce输入的之间整个过程可以称为shuffle。
map阶段:读数据文件,切分成小的split,再执行map方法之后,从这里开始便处于shuffle过程。后续过程包括:
1、首先进行分区partition操作,这样能把map任务处理的结果发送给指定的reduce执行,从而达到负载均衡,避免数倾斜,默认使用HashPartitioner;
2、然后将k-v数据写到环形缓冲区,其大小默认为100m
3、然后,当缓冲区的数据达到阈值即80%,就会对数据进行分区按键排序(sort,在每个分区中对其中的键值对按键进行sort排序),如果自定了的combiner,还会将相同key的value相加“合并”(好处减少溢写数据量),之后再溢写到文件。
4、然后,如果一个map task处理的数据很大,会生成多个spill文件,这时会将这些文件进行“归并(merge)”成一个分区且排序的大文件。
———合并、归并 区别 两个 —> >
reduce阶段
1、首先,reduce进程将map生成的文件拉取过来(copy);
2、拉取(copy)过来的数据会先放入内存缓冲区中。如果内存放的下,会在内存里面进行merge(内存到内存merge);超过内存阈值(66%)后会将数据合并输出磁盘文件(内存到磁盘merge);当全部copy完成后,将多个文件进行一次归并排序并合并(磁盘到磁盘merge);(一般来说:reduce是一边copy一边sort,最终shuffle输出一个整体有序的数据块。)
3、然后根据键分组,生成Value迭代器,然后进入reduce函数计算阶段。
shuffle.png

——————————————-另外说法 -针对下面优化图描述——-理解记忆———————————-
shuffle过程发生在map方法之后,reduce方法之前。
1、待处理的数据及提交的信息、jar包、xml文件都上传到yarn上,在MapReduce程序启动之后,会先调用getPartition计算出map task数量
2、map执行之后向环形缓冲区中写键值对的数据,环形缓冲区默认大小100m,当写入80%之后,开始反向刷写,并将数据溢写到文件中。
3、reduce拉取相应分区的数据,先加载到内存中,如果内存不够再加载到磁盘进行归并,落盘。

Shuffle 优化

shuffle.png
1、map阶段:自定义分区、增大环形缓冲区、增大溢写比例、减少溢写文件merge次数。
2、reduce阶段:合理设置map,reduce数;避免使用reduce;增加去拉map数据的并行数;增大reduce端存储数据内存的大小。
3、IO传输:采用数据压缩,减少网络IO;map输入考虑切片bzip2 lzo(建索引),map输出考虑速度lzo snappy,reduce输出考虑下一级需求,从而选择合适的压缩编码。 gzip(压缩比大)
4、整体; : 加资源(内存 核心 多磁盘)

Hadoop解决数据倾斜方法

1)提前在map进行combine,减少传输的数据量
在Mapper加上combiner相当于提前进行reduce,即把一个Mapper中的相同key进行了聚合,减少shuffle过程中传输的数据量,以及Reducer端的计算量。
如果导致数据倾斜的key大量分布在不同的mapper的时候,这种方法就不是很有效了。
2)导致数据倾斜的key 大量分布在不同的mapper
(1)局部聚合加全局聚合。
第一次在map阶段对那些导致了数据倾斜的key 加上1到n的随机前缀,这样本来相同的key 也会被分到多个Reducer中进行局部聚合,数量就会大大降低。
第二次mapreduce,去掉key的随机前缀,进行全局聚合。
思想:二次mr,第一次将key随机散列到不同reducer进行处理达到负载均衡目的。第二次再根据去掉key的随机前缀,按原key进行reduce处理。
这个方法进行两次mapreduce,性能稍差。
(2)增加Reducer,提升并行度 JobConf.setNumReduceTasks(int)
(3)实现自定义分区
根据数据分布情况,自定义散列函数,将key均匀分配到不同Reducer

YARN工作机制

image.png
1、用户使用客户端向RM提交任务,指定队列、资源(无、默认)、参数。
2、RM收到请求后,选择一个NM启动容器,即ApplicationMaster(AM)。
3、AM 根据任务需求 向RM 申请container(数量、资源、位置等)。
4、如果队列资源足够,RM会将container分配给有足够剩余资源的NM,由AM通知NM启动container。
5、container启动后执行具体任务,处理分给自己的数据。NM 除了负责启动 container,还负责监控资源使用状况以及是否失败退出工作,若果container实际使用的内存超过申请时指定的,会将其杀死,保证其他container能正常运行。
6、各个container向AM汇报自己的进度,都完成后,AM向RM注销任务并退出,RM通知NM杀死对应的container,任务结束。


Kafka

Kafka文件存储机制

在kafka中,一个topic会被分割成多个partition分区,当用户查看创建的一个partition时,可以看到里面包含了三个文件,分别为log文件、index文件、timeIndex文件。其中,log文件存储的是batchRecords消息内容,而index和timeIndex分别是存储的额一些索引信息。这是哪个文件共同组成了一个segment,而文件名的(0)表示的是一个segment的起始offset(后续每个segment文件名为上一个segment文件最后一条消息的offset值),kafka会根据log.segment.bytes的配置来决定单个文件的大小,当写入数据达到这个大小时,就会创建一个新的segment。
image.png
在这三个文件中,timeIndex文件包含两个字段,分别为timestamp和offset;index文件包含两个字段,分别为offset和position;log文件包含多个字段,其中最重要的就是records字段;根据这三个文件,就可以基于offset找到对应的message消息。
image.pngimage.png


Hive

Hive的架构

  1. ![image.png](https://cdn.nlark.com/yuque/0/2022/png/29421089/1656580694677-ffad052f-5ee3-426c-a805-1918cb6248e6.png#clientId=u3e11bcf4-2e28-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=376&id=ub63589bf&name=image.png&originHeight=470&originWidth=588&originalType=binary&ratio=1&rotation=0&showTitle=false&size=55501&status=done&style=none&taskId=u596bb822-92bf-4dba-a614-bdd61603256&title=&width=470.4)<br />Hive架构组成可分为4大块:元数据、HDFS、MapReduce、Client<br />**元数据**:元数据包括库名、表名、用户、列字段、表类型、表目录等;默认存储在自带的derby数据库中,推荐使用mysql存储元数据。<br />然后,使用**HDFS**进行储存和使用**MapReduce**进行计算。<br />**Client**:包括JDBC和驱动器:<br />Jdbc主要是使用jdbc方式访问hive;<br />驱动器driver包括:解析器、编译器、优化器、执行器,主要是将sql解析成对应的MapReduce程序。这样才能使用mr计算。<br />**其运行机制是:**用户创建表,将表与存放在HDFS上的数据建立映射关系;比如通过交互接口,在用户发出一个查询sql指定后,hive的驱动器将sql解析为MR,提交到 Hadoop 中执行,最后,将执行返回的结果输出到用户交互接口。

Hive是建立在Hadoop之上的数据仓库基础构架、是为了减少MapReduce编写工作的批处理系统,Hive本身不存储和计算数据,它完全依赖于HDFS和MapReduce。Hive可以理解为一个客户端工具,将我们的sql操作转换为相应的MapReduce jobs,然后在Hadoop上面运行。

Hive和数据库对比有什么区别?

Hive和数据库相比,处了拥有类似的查询语言以外,其他都不相同。

  1. 从存储位置来说,hive存储在HDFS,而数据库将数据存储在自己的系统中。相对应的hive存储的数据量远大于数据库。
  2. hive中不建议对数据的改写。而数据库中的数据通常是经常进行修改的。
  3. hive可以支持很大规模的数据计算,而数据库可以支持的数据规模较小
  4. hive执行延迟较高。当然当数据规模大到超出数据库的处理能力的时候,他的并行计算显然是能体现优势的。
  5. hive数据格式可自定义、无索引等

HIve的内部表和外部表?

创建表的时候使用external修饰表即为外部表。
主要区别在于内部表的数据由hive自身管理,外部表的数据由HDFS管理。
当删除内部表时,会直接将元数据和储存数据全部删除,而删除外部表的时候,只会将元数据删除,HDFS上的文件不会被删除。
绝大数场景下都是使用外部表,一般只有在使用临时表时才会创建内部表。

Hive窗口函数

over()才是窗口函数,而sum、row_number、count只是与over()搭配的分析函数
分析函数 over(partition by 列名 order by 列名 rows between 开始位置 and 结束位置)
常用:unbounded preceding and current row 起点到当前行,用于累加
排名:

row_number(编号) 12345 rank(排名) 12335 dense_rank(非空排名)12334

分析函数:
lag(col,n,default_val) 往前第n行的数据
lead(col,n,default_val) 往后第n行的数据
ntile(n) 数据平分成n组,并从1编号,并返回此行所属的组的编号

  1. if(判断式,True-valfalse -val
  2. case when 判断 then val
  3. when 判断 then val
  4. .....
  5. else val
  6. end

Hive join

hive-join.png

手写topN

  1. --各学科的top2
  2. select * from(select id,user,subject,score,row_number()over(partition by subject order by score desc)as rank from tb)t where t.rank =1

Hive优化(调优)?

1、Mapjoin。适用于小表join大表。在Reduce阶段完成join。容易发生数据倾斜,把小表全部加载到内存在map端进行join,避免reducer处理。
2、行列过滤。select中只拿需要的列字段,尽量使用分区过滤;关联查询的时候,先where过滤,再关联。
3、数据存储文件格式选择列式存储 (orc格式+ lzo压缩)。
4、创建表采用分区技术。辅助缩小查询范围。
5、合理的设置map数、reduce数。
6、小文件过多,合并小文件。
7、使用压缩,减少Io读写和网络传输
8.使用tez引擎和spark引擎。

Hive数据倾斜的解决方案?

什么是数据倾斜?
数据倾斜在mr计算框架中经常发生。通俗理解,该现象是指在整个计算中,大量相同的key被分配到同一个任务上,造成“一个人累死,其他人闲死”,使得整体效率十分低下。
如何解决?
首先,Hive产生数据倾斜的主要分为map端倾斜和reduce端倾斜。map端倾斜主要是因为输入的文件大小不均匀导致,造成部分数据大量集中在某一个节点上,形成数据热点,导致这一节点运行时间远大于其他节点时间;reduce端倾斜主要是partition分区不均匀导致,常见场景是join和count distinct。
所以解决数据倾斜可以从以下方面:

  1. map端数据倾斜。开启负载均衡。

    1. 合并小文件,可以通过设置set hive.merge.mapfiles=true
    2. map端聚合,设置set hive.map.aggr=true
    3. 开启负载均衡:set hive.groupby.skewindata=true

    这样MapReduce 生成的查询计划会有两个MR job。
    第一个job中,map端的输出结果集合会随机分布到Reduce中,每个reduce都做部分聚合,
    并输出结果,这样的处理结果是相同的group by key有可能被分布到不同的reduce中,
    从而达到负载均衡的目的。
    第二个MR job再根据预处理的数据结果按照group by key
    分布到reduce中(这个过程可以保证相同的group by key被分布到同一个reduce中),
    最后完成最终的聚合操作。

  2. 小表join大表。使用mapjoin将小表加载到内存中,然后在对大表进行map操作,这样join就会发生在map操作,不会设计到reduce操作,优势在于没有shuffle。

    1. -- 自动开启MAPJOIN优化,默认值为true
    2. set hive.auto.convert.join=true;
    3. -- 通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中,默认值为2500000(25M),
    4. set hive.mapjoin.smalltable.filesize=2500000;
  3. 关联条件字段为0或null的较多。可以给空值加随机值前缀,将其分发到不同的reduce处理、或去重空值(这个查询两次,不如前面)

  4. 不同数据类型关联产生。对于两个表join,表a中需要join的字段key为int,表b中key字段既有string类型也有int类型。当按照key进行两个表的join操作时,默认的Hash操作会按int型的id来进行分配,这样所有的string类型都被分配成同一个id,结果就是所有的string类型的字段进入到一个reduce中,引发数据倾斜。

解决办法:
方案一:把数字类型转换成字符串类型;
方案二:建表时按照规范建设,统一词根,同一词根数据类型一致

  1. count(distinct id)大量相同特殊值。全局排序导致生成一个reduce任务。可去重然后分组统计

总结

  1. 如果任务长时间卡在99%,基本就是发生了数据倾斜,可调整参数实现负载均衡:set hive.groupby.skewindata=true
  2. 小表关联大表,先看能否使用子查询,再看能否使用Mapjoin
  3. join操作注意关联字段不能出现大量重复值或空值
  4. count(distinct id)去重统计慎用,可用sum groip by替代



Spark

Spark数据倾斜的解决方案有哪些?

首先。产生数据倾斜的主要原因是在shuffle过程中不同的key对应的数据量不同,从而导致不同的task所分配的数据量不均匀所产生的。
所以解决数据倾斜可以从以下方面:

  1. 提高shuffle操作的并行度。直接增加shuffle读task的数量,比如设置reduceByKey[1000] ,一般默认是200。

优点:有效缓解
缺点:无法彻底解决
2、使用随机数前缀进行Join操作。对大量相同的key进行附加随机数前缀,变成不同的key,然后将这些不同的key分散到不同的task去处理
优点:对join类型的数据倾斜大都可以处理
缺点:对内存要求比较高
3、将reduce Join 转为map join。适用于两张表join时,一张表数据量比较小。通过将小表全量广播,然后通过map算子来实现与join相同的效果(即map join)。该方法不会发生数据倾斜。
优点:不会发生shuffle
缺点:只适用于大表加小表
4、过滤少数导致数据倾斜的key。前提条件,少数几个数据量特别多的key对任务的执行影响不会过大。可以直接通过where语句将它过滤掉。
优点:实现简单
缺点:受限于特定的场景
5、使用hive预处理数据。如果数据倾斜导致原因发生在hive,直接对hive进行预处理操作,从源头来规避数据倾斜的问题。
优点:提高spark的作业性能
缺点:hive也会发生数据倾斜,治标不治本
6、使用两阶段预聚合操作。适用于reduceByKey[] groupBy[] 分组等场景。先通过局部预聚合,再通过全局预聚合来解决。
优点:显著提升Spark的作业性能
缺点:受限于固定的场景

Spark任务提交全流程

当在命令行执行spark -submit —master xxx.jar …等命令后会执行以下操作:

  1. 客户端向资源管理器Master发送注册和申请资源的请求。其中master主要负责任务资源的分配
  2. Master收到申请资源的请求后,向指定的worker节点发送请求,然后worker节点会开启对应的executor的进程。
  3. Executor进程会向driver发送一个注册请求,申请要计算的task。
  4. 在driver的内部会执行一些操作,最终都会通过taskScheduler提交task到executor进程里面去运行。

具体细节包括:
4.1 driver端会运行客户端的程序中的main方法
4.2 在main方法中,创建一个SparkContext的上下文对象,该对象是所有spark程序的一个执行入口,在构建SparkContext的内部,会构建两个对象,分别是DAGScheduler和TaskScheduler。
4.3 因为在用户代码中RDD算子会涉及大量的转换操作,然后通过一个动作(action)操作触发任务的真正执行。在这里,会按照RDD与RDD之间的依赖关系,首先会生成一张DAG的有向无环图,图的方向就是RDD算子的操作顺序,最终会将RDD、DAG有向无环图发送给DAGScheduler对象。
4.4 DAGScheduler在获取DAG有向无环图后,会按照依赖进行stage的划分。由于RDD算子中包括大量的宽依赖,所以在划分多个stage之后,每一个stage的内部有很多个可以并行运行的task的线程;然后将这些并行运行的task线程封装在一个taskSet集合中;最后将多个taskSet的集合发送给TaskScheduler对象。
4.5 TaskScheduler对象在获取这些taskSet的集合之后,会按照多个stage之间的依赖关系,前面的stage的task先运行,后面的再运行;然后TaskScheduler对象依次遍历每一个TaskSet集合,获取每一个task;最后将每一个task提交到Worker节点的Executor进程运行。

  1. 当所有的task任务在Executor进程里面依次运行完成后。Driver端会向master发送一个注销请求。
  2. Master收到这个请求后,然后通知对应的Worker节点关闭Executor进程;最后Worker节点上的计算资源就会得到释放。

—!!!Yarn client和cluster模式区别在于: driver端在client、nm的AM上生成
image.png

Spark任务调度方式

分为stage级的调度和task级的调度。
首先,在创建SparkContext上下文对象时会创建DAGScheduler和TaskScheduler。
其中DAGScheduler负责stage级的调度。主要是将Job根据宽依赖切分干个stage,并将每个stage打包成TaskSet交给TaskScheduler调度。
而TaskScheduler负责task级的调度。将TaskSet按照指定的调度策略分配给Executor进行执行。
TaskScheduler支持两种调度策略:FIFO(默认)、Fair(公平调度策略)。

Spark 血统(lineage)、RDD

RDD(Resilient Distributed Datasets,弹性分布式数据集) ,一个transform图:
image.png
图中每个长方形都是一个RDD,但是他们表示的数据结构不同。
pipeline的处理方式:
流式数据。流式数据在服务器故障后,如何做到”可恢复”?
血统:一个RDD的血统,就是如上图那样的一系列处理逻辑,spark会为每个RDD记录其血统,借用范伟的经典小品的桥段,spark知道每个RDD的子集是”怎么没的”(变形变没的)以及这个子集是 “怎么来的”(变形变来的),那么当数据子集丢失后,spark就会根据lineage,复原出这个丢失的数据子集,从而保证Datasets的弹性。

RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错时的高效性以及划分任务时候起到重要作用。

  • lineage 是spark容错的机制的基础,它描述RDD之间的不同依赖关系,记录子RDD是如何从其它RDD中演变过来的,分为宽依赖和窄依赖。
  • 宽依赖:父RDD的分区数据会分发到子RDD的多个分区
  • 窄依赖:父RDD的分区数据会分发到子RDD的一个分区

    Spark 宽窄依赖,stage划分,task个数

    image.png

  • 宽依赖:父RDD的分区数据会分发到子RDD的多个分区

  • 窄依赖:父RDD的分区数据会分发到子RDD的一个分区

spark的DAGScheduler根据宽窄依赖将DAG划分为多个stage,spark以stage作为task的模板,生成一个或多个task,并将其调度到TaskExecutor执行。
task的数量本质上是由RDD的分区数决定的,一个分区对应一个task。

分区数的确定和改动有以下几种情况:
由带partitioner的数据源确定,比如数据来自hdfs,则根据hdfs的数据分片方式确定分区数;如果数据来自kafka,则有topic的分区数确定
不带partitioner的数据源,其分区数由参数”spark.default.parallelism”确定
通常map阶段的task数由数据源确定,如果有shffle,shuffle的reduce的并行度 由”spark.sql.shuffle.partitions”确定,默认是200
也可以通过代码对分区数进行干预,一是读取数据时可以设定最小分区数,二是可以通过repartition进行数据重分区

请列举Spark的transformation算子(不少于8个),并简述功能(重点)

1)map(func):返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成.
2)mapPartitions(func):类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。
3)reduceByKey(func,[numTask]):在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
4)aggregateByKey (zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U: 在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
5)combineByKey(createCombiner: V=>C, mergeValue: (C, V) =>C, mergeCombiners: (C, C) =>C):

对相同K,把V合并成一个集合。
1.createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值
2.mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并
3.mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。

请列举Spark的action算子(不少于6个),并简述功能(重点)

1)reduce:
2)collect:
3)first:
4)take:
5)aggregate:
6)countByKey:
7)foreach:
8)saveAsTextFile:

请列举会引起Shuffle过程的Spark算子,并简述功能。

reduceBykey:
groupByKey:
…ByKey:

Spark常用算子reduceByKey与groupByKey的区别,哪一种更具优势?(重点)

reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。
groupByKey:按照key进行分组,直接进行shuffle。
开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。

Repartition和Coalesce关系与区别

1)关系:
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
2)区别:
repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shuffle
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce

分别简述Spark中的缓存机制(cache和persist)与checkpoint机制,并指出两者的区别与联系

都是做RDD持久化的
cache:内存,不会截断血缘关系,使用计算过程中的数据缓存。
checkpoint:磁盘,截断血缘关系,在ck之前必须没有任何任务提交才会生效,ck过程会额外提交一次任务。

简述Spark中共享变量(广播变量和累加器)的基本原理与用途。(重点)

累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。而广播变量用来高效分发较大的对象。
共享变量出现的原因:
通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。
Spark的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。


Flink

Flink中,反压有哪些危害?

反压得不到正确的处理,可能会影响到checkpoint的时长以及state的大小,甚至会导致资源消耗以及系统奔溃。

  1. 影响checkpoint的时长。首先,barrier不会穿过普通数据,所以数据处理被阻塞,也会导致checkpoint流经整个数据管道的时间边长,这样会导致checkpoint总体执行时间变长。
  2. 影响state的大小。当barrier对其时,接收到较快的输入管道的barrier后,他后面的数据会被缓存起来,但是不会被处理直到较慢的输入管道的barrier到达之后,这些被缓存的数据才会被放到state里,这样会导致checkpoint变大。

这两个影响对于生产环境的作业来说是十分危险的,因为checkpoint是保证数据一致性的关键,checkpoint的执行时间变长,会导致checkpoint执行失败;同样state的大小,也会导致checkpoint得一个总体执行时间变长,甚至会导致OOM

Flink Sql应用提交流程?

总共包括两大阶段:SQL到operation的转化、operation到transformations的转化。
1、SQL到operation的转化,主要经过以下4个阶段:

  1. SQL解析。 词法、语法解析 sql->Sql node
  2. SQL校验。 校验表、字段、函数名;特殊类型(join …)是否正确
  3. 调用rel()。sqlNode->关系代数树RealNode(关系表达式) 和

RexNode(行表达式)

  1. 调用convert()。RealNode->operation

2、operation到transformations的转化,:

  1. Modify operation转化为calcite的逻辑计划树,再转化为flink的逻辑计划树
  2. 调用optimize方法将flink逻辑计划树优化成物理计划树;优化:规则优化RBO、代价优化CBO
  3. 调用translateToExecNodeGraph 将物理计划转化为ExceGraph
  4. 调用translateToPlan 将 ExceGraph转化为transformations

    Flink Sql支持那些join操作?

    包括以下三大类:流表与流表的join、流表与维表的join、动态表字段的列转行。

  5. 流表与流表的join。包括regular join、temporal join和interval join。

  6. regular join主要是两条流的join,可以是inner、out、full join等,适用于场景如计算点击率等,缺点是会产生回撤流.
  7. interval join主要是计算两条流在一段时间区域内的join操作,可以让一条流去join另一条流中前后一段时间内的数据。
  8. temporal join 主要用于快照join,包括事件时间、处理时间的join,类似于离线计算的快照join,可以用于计算汇率计算的场景。
  9. 流表与维表的join。如lookup join
  10. lookup join主要用于流与外部维表的join操作。因为一般的用户画像数据会存储在mysql hbase 或redis中,当用户日志流过来后,需要实时查询数据,所以就要用到lookup join。
  11. 动态表字段的列转行。包括Table function 和Array expansion。
  12. Array expansion 是将表中的Array字段压平转为多行,适用于一列转多行的操作。
  13. Table function与Array expansion 功能类似。本质是一个udf函数,需要用户自定义udf实现它

数仓理论

数仓分层

数据仓库分层

image.png
ODS层
原始数据层,存放原始数据,直接加载原始日志,数据。数据保存原貌不做处理,起到备份数据的作用。
数据采用压缩,减少磁盘存储空间(例如:原始数据100G,可以压缩到10G左右,LZO)
创建分区表,防止后续的全表扫描。
DWD层
对ODS层数据进行清洗(去除空值,脏数据,超过极限范围的数据),维度退化,脱敏等。
需构建维度模型,一般采用星型模型,呈现的状态一般为星座模型。
DWS层
以DWD为基础,按天进行轻度汇总。
统计各个主题对象的当天行为,服务于DWT层的主题宽表,以及一些业务明细数据,应对特殊需求(例如,购买行为,统计商品复购率)。
DWT层
以DWS为基础,按主题进行汇总。
以分析的主题对象为建模驱动,基于上层的应用和产品的指标需求,构建主题对象的全量宽表。
ADS层
为各种统计报表提供数据。

数据仓库分层的好处

  • 把复杂问题简单化

将复杂的任务分解成多层来完成,每一层只处理简单的任务,方便定位问题

  • 减少重复开发

规范数据分层,通过中间层数据,能够减少极大的重复计算,增加一次计算结果的复用性

  • 隔离原始数据

不论是数据的异常还是数据的敏感性,使真实数据和统计数据解耦开。

关系建模与维度建模

当今的数据处理大致可以分成两大类:联机事务处理OLTP(on-line transaction processing),联机分析处理OLAP(on-line analytical processing)。

OLTP是传统的关系型数据库的主要应用,主要是基本的,日常的事务处理,例如银行交易。
OLAP是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果。
二者的主要区别对比:

对比属性 OLTP OLAP
读特性 每次查询只返回少量记录 对大量记录进行汇总
写特性 随机,低延时写入用户的输入 批量导入
使用场景 用户,JavaEE项目 内部分析师,为决策提供支持
数据表征 最新数据状态 随时间变化的历史状态
数据规模 GB TB 到 PB

关系建模、维度建模

通常我们采用维度模型建模,把相关各种表整理成两种:事实表维度表两种。

  • 在维度建模的基础上又分为三种模型:星型模型,雪花模型,星座模型。

星型模型
image.png
雪花模型
image.png
雪花模型,比较靠近3NF,但是无法完全遵守,因为遵循3NF的性能成本太高。
星型模型与雪花模型的区别
区别主要在于维度的层次。标准的星型模型维度只有一层,而雪花模型可能会涉及多层。
星座模型
image.png

星座模型与前两种情况的区别是事实表的数量,星座模型是基于多个事实表。
基本上是很多数据仓库的常态,因为很多数据仓库都是多个事实表的。所以星座不星座只反映是否有多个事实表,他们之间是否共享一些维度表
所以星座模型并不和前两个模型冲突。

模型的选择
星座不星座只跟数据和需求有关系,跟设计没关系,不用选择。
星型还是雪花,取决于性能优先,还是灵活更优先
实际企业开发中,根据情况灵活组合,甚至并存(一层维度和多层维度都保存)。
但是整体来看,更倾向于维度更少的星座模型。尤其是Hadoop体系,减少join就是减少Shuffle,性能差距很大。

维度表和事实表(重点)

维度表

维度表的概念
一般是对事实的描述信息。每一张维表对应业务中一个对象或者概念。
例如:用户,商品,日期,地区等。
维表的特征

  • 维表的范围很宽(具有多个属性,列比较多)
  • 跟事实表相比,行数相对较小:通常<10万条
  • 内容相对固定:例如:编码表

  • 事实表

    事实表的概念
    事实表中每行数据代表一个业务事件(下单,支付,退款,评价等)。
    “事实”这个术语表示的是业务事件的度量值(可统计次数,个数,金额等),例如,订单事件中的下单金额。
    每一个事实表的行包括:具有可加性的数值型的度量值,与维表相连接的外键。通常具有两个和两个以上的外键。

事实表的特征
数据量非常的大
内容相对的窄:列数比较少
经常发生变化,每天会新增加很大。
事实表的分类
事务型事实表
以每个事务或事件为单位, 例如:一个销售订单记录,一笔支付记录等,作为事实表里的一行数据。
一旦事务被提交,事实表数据被插入,数据就不再进行更改。其更新方式为增量。

周期型快照事实表
周期型快照事实表中不会保留所有数据,只保留固定时间间隔的数据。
不关心具体的业务操作,只关心结果。
例如,每天或者每月的销售额,或每月的账户余额等。
其更新方式为全量。

累积型快照事实表
累积快照事实表用于跟踪业务事实的变化。
例如,数据仓库中可能需要累积或者存储订单从下订单开始,到订单商品被打包,运输,签收的各个业务阶段的时间点数据来跟踪订单生命周期的进展情况。
当这个业务过程进行时,事实表的记录也要不断更新。其更新方式为新增及变化。

数据仓库建模(绝对重点)

ODS层

保持数据原貌不做任何修改,起到备份数据的作用。
数据采用压缩,减少磁盘存储空间(例如:原始数据100G,可以压缩到10G左右)
创建分区表,防止后续的全表扫描

DWD层

DWD层需构建维度模型,一般采用星型模型,呈现的状态一般为星座模型。
维度建模一般按照以下四个步骤:
选择业务过程 -> 声明粒度 -> 确认维度 -> 确认事实

选择业务过程
在业务系统中,挑选我们感兴趣的业务线,比如下单业务,支付业务,退款业务,物理业务。
一条业务线对应一张事实表

声明粒度
数据粒度:指数据仓库的数据中保存数据的细化程度或综合程度的级别。
声明粒度:意味着精确定义事实表的一行数据表示什么,应该尽可能选择最小粒度,以此来应对各种各样的需求。

典型的粒度声明:
订单详情表中,每行数据对应一个订单中的一个商品项
订单表中,每行数据对应一个订单
确认维度
维度的主要作用是描述业务是事实,主要表示的是“谁,何处,何时”等信息。
方法:退化维度主题,罗列出所有维度(主题),然后确认哪些业务与哪些维度(主题)有关。

确认事实
此处的“事实”一词,指的是业务中的度量值,例如订单金额,下单次数等。

小结
在DWD层,以业务过程为建模驱动,基于每个具体业务过程的特点,构建最细粒度的明细层事实表。事实表可做适当的宽表化处理。
构建维度退化的维度表。

  • 至此,数仓的维度建模已经完毕。
  • DWS,DWT,ADS都和维度建模已经没有关系了。DWS和DWT层是为了优化重复查询相同的数据而生成的
  • DWS和DWT都是建宽表,宽表都是按照主题去建。主题相当于观察问题的角度,对应着维度表。
  • 所谓的宽表:以维度表为中心,去关联相关的事实表,然后确定度量值。然后进行汇总,接着使用维度数据去替换外键,生成最终的宽表。也就是说,宽表里存的是真实维度值和汇总度量值

    DWS层

    统计各个主题对象当天行为,服务于DWT层的主题宽表,以及一些业务明细数据,应对特殊需求(例如,购买行为,统计商品复购率)。

  • 每日设备行为

  • 每日会员行为
  • 每日商品行为
  • 每日地区统计
  • 每日活动统计

    DWT层

    以分析的主题对象为建模驱动,基于上层的应用和产品的指标需求,构建主题对象的全量宽表

  • 设备主题

  • 会员主题
  • 商品主题
  • 地区主题
  • 活动主题

    ADS层

    对电商系统各大主题指标分别进行分析。

数据同步策略
数仓中的数据有一个时间轴的概念,体现在数据上就是以年-月-日进行数据同步和分区。保存历史状态。
我们可以以时间为维度,分析各种指标在时间上的延续性。
这里的数据同步指的是数据从业务服务器传到数仓

  • 全量表:存储完整的数据
  • 增量表:存储新增加的数据
  • 新增及变化表:存储新增加的和变化的数据
  • 特殊表:只需要存储一次

    全量同步策略

    每日全量:就是每天存储一份完整数据,作为一个分区
    适用:表数据量不大,且每天既会有新数据插入,也会有旧数据修改的场景

例如:
维度表:

  1. - 编码字典维度表:编码字典表
  2. - 商品维度表:品牌表,商品三级分类,商品二级分类,商品一级分类,SKU商品表,SPU商品表
  3. - 活动维度表:活动规则表,活动表,活动参与商品表
  4. - 优惠券维度表:优惠券表

事实表:

  1. - 加购表
  2. - 商品收藏表

这两张事实表数据量也不小,用全量同步策略是因为业务需求。数仓用来做周期性快照事实表。

增量同步策略
每日增量:就是每天存储一份增量数据,作为一个分区。
适用:表数据量大,且每天只会有新数据插入的场景。
例如:
事实表:
退单表,订单状态表,支付流水表,订单详情表,活动与订单关联表
商品评论表

新增及变化策略表
每日新增及变化:就是存储创建时间和操作时间都是今天的数据。
适用:表的数据量大,既会有新增,又会有变化。
例如:
维度表:更新使用拉链表
用户表
事实表:更新使用累积型快照事实表
订单表,优惠券领用表

特殊策略(不变化)
某些特殊的维度表,可不必遵循上述同步策略。
1)客观世界维度
没变化的客观世界维度可以只存一份固定值。比如 性别,地区,名族,政治成分,鞋子尺码。
2)日期维度
日期维度可以一次性导入一年或若干年的数据。
3)地区维度
省份表,地区表


Java SE

HashMap底层源码 数据结构

hashMap的底层结构在jdk1.7中由数据+链表实现,在jdk1.8中由数组+链表+红黑树实现。image.pngimage.png

Java自带哪几种线程池

1、newCachedThreadPool:可缓存线程池,灵活回收空闲线程,过多需求则新建;注意任务数量,避免系统奔溃。
2、newFixedThreadPool:指定数据线程池,提交任务创建线程,超过进池队列;不会释放工作线程。
3、newSingleThreadExecutor:单线程池,保证任务顺序执行;异常结束,新的替代。
4、newScheduledThreadPool:定长线程池,支持定时、周期性的任务执行,延迟执行。

HashMap和HashTable区别

1) 线程安全性不同
HashMap是线程不安全的,HashTable是线程安全的,其中的方法是Synchronize的,在多线程并发的情况下,可以直接使用HashTabl,但是使用HashMap时必须自己增加同步处理。
2) 是否提供contains方法
HashMap只有containsValue和containsKey方法;HashTable有contains、containsKey和containsValue三个方法,其中contains和containsValue方法功能相同。
3) key和value是否允许null值
Hashtable中,key和value都不允许出现null值。HashMap中,null可以作为键,这样的键只有一个;可以有一个或多个键所对应的值为null。
4) 数组初始化和扩容机制
HashTable在不指定容量的情况下的默认容量为11,而HashMap为16,Hashtable不要求底层数组的容量一定要为2的整数次幂,而HashMap则要求一定为2的整数次幂。
Hashtable扩容时,将容量变为原来的2倍加1,而HashMap扩容时,将容量变为原来的2倍。

TreeSet和HashSet区别

HashSet是采用hash表来实现的。其中的元素没有按顺序排列,add()、remove()以及contains()等方法都是复杂度为O(1)的方法。
TreeSet是采用树结构实现(红黑树算法)。元素是按顺序进行排列,但是add()、remove()以及contains()等方法都是复杂度为O(log (n))的方法。它还提供了一些方法来处理排序的set,如first(),last(),headSet(),tailSet()等等。

String buffer和String build区别

1、StringBuffer与StringBuilder中的方法和功能完全是等价的。
2、只是StringBuffer中的方法大都采用了 synchronized 关键字进行修饰,因此是线程安全的,而StringBuilder没有这个修饰,可以被认为是线程不安全的。
3、在单线程程序下,StringBuilder效率更快,因为它不需要加锁,不具备多线程安全而StringBuffer则每次都需要判断锁,效率相对更低

==和Equals区别

== : 如果比较的是基本数据类型,那么比较的是变量的值;
如果比较的是引用数据类型(包括包装类),那么比较的是地址值(两个对象是否指向同一块内存)。
equals:如果没重写equals方法比较的是两个对象的地址值(来源于Object类);
如果重写了equals方法后我们往往比较的是对象中的属性的内容。
equals方法是从Object类中继承的,默认的实现就是使用==
image.png
1、如果是基本类型比较,那么只能用==来比较,不能用equals
2、对于基本类型的包装类型,比如Boolean、Character、Byte、Shot、Integer、Long、Float、Double等的引用变量,==是比较地址的,而equals是比较内容的
3、注意:对于String(字符串)、StringBuffer(线程安全的可变字符序列)、StringBuilder,
后两者没有重写equals方法。

总结:如果是基本类型比较,那么只能用==来比较,不能用equals ,如果是基本类型的包装类型,那么用equals

JUC