image.png


2021-09-17 数仓用什么建模方法,常见的建模方法


【来自:数仓与大数据】
三范式就是拆分消除冗余,反三范式就是数据冗余,数仓里也叫维度退化,就是把维度属性放到是表里减少了 join

多维分析的基本操作:
钻取:上卷或下钻。通过调整维度的个数,来变换分析的粒度。
切片与切块:选取整体的部分维度去看度量,选两个就是切片,三个或以上是切块
旋转:例如行列转换

维度建模
联机分析处理的概念最早由关系数据库之父 E.F.Codd 于 1993 年提出。Codd 提出了多维数据库和多维分析的概念,把业务系统面向业务逻辑、面向事务增删改查而设计的存储结构,转换成面向分析、侧重查询的多维分析型存储结构。将所有对象都抽象为维度、度量、属性三类:

  • 维度,可以理解为不同分析视角。
  • 属性,用来定义和描述维度。
  • 度量,是在一个或多个维度限定下的取值。

存储格式分为两种:

  1. 关系 OLAP(ROLAP):基于关系数据库的 OLAP 实现,细节数据以及为聚合后的的粗粒度数据,通常会存储到关系型数据库中。
  2. 多维 OLAP(MOLAP):有时候会构件 Cube,优点是使用方便,缺点是需要占用大量的存储空间。

上边两段是我在数据仓库系列开篇里边摘录两段内容。
我发现一个有意思的事情,关系建模和维度建模理论应该是同一个人(关系数据库之父 E.F.Codd)提出的,维度建模方法比关系建模方法晚出来了 20 年,这 20 年刚好是信息化从开端到普及的阶段,OLTP 系统存储下来的数据开始有了分析应用的需求和场景,而 OLTP 数据库在进行海量数据的分析处理上又有很多不足,比如大量的资源消耗和性能问题。
而维度模型和多维分析数据库就是在这样的背景下,由关系模型的奠基人提出来的,它将客观世界抽象成维度-属性-度量这三个概念,从而构成了维度模型。

  • 当维度模型在关系型数据库里实现就是我们大家常说的雪花模型或星座模型,反三范式或者逆规范化后就变成了星型模型
  • 当维度模型在多维分析数据库里实现会称为联机分析处理数据库。在 OLAP 多维数据库时,对这些数据的存储的索引,采用了维度数据涉及的格式和技术。性能聚集或预计算汇总表通常由多维数据库引擎建立并管理。由于采用预计算、索引策略和其他优化方法,多维数据库可实现高性能查询。
  • 当维度模型在 NoSQL 数据存储类型里实现,由于索引的缺失和对 Join 实现性能低下,维度模型会退化到星型模型甚至不建模了,就是直接粗暴的拉大宽表。
  • 当维度模型在 NewSQL 数据存储类型里实现,慢慢的开始回归到了最早关系型数据库的星型模型、雪花模型或星座模型中来了,比如最近比较火的 Doris。

【来自:威少】
方法:
一般是星型模型和雪花模型

星型模型
星型架构是一种非正规化的结构,多维数据集的每一个维度都直接与事实表相连接,所以数据有一定的冗余
雪花型模型
通过最大限度地减少数据存储量以及联合较小的维表来改善查询性能。雪花型结构去除了数据冗余。
星型模型VS雪花型模型
星型模型和雪花模型的对比,可以从以下四个角度来对比。
1、查询性能角度来看
在OLTP-DW环节,由于雪花型要做多个表联接,性能会低于星型架构;但从DW-OLAP环节,由于雪花型架构更有利于度量值的聚合,因此性能要高于星型架构。
2、模型复杂度角度
星型架构更简单方便处理
3、层次结构角度
雪花型架构更加贴近OLTP系统的结构,比较符合业务逻辑,层次比较清晰。
4、存储角度
雪花型架构具有关系数据模型的所有优点,不会产生冗余数据,而相比之下星型架构会产生数据冗余。

缓慢变化维
缓慢变化维处理方式:
1、新数据覆盖旧数据,不保留历史信息
2、快照表,每天对维度表做一个快照,保留所有信息,冗余数据较多
3、新增字段,保留历史值,保存的历史值有限
4、拉链表


【来自:技术-L.sh】

所谓的值就是一个个的数据,单一的数据称之为值的话,多个值的集合就是维度,值,也就是数据,本来就是在事实表里的,因为用不到多个值了,那就不用多余的建维度,需要就直接取就好了

多个值组成维度,他本来就在事实表里,是因为要做复杂的关联才组合成维度,没有复杂的关系就直接用值就省的多一步了


【来自:大数据左右手】

上钻(上卷:自下而上,从当前数据回归到上层数据。

下钻:自上而下, 从当前数据继续向下获取下层数据。

钻取是在数据分析中不可缺少的功能之一,通过改变展现数据维度的层次、变换分析的粒度从而关注数据中更详尽的信息。它包括向上钻取( roll up )和向下钻取( drill down )。

上钻是沿着维度的层次向上聚集汇总数据,下钻是在分析时加深维度,对数据进行层层深入的查看。通过逐层下钻,数据更加一目了然,更能充分挖掘数据背后的价值,及时做出更加正确的决策。

维度退化
维度退化的维度表可以被剔除,从而简化维度数据仓库的模式。因为简单的模式比复杂的更容易理解,也有更好的查询性能。
当一个维度没有数据仓库需要的任何数据时就可以退化此维度。需要把维度退化的相关数据迁移到事实表中,然后删除退化的维度。
维度属性也可以存储到事实表中,这种存储到事实表中的维度列被称为“维度退化”。与其他存储在维表中的维度一样 , 维度退化也可以用来进行事实表的过滤查询、实现聚合操作等。

比如说订单id,这种量级很大的维度,没必要用一张维度表来进行存储,而我们进行数据查询或者数据过滤的时候又非常需要,所以这种就冗余在事实表里面,这种就叫退化维度,citycode这种我们也会冗余在事实表里面,但是它有对应的维度表,所以它不是退化维度。


2021-09-18 hive sql,怎么计算这个sql会产生多少个map数?


【来自:技术-逸辰】
hive中map的个数是否开启了小文件合并,切片大小默认是128m一个切片(具体看版本),开启一个maptask


【来自:技术-otw提出的引导性反问】
引出问题:Hadoop 默认块大小设置的依据是什么?为什么默认值设置为128M?这个128是怎么计算出来的?总不能是拍脑袋写的吧?

【来自:王了个博】
Math.max(minSize, Math.min(maxSize, blockSize))
minSize=1,blockSize=128M,maxSize=Long.MAX_VALUE
所以,默认情况,splitSize=blockSize=128M,其中:
minsize 通过mapreduce.input.fileinputformat.split.minsize参数控制
maxsize 通过mapreduce.input.fileinputformat.split.maxsize参数控制

【来自:技术-雨天】
首先切片大小是可以通过修改配置参数来改变的,但是默认情况下是和切块blocksize大小一致,这样做的目的就是为了在读取数据的时候正好能一次性读取一个块的数据,避免了在集群环境下发生跨机器读取的情况,如果跨机器读取会造成二外的网络IO,不利于MR程序执行效率的提升。

  1. HDFS中平均寻址时间大概为10ms;
    2. 经过测试发现,寻址时间为传输时间的1%时,为最佳状态;
    所以最佳传输时间为10ms/0.01=1000ms=1s
    3. 目前磁盘的传输速率普遍为100MB/s , 网卡普遍为千兆网卡传输速率普遍也是100MB/s;
    计算出最佳block大小:100MB/s x 1s = 100MB
    所以我们设定block大小为128MB。
    4. 实际在工业生产中,需要经过集群之间的具体情况进行设置.
    比如: 跨物理机/机架之间文件传输速率为200MB/s时,一般设定block大小为256MB , 文件传输速率为400MB/s时,一般设定block大小为512MB . 不过最大一般不会超过512MB , 因为目前固态硬盘的读写速率应该不会超过512MB(如果做RAID另行考虑.)。

【 问题】
如果是hive .on spark还会有map数的概念吗
如果是hive on spark 设置mapreduce的参数会生效吗

【来自:技术-讷言敏行】
我觉得跟mr没有关系了 spark本身就是就是基于内存的计算框架 用的rdd内套逻辑

【问题】
请问这些hive,spark如果是基于hdfs的,他读写是不是都是hdfs那套流程的?

【来自:技术-IvanLeung】
封装hdfs的inputformat
hdfs提供了一些inputformat的实现如果都不符合就要自己去实现
image.png

image.png

【来自:海哥】
spark hive flink读写都是hdfs

【问题】
hive on spark怎么考虑map数?

(1)Application:初始化一个SparkContext即生成一个Application;
(2)Job:一个Action算子就会生成一个Job;
(3)Stage:Stage等于宽依赖的个数加1;
(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。


2021-09-22 面试官问:你有什么想问的吗?


【来自:公众号:志明】
这个通常在最后问,所以可以结合前面面试官问你的问题,反问一下,比如某某组件在咱们项目中具体怎么用的,我将来是负责这块吗?和别人怎么合作之类的,这个得结合面试情况来问,随机应变而非死记硬背,没啥标准答案,别问太出格的或非技术的都还不错。

最不愿听见问我的就是问薪资福利

问集群多大,多少台机器,你觉得这种合适吗?
问我还可以,但是这个问题我问面试者,十个有八个都回答不了,他们的答案通常是,他不负责搭建环境也没权限查看,关键是简历里却写了,负责或参与集群环境搭建


2021-09-23 Hive的内表和外表的区别,Hive为什么要做分区,Hive的元数据存在哪?


【来自:技术-贾彤】
Hive的内表和外表的区别:外部表被external修饰的表,内部表没有被external修饰。生产环境下大部分使用外部表,自己创建的临时表一般使用内部表。删除数据时,内部表的元数据和数据本身全部被删除,外部表只删除元数据,数据本身不被删除。 hive做分区也是为了提高查询效率,hive的元数据存放在默认的derby数据库中,不过可以将默认数据库修改成mysql数据库

【来自:技术-只争朝夕】
分区就是分目录,把大数据集切分成小的数据集,可以提高查询效率。元数据存在MySQL数据库,默认数据库是Derby。

【来自:技术-逸辰】
hive内部表用于自行测试 删除会删掉元数据和原始数据
外部表一般用于生产环境 删除只会删除元数据 保留原始数据
分区:为了提高查询效率
hive元数据:默认保存在derby,如果要多客户端访问要保存到mysql中

【补充】
1、外部表不会加载数据到hive,减少数据传输、数据还能共享。
2、hive不会修改数据,不担心数据的损坏。
3、删除表时,只删除表结构(元数据)、不删除数据。
Hive创建内部表时,会将数据移动到数据仓库指向的路径。创建外部表时,仅记录数据所在的路径,不对数据的位置做任何改变,在删除表的时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。这样外部表相对来说更加安全些,数据组织也更加灵活,方便共享源数据


2021-09-24 ZooKeeper是如何保证数据一致性


【来自:技术-陈紫隆】
image.png

【来自:技术-雪瞳】
paxos算法也是分布式决策的算法之一 而且具备通用性

【附带】
paxos算法
对Paxos保证一致性换一种理解:
Paxos 算法是分布式一致性算法用来解决一个分布式系统如何就某个值(决议)达成一致的问题。一个典型的场景是,在一个分布式数据库系统中,如果各节点的初始状态一致,每个节点都执行相同的操作序列,那么他们最后能得到一个一致的状态。为保证每个节点执行相同的命令序列,需要在每一条指令上执行一个”一致性算法”以保证每个节点看到的指令一致。
分布式系统中一般是通过多副本来保证可靠性,而多个副本之间会存在数据不一致的情况。所以必须有一个一致性算法来保证数据的一致,描述如下:
假如在分布式系统中初始是各个节点的数据是一致的,每个节点都顺序执行系列操作,然后每个节点最终的数据还是一致的。
Paxos算法就是解决这种分布式场景中的一致性问题。对于一般的开发人员来说,只需要知道paxos是一个分布式选举算法即可。多个节点之间存在两种通讯模型:共享内存(Shared memory)、消息传递(Messages passing),Paxos是基于消息传递的通讯模型的。

大数据之分布式协调神器:Zookeeper选举


2021-09-26 清洗数据?(方式,注意事项或其他)

【来自:志明】
1、首先要清楚清洗的目的;
2、其次要清楚哪些表、字段需要清洗及清洗的规则(凭经验、问业务),还得做好清洗与否的影响分析;
3、然后通过一些的手段或工具(SQL、UDF、存过、MR、Spark、Python、等等)进行数据清洗操作;
4、再然后,检查清洗后的数据是否达到既定的要求或是否满足业务;
5、最后,可以把没问题的清洗流程固化到ETL流程、调度中(取决于实际场景),后续不断完善相关清洗规则等。其实就是不断的PDCA过程。

最好是能赋能上游,从源头就解决数据问题,清洗治标不治本。


【来自:绝域时空】
1.缺失值处理(通过describe与len直接发现、通过0数据发现)
2.异常值处理(通过散点图发现)
一般遇到缺失值处理方法(删除、插补、不处理)
插值补的方法:均值插补、中位数插补、众数插补、固定值插补、最近数据插补、回归插补、拉格朗日插值、牛顿插值法、分段插值etc.
遇到异常值,一般处理方法视为缺失值、删除、修补(平均数、中位数等)、不处理。


https://zhuanlan.zhihu.com/p/20571505
预处理阶段
预处理阶段主要做两件事情:
一是将数据导入处理工具。通常来说,建议使用数据库,单机跑数搭建MySQL环境即可。如果数据量大(千万级以上),可以使用文本文件存储+Python操作的方式。
二是看数据。这里包含两个部分:一是看元数据,包括字段解释、数据来源、代码表等等一切描述数据的信息;二是抽取一部分数据,使用人工查看方式,对数据本身有一个直观的了解,并且初步发现一些问题,为之后的处理做准备。

第一步:缺失值清洗
缺失值是最常见的数据问题,处理缺失值也有很多方法,我建议按照以下四个步骤进行:
1、确定缺失值范围:对每个字段都计算其缺失值比例,然后按照缺失比例和字段重要性,分别制定策略,可用下图表示:
image.png
2、去除不需要的字段:这一步很简单,直接删掉即可……但强烈建议清洗每做一步都备份一下,或者在小规模数据上试验成功再处理全量数据,不然删错了会追悔莫及(多说一句,写SQL的时候delete一定要配where!)。
3、填充缺失内容:某些缺失值可以进行填充,方法有以下三种:

  • 以业务知识或经验推测填充缺失值
  • 以同一指标的计算结果(均值、中位数、众数等)填充缺失值
  • 以不同指标的计算结果填充缺失值


前两种方法比较好理解。关于第三种方法,举个最简单的例子:年龄字段缺失,但是有屏蔽后六位的身份证号,so……
4、重新取数:如果某些指标非常重要又缺失率高,那就需要和取数人员或业务人员了解,是否有其他渠道可以取到相关数据。

以上,简单的梳理了缺失值清洗的步骤,但其中有一些内容远比我说的复杂,比如填充缺失值。很多讲统计方法或统计工具的书籍会提到相关方法,有兴趣的各位可以自行深入了解。
第二步:格式内容清洗
如果数据是由系统日志而来,那么通常在格式和内容方面,会与元数据的描述一致。而如果数据是由人工收集或用户填写而来,则有很大可能性在格式和内容上存在一些问题,简单来说,格式内容问题有以下几类:
1、时间、日期、数值、全半角等显示格式不一致
这种问题通常与输入端有关,在整合多来源数据时也有可能遇到,将其处理成一致的某种格式即可。
2、内容中有不该存在的字符
某些内容可能只包括一部分字符,比如身份证号是数字+字母,中国人姓名是汉字(赵C这种情况还是少数)。最典型的就是头、尾、中间的空格,也可能出现姓名中存在数字符号、身份证号中出现汉字等问题。这种情况下,需要以半自动校验半人工方式来找出可能存在的问题,并去除不需要的字符。
3、内容与该字段应有内容不符
姓名写了性别,身份证号写了手机号等等,均属这种问题。 但该问题特殊性在于:并不能简单的以删除来处理,因为成因有可能是人工填写错误,也有可能是前端没有校验,还有可能是导入数据时部分或全部存在列没有对齐的问题,因此要详细识别问题类型。
格式内容问题是比较细节的问题,但很多分析失误都是栽在这个坑上,比如跨表关联或VLOOKUP失败(多个空格导致工具认为“陈丹奕”和“陈 丹奕”不是一个人)、统计值不全(数字里掺个字母当然求和时结果有问题)、模型输出失败或效果不好(数据对错列了,把日期和年龄混了,so……)。因此,请各位务必注意这部分清洗工作,尤其是在处理的数据是人工收集而来,或者你确定产品前端校验设计不太好的时候……
第三步:逻辑错误清洗
这部分的工作是去掉一些使用简单逻辑推理就可以直接发现问题的数据,防止分析结果走偏。主要包含以下几个步骤:
1、去重
有的分析师喜欢把去重放在第一步,但我强烈建议把去重放在格式内容清洗之后,原因已经说过了(多个空格导致工具认为“陈丹奕”和“陈 丹奕”不是一个人,去重失败)。而且,并不是所有的重复都能这么简单的去掉……
我曾经做过电话销售相关的数据分析,发现销售们为了抢单简直无所不用其极……举例,一家公司叫做“ABC管家有限公司“,在销售A手里,然后销售B为了抢这个客户,在系统里录入一个”ABC官家有限公司“。你看,不仔细看你都看不出两者的区别,而且就算看出来了,你能保证没有”ABC官家有限公司“这种东西的存在么……这种时候,要么去抱RD大腿要求人家给你写模糊匹配算法,要么肉眼看吧。
上边这个还不是最狠的,请看下图:
image.png
你用的系统里很有可能两条路都叫八里庄路,敢直接去重不?(附送去重小tips:两个八里庄路的门牌号范围不一样)
当然,如果数据不是人工录入的,那么简单去重即可。
2、去除不合理值
一句话就能说清楚:有人填表时候瞎填,年龄200岁,年收入100000万(估计是没看见”万“字),这种的就要么删掉,要么按缺失值处理。这种值如何发现?提示:可用但不限于箱形图(Box-plot).
3、修正矛盾内容
有些字段是可以互相验证的,举例:身份证号是1101031980XXXXXXXX,然后年龄填18岁,我们虽然理解人家永远18岁的想法,但得知真实年龄可以给用户提供更好的服务啊(又瞎扯……)。在这种时候,需要根据字段的数据来源,来判定哪个字段提供的信息更为可靠,去除或重构不可靠的字段。
逻辑错误除了以上列举的情况,还有很多未列举的情况,在实际操作中要酌情处理。另外,这一步骤在之后的数据分析建模过程中有可能重复,因为即使问题很简单,也并非所有问题都能够一次找出,我们能做的是使用工具和方法,尽量减少问题出现的可能性,使分析过程更为高效。

第四步:非需求数据清洗
这一步说起来非常简单:把不要的字段删了。
但实际操作起来,有很多问题,例如:

  • 把看上去不需要但实际上对业务很重要的字段删了;
  • 某个字段觉得有用,但又没想好怎么用,不知道是否该删;
  • 一时看走眼,删错字段了。


前两种情况我给的建议是:如果数据量没有大到不删字段就没办法处理的程度,那么能不删的字段尽量不删。第三种情况,请勤备份数据……
第五步:关联性验证
如果你的数据有多个来源,那么有必要进行关联性验证。例如,你有汽车的线下购买信息,也有电话客服问卷信息,两者通过姓名和手机号关联,那么要看一下,同一个人线下登记的车辆信息和线上问卷问出来的车辆信息是不是同一辆,如果不是(别笑,业务流程设计不好是有可能出现这种问题的!),那么需要调整或去除数据。
严格意义上来说,这已经脱离数据清洗的范畴了,而且关联数据变动在数据库模型中就应该涉及。但我还是希望提醒大家,多个来源的数据整合是非常复杂的工作,一定要注意数据之间的关联性,尽量在分析过程中不要出现数据之间互相矛盾,而你却毫无察觉的情况。


2021-09-27 Spark coalesce和repartition的区别和使用场景


【来自:技术-贾】
spark中coalesce和repartition都可以用于改变RDD的分区数量,一般coalesce用于减少分区数量,而repartition用于增加分区数量。repartition底层修改分区数量也是调用coalesce方法。从源代码可以知道repartition默认进行shuffle,而coalesce默认不进行shuffle,不过是否shuffle都可以自己进行修改

【来自:技术-段】
coalesce 我们做数仓一般是在把数据写入目标表的时候会用到

【其他】
coalesce和repartition对比与区别
coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
repartition实际上是调用的coalesce,进行shuffle。源码如下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
coalesce一般为缩减分区,如果扩大分区,也不会增加分区总数,意义不大。
repartition扩大分区执行shuffle,可以达到扩大分区的效果。

coalesce 一般用于最后的数据入库
repartition一般用于数据reduce过程

Spark value算子


2021-09-28 拉链表使用场景和制作过程


【来自:技术-Miracle】
image.png

image.png

【来自:公众号:志明】
拉链表
【场景】
记录下数据所有的状态变化的记录
适合状态会发生变化的数据,并且每次数据变化
量比较小,而历史状态又比较重要的数据

【怎么制作,来自:技术-方鸿渐】
image.png

  1. 1)建立拉链表
  2. hive (gmall)>
  3. drop table if exists dwd_dim_user_info_his;
  4. create external table dwd_dim_user_info_his(
  5. `id` string COMMENT '用户id',
  6. `name` string COMMENT '姓名',
  7. `birthday` string COMMENT '生日',
  8. `gender` string COMMENT '性别',
  9. `email` string COMMENT '邮箱',
  10. `user_level` string COMMENT '用户等级',
  11. `create_time` string COMMENT '创建时间',
  12. `operate_time` string COMMENT '操作时间',
  13. `start_date` string COMMENT '有效开始日期',
  14. `end_date` string COMMENT '有效结束日期'
  15. ) COMMENT '用户拉链表'
  16. stored as parquet
  17. location '/warehouse/gmall/dwd/dwd_dim_user_info_his/'
  18. tblproperties ("parquet.compression"="lzo");
  19. 2)初始化拉链表
  20. hive (gmall)>
  21. SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
  22. insert overwrite table dwd_dim_user_info_his
  23. select
  24. id,
  25. name,
  26. birthday,
  27. gender,
  28. email,
  29. user_level,
  30. create_time,
  31. operate_time,
  32. '2020-06-14',
  33. '9999-99-99'
  34. from ods_user_info oi
  35. where oi.dt='2020-06-14';
  36. 步骤1:制作当日变动数据(包括新增,修改)每日执行
  37. 1)如何获得每日变动表
  38. a.最好表内有创建时间和变动时间(Lucky!)
  39. b.如果没有,可以利用第三方工具监控比如canal,监控MySQL的实时变化进行记录(麻烦)。
  40. c.逐行对比前后两天的数据,检查md5(concat(全部有可能变化的字段))是否相同(low)
  41. d.要求业务数据库提供变动流水(人品,颜值)
  42. 2)因为ods_user_info本身导入过来就是新增变动明细的表,所以不用处理
  43. a)数据库中新增2020-06-15一天的数据
  44. b)通过Sqoop2020-06-15日所有数据导入
  45. mysql_to_hdfs.sh all 2020-06-15
  46. cods层数据导入
  47. hdfs_to_ods_db.sh all 2020-06-15
  48. 步骤2:先合并变动信息,再追加新增信息,插入到临时表中
  49. 1)建立临时表
  50. hive (gmall)>
  51. drop table if exists dwd_dim_user_info_his_tmp;
  52. create external table dwd_dim_user_info_his_tmp(
  53. `id` string COMMENT '用户id',
  54. `name` string COMMENT '姓名',
  55. `birthday` string COMMENT '生日',
  56. `gender` string COMMENT '性别',
  57. `email` string COMMENT '邮箱',
  58. `user_level` string COMMENT '用户等级',
  59. `create_time` string COMMENT '创建时间',
  60. `operate_time` string COMMENT '操作时间',
  61. `start_date` string COMMENT '有效开始日期',
  62. `end_date` string COMMENT '有效结束日期'
  63. ) COMMENT '订单拉链临时表'
  64. stored as parquet
  65. location '/warehouse/gmall/dwd/dwd_dim_user_info_his_tmp/'
  66. tblproperties ("parquet.compression"="lzo");
  67. 2)导入脚本

image.png

  1. hive (gmall)>
  2. SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
  3. insert overwrite table dwd_dim_user_info_his_tmp
  4. select * from
  5. (
  6. select
  7. id,
  8. name,
  9. birthday,
  10. gender,
  11. email,
  12. user_level,
  13. create_time,
  14. operate_time,
  15. '2020-06-15' start_date,
  16. '9999-99-99' end_date
  17. from ods_user_info where dt='2020-06-15'
  18. union all
  19. select
  20. uh.id,
  21. uh.name,
  22. uh.birthday,
  23. uh.gender,
  24. uh.email,
  25. uh.user_level,
  26. uh.create_time,
  27. uh.operate_time,
  28. uh.start_date,
  29. if(ui.id is not null and uh.end_date='9999-99-99', date_add(ui.dt,-1), uh.end_date) end_date
  30. from dwd_dim_user_info_his uh left join
  31. (
  32. select
  33. *
  34. from ods_user_info
  35. where dt='2020-06-15'
  36. ) ui on uh.id=ui.id
  37. )his
  38. order by his.id, start_date;
  39. 步骤3:把临时表覆盖给拉链表
  40. 1)导入数据
  41. hive (gmall)>
  42. insert overwrite table dwd_dim_user_info_his
  43. select * from dwd_dim_user_info_his_tmp;
  44. 2)查询导入数据
  45. hive (gmall)>
  46. select id, start_date, end_date from dwd_dim_user_info_his limit 2;

2021-09-29 大数据集群资源规划,规划存储和内存


例:对于日活300T需要多少的内存保证计算?
【来自:志明】
目前没有仔细估算过存储和内存资源,但是有过粗略的存储资源估算,主要也就是为了保证3年或5年的使用,主要就是依据现有的数据量,然后按每年增长百分比,累加起来的一个粗略估算值,再报给客户去采购。

之前有做过一个较为详细的,主要是表比较确定,也有一部分样本数据,根据那个,按照每行数据大概多少字节进行估算,其实并没啥卵用,毕竟用起来肯定不止这些表。

我做得主要也是离线数据为主,所以有很多问题也不用太考虑,装不下了就删历史数据,或者导出来放别的储存,或者就是加硬盘。

来自:技术-陈紫隆】
image.png

【总结】
1. 1亿条数据 每条数据设定1K =大约100G

  1. 80%的数据( 0.8亿)会在其余16个小时(8点-24点)涌入,20%时间( 3小时内)涌入
    QPS= 80000000/ (10800) 约=7000/s
    QPS:每秒查询率,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准,这个可以自己测试

  2. 内存估算,内存的估算其实这个是没有绝对标准
    实时任务占用的资源都是固定的,可以根据业务个数估算。离线任务可以根据ETL任务数和任务资源配置情况估算
    实时同时启用的时候不能超过资源90%,实时任务资源占用需要小于50%
    如果数据增量为100G ,实时任务7000/s QPS
    一分钟窗口 60*7000=420000 420000/1024/1024=0.4G
    五分钟窗口 2G
    如果离线任务,可以把1/4的数据放到内存,那就就需要 25G
    如果二者同时运行,按照不超过90%来计算:30G
    上面只是单纯的从实时和离线单任务来计算,可以选择32G内存

    4.磁盘:考虑hdfs,kafka,别的存储数量,这个可以手动估算


2021-10-09 kafka的底层原理,kafka如何保证全局消费数据有序


首先要看业务有没有顺序消费的需求
其次要考虑顺序要求与效率要求之间的平衡

1个topic中,有3个partition,那么如何保证数据的消费?
比如可以以订单ID作为key,然后取Hash,那么订单ID相同的一定会分到同一分区

单分区有序,那么我们想方法把同一个特征数据写到一个分区

在 Flink 中则可以即保证消费 kafka 中的数据全局有序,又可以构成多并发,这就是 flink 中的时间特性带来的效果。Flink 在创建 kafka 的数据源时可以将其中的所有数据都存有时间并设置对应的 watermark,这样利用 event time 对 kafka 中的数据已经形成了时间概念上的全局有序性,当 flink 在消费其中的数据时则根据时间处理即可保证 kafka 中数据的全局有序性。

【来自:技术-维C】
1.全局有序且数据量巨大
2.全局有序且数据量不大
3.局部有序且数据量巨大
4.局部有序且数据量不大
局部有序是指:比如一个用户的所有数据必须有序

对于第一种,建议离线排序,因为单机扛不住
第二种和第四种,都可以使用一个分区保证顺序
第三种的情况可以对用户ID做hash取模,使用多个topic


2021-10-11 Flink的一致性


【来自:技术-阿威】
flink的端到端一致性, source 由kafka 开启 exactly-only, 然后flink开启barrier对齐 和 checkpoint, sink有两阶段提交,由第一个数据来时,开启预提交,直到下一个 barrier到达第一个段数据事务关闭, 第二段数据提交开始。

//1.设置Checkpointing间隔时间,严格一次保证精准一次消费
env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);

【来自:技术-冰蓝】
首先source得支持重设读取位置 比如像kafka指定consumer offset
sink我觉得要么支持事务 要么下游幂等性

【总结】

  • souce:使用执行ExactlyOnce的数据源,比如kafka等
  • 内部使用FlinkKafakConsumer,并开启CheckPoint,偏移量会保存到StateBackend中,并且默认会将偏移量写入到topic中去,即_consumer_offsets Flink设置CheckepointingModel.EXACTLY_ONCE
  • sink:存储系统支持覆盖也即幂等性:如Redis,Hbase,ES等存储系统不支持覆:需要支持事务(预写式日志或者两阶段提交),两阶段提交可参考Flink集成的kafka sink的实现

Flink的一致性


2021-10-12 怎么判断一个需求能不能实现,你们的判断标准是什么?需求变更要做什么?


【来自:志明】
以我多年的经历来看:
1、甲方和公司领导认为,就没有技术实现不了的需求,没有判断标准可言;
2、需求变更是常态,变更流程是摆设。

【】
另一方面,一个需求实现,还与产品或者领导划定的交付时间有关系,还与现有的公司的技术,架构,和人才涉及技术方面有关系

【来自:技术-博文】
需求来了要讨论,怎么写,用什么技术,实现逻辑是什么,技术难点在哪。 拿到需求一般要开会,多人一起分析拆解需求,转变成一条条可实现的技术,如果出现了无法评估可行性的需求,就要去做技术调研,也就是查资料,问朋友,或者花钱买别人的代码/接口

实在搞不定的,就找甲方,问能不能换一种方案,虽然这种方案结果会有一些不一致,但是效果都是大体相同,并且说明现阶段需求的难度和开发周期问题

【来自:技术-otw】
我经常碰到的是,评估过后做不了因为数据质量问题或者数据覆盖面不够。但还是会被要求出数然后各种改甚至人工俢数


2021-10-13 银行的数据需要存储五年之久,对于这块你有什么存储的策略?


【来自:技术-博文】
可以设置不同的存储策略,一些比较老的文件一般用得少,比如三年前,四年前,五年前的,可以放到一些老旧的硬盘存储,同时采用压缩比率高的压缩算法压缩起来,比较常用的数据,比如最近2-3年,可以采用snappy压缩算法减少磁盘压力,同时这些数据存储在一些性能比较好的机械盘或者固态盘里面,然后就是最近一年的数据,用的非常频繁,可以存一份副本在内存中,其余的副本存在硬盘里面。

hadoop支持这种 冷热数据分离的策略,我记得是直接敲命令行命令就会采用对应的策略对数据转移

【】
异构存储就是将不同需求或者冷热的数据存储到不同的介质中去,实现既能兼顾性能又能兼顾成本。

存储类型

HDFS异构存储支持如下4种类型,分别是:

RAM_DISK(内存镜像文件系统)
SSD(固态硬盘)
DISK(普通磁盘,HDFS默认)
ARCHIVE(计算能力弱而存储密度高的存储介质,一般用于归档)
以上四种自上到下,速度由快到慢,单位存储成本由高到低。

【来自:技术-otw】
image.png


2021-10-14 clickhouse的优化方面的见解?


【来自:技术-没有蜡笔的小新】
不适合高频率多表 单条sql 会好一点

【来自:技术-博文】
image.png
image.pngimage.png
image.png


2021-10-15 flink的反压原理


【来自:技术-哦哦哦哦】
反压就是当消费数据的速度小于source生产数据的速度,造成了数据挤压,从source一端一路传递到后面。实际上,flink针对反压有自己的优化,它的生产数据的速度会根据消费速度不断调整。

术语“背压”最初描述了气体或液体在密闭空间(例如空气管道或管道)中向前流动的阻力。 气体(或液体)沿着管道(或管道)的侧面受到摩擦。 很容易创建一个实验来体验不同数量的气体背压。 例如,使用您的肺和口腔,尝试通过以下管吹出尽可能多的空气——尽可能快——以进行比较:

一根直径只有 1 或 2 毫米的小鸡尾酒吸管
一根普通的饮料吸管,例如麦当劳苏打水喷泉的吸管
一卷纸巾或卫生纸的纸板管

【来自:技术-小七】

Flink的反压是逐级向上的反压配合信任度来用的,和spark的反压还是有区别的。

flink的反压主要是通过流式架构和网络缓存做逐级反压。具体来说就是,对于一个任务会有自己单独一片网络缓冲区,可以缓存已经收到、但还未处理的数据;当缓冲区满的时候,就通知上游任务不要再发数据了。而上游节点处理完的数据,在发往下游之前也会放在一个网络缓冲区,这个可以叫做“输出缓冲区”(之前接收数据的可以叫做“输入缓冲区”);当下游处理速度慢、输入缓冲区满,会通知上游不再发数据,那么上游的输出缓冲区就会逐渐填满;当上游任务的输出缓冲区满,上游任务就停止处理数据开始等待。这样,上游不再处理数据,可还会不停地接收数据,那么它的输入缓冲区也会被填满,就再通知上游的上游不再发送数据…以此类推,最终将阻塞信息传递给Source任务,这样就不再读取数据了。一旦下游阻塞的任务处理完成,开始继续读进数据,那么这种“开放”也会逐级向上游传递。

而spark的反压只有一个缓冲区的概念。

另外flink还提供了一种叫做“信任度”(credit)的控制机制。简单来说就是,下游任务给上游任务定一个“信任度”,这代表了允许上游发送多少数据。上游任务要发送数据之前,先向下游申请信任度,表示“我要发这么多数据”;而下游任务会根据自己的情况赋给一定的信任度。当下游任务处理慢、输入缓冲区开始堆积数据、空间越来越小时,给上游的信任度就会减少,上游数据的传输就变慢了

【来自:技术-阿威】

两个task计算,上游会先传入缓存区中,下游计算完一个,缓存区会发送一个。 如果没有计算完,就不会有数据过来。

TCP的四种拥塞控制算法
1.慢开始
2.拥塞控制
3.快重传
4.快恢复

【来自:技术-昭】

flink把反压链路优化短了,本来tcp是逐级反压,只能一级缓存反馈到另一层缓存,一共有六个缓存,sockot,netty,以及算子的本地缓存以及tm共有的网络缓存,

现在优化成算子本地缓存阻塞以后,直接通知上游的另一个本地缓存

细分又分为tm间反压,以及tm内反压

【来自:技术-讷言敏行】
image.png

【附:链接】
背感压力,你对背压了解多少?

如何分析及处理反压?


2021-10-18 redis 穿透,击穿,雪崩


【技术-阿威】
1.缓存穿透:现象,在redis中查找一个一定不存在的数,在没找到数据时,会去数据库中查找。解决:将空对象也缓存起来,并设置5分钟的过期时间, 或者弄个布隆过滤器,将所有可能存在的数据hash在一个足够大的bitmap中,去缓存查数据前先去判断是否存在,减小redis压力。
2.缓存雪崩,现象:在一段时间内,查多条数据时,redis 多个数据过期,没有查到,去数据库查。解决:将数据过期时间设置的分散一点。
3.缓存击穿,现象:高并发查询同一条key数据,没有查到,去数据库查,类似一个保护罩被击穿。解决,在redis过期时间设置这个key为永不过期

flink实时项目中,redis做旁路缓存, 因为我们是把维度表放在hbase中,然后要join时,为了减少hbase的压力,增加性能,先放入redis 缓存

【技术-十二】
击穿是单个key失效,雪崩是大量key同时失效,导致数据库阻塞或宕机,穿透是查询缓存或数据库中一个不存在的数据,即使数据不存在也会去查询数据库,如果访问量巨大,可能会导致DB挂掉

【技术-贺伦登】
Sparkstreaming生产环境,做优雅关闭的时候用到了redis,做了一个k-v存储,可以手动关闭Dstream,还有存储Kafka的消费偏移量。


2021-10-20 SQL递归了解多少?


【技术-志明】
使用场景一次性查出所有菜单资源生成菜单列表,后来流行ajax后就改成异步加载菜单了,这种写法就比较少用了。
还有组织机构(部门)列表也用,主要是用于存在上下级关系(树形结构)的应用场景。

【技术-闭口禅】
树形展开,比如一个产品用到的原材料,一直展开

【】
WITH expression_name [ ( column_name [,…n] ) ]
AS ( CTE_query_definition ) —只有在查询定义中为所有结果列都提供了不同的名称时,列名称列表才是可选的。—运行 CTE 的语句为:
SELECT FROM expression_name;

  1. 公用表表达式的名字(在WITH关键字之后)
  2. 查询的列名(可选)
  3. 紧跟AS之后的SELECT语句(如果AS之后有多个对公用表的查询,则只有第一个查询有效

sql递归知多少


2021-10-21 连续3周活跃用户,7天连续3天活跃用户数等等用户类指标?


如何分析用户活跃?
在启动日志中统计不同设备id出现次数。去重

如何分析用户新增?
用活跃用户表left join 用户新增表,用户新增表中mid为空的即为用户新增。

如何分析用户1天留存?
留存用户=前一天新增 join 今天活跃
用户留存率=留存用户/前一天新增

如何分析沉默用户?
(登录时间为7天前,且只出现过一次)
按照设备id对日活表分组,登录次数为1,且是在一周前登录。

如何分析本周回流用户?
本周活跃left join本周新增 left join上周活跃,且本周新增id和上周活跃id都为null

如何分析流失用户?
(登录时间为7天前)
按照设备id对日活表分组,且七天内没有登录过。

如何分析最近连续3周活跃用户数?
按照设备id对周活进行分组,统计次数大于3次。

如何分析最近七天内连续三天活跃用户数?
1)查询出最近7天的活跃用户,并对用户活跃日期进行排名
2)计算用户活跃日期及排名之间的差值
3)对同用户及差值分组,统计差值个数
4)将差值相同个数大于等于3的数据取出,然后去重(去的是什么重???),即为连续3天及以上活跃的用户


2021-10-25 Hive 优化?你有什么经验之谈?


【技术-阿威】
行列过滤,mapjoin,combiner预聚合开启, 列式存储,增加maptask个数(根据控制切片大小),增加reducetask个数, 压缩(snappy), 更换计算引擎(spark, tez), 小文件解决(merge, jvm重用) ,设置分区

【技术-风间】
利用分区分通 where提前执行 尽量不用select * 严格模式防止笛卡尔积 group by改写

【】

Hive最全总结,学习与面试,收藏这一篇就够了!


2021-10-26 Flink分布式快照的原理是什么?


【技术-小七】
Chandy-Lamport算法的分布式快照,可以在不暂停整体流处理的前提下,将状态备份保存到检查点,
1.JobManager发出指令,触发检查点的保存
2.状态快照保存完毕后,在数据源后插入barrier并向下游传递
3.下游如果有多个并行的子任务就广播barrier,执行barrier对齐机制
4.barrier对齐后,保存状态到持久化状态后端
5. 先处理缓存数据,然后再正常继续处理数据(你保存状态,数据在不断的过来呀,先放到缓冲区里,等状态快照保存完毕后,先处理缓冲区的数据,再正常处理后面的数据)
6. JobManager收到所有任务的成功保存状态的信息后,认为当前检查点保存完毕

【】
Flink的容错机制的核心部分是制作分布式数据流和操作算子状态的一致性快照。 这些快照充当一致性checkpoint,系统可以在发生故障时回滚。 Flink用于制作这些快照的机制在“分布式数据流的轻量级异步快照”中进行了描述。 它受到分布式快照的标准Chandy-Lamport算法的启发,专门针对Flink的执行模型而定制。

barriers在数据流源处被注入并行数据流中。快照n的barriers被插入的位置(我们称之为Sn)是快照所包含的数据在数据源中最大位置。
例如,在Apache Kafka中,此位置将是分区中最后一条记录的偏移量。 将该位置Sn报告给checkpoint协调器(Flink的JobManager)。
然后barriers向下游流动。当一个中间操作算子从其所有输入流中收到快照n的barriers时,它会为快照n发出barriers进入其所有输出流中。
一旦sink操作算子(流式DAG的末端)从其所有输入流接收到barriers n,它就向checkpoint协调器确认快照n完成。
在所有sink确认快照后,意味快照着已完成。一旦完成快照n,job将永远不再向数据源请求Sn之前的记录,因为此时这些记录(及其后续记录)将已经通过整个数据流拓扑,也即是已经被处理结束。


2021-10-28 什么是CAP法则?大数据哪些框架用到了?


【技术:White in ink】
一致性,可用性,分区容错性,最多满足两个

CAP


2021-10-29 怎么选择压缩格式和文件存储格式?


【技术-otw】
压缩效率、解压缩效率、压缩率、压缩后是否能切分

存储格式现在主要用列式存储 orc 或 parquet,压缩格式主要用 Snappy。
但对于需要输入输出的文件建议用 gzip,因为可以直接查看内容

对于hive只能建表时侯设置。orc是为hive专门设计的存储压缩格式

paquet在spark里也是很常用的,hive里也能用。

【技术-陈】
要看执行的任务是什么类型,处于哪个阶段,之后要做什么操作,数据量大否。压缩格式主要考虑参数:1)压缩效率、解压缩效率、压缩率、压缩后是否能切分;

存储格式可能要参考之后表的数据操作(select 特定字段还是*)的频率等等

【技术-闭口禅】
压缩默认是关闭的,需要将压缩功能、mapreduce输出数据压缩打开,还可以设置压缩方式
image.png

image.png

image.png


2021-11-01 Flink跟Spark 的区别?


【土哥】
spark是微批处理,还是以批RDD为基础,没有实现真正的流批一体
flink 是以流为核心,真正实现流批一体功能

时间语义不同,spark 包含两种,flink包含3种

【技术-远方时光】
Flink相比传统的Spark Streaming区别?
Flink 是标准的实时处理引擎,基于事件驱动。而 Spark Streaming 是微批(Micro-Batch)的模型。
下面我们就分几个方面介绍两个框架的主要区别:

  1. 架构模型:Spark Streaming 在运行时的主要角色包括:Master、Worker、Driver、Executor,Flink 在运行时主要包含:Jobmanager、Taskmanager和Slot。
  2. 任务调度:Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图DAG,Spark Streaming 会依次创建 DStreamGraph、JobGenerator、JobScheduler。
    Flink 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager进行处理,JobManager 会根据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最核心的数据结构,JobManager 根据 ExecutionGraph 对 Job 进行调度。
  3. 时间机制:Spark Streaming 支持的时间机制有限,只支持处理时间。 Flink 支持了流处理程序在时间上的三个定义:处理时间、事件时间、注入时间。同时也支持 watermark 机制来处理滞后数据。
  4. 容错机制:对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰好一次处理语义。Flink 则使用两阶段提交协议来解决这个问题。

2021-11-02 数据质量监控怎么做的?


参考:http://griffin.apache.org
监控原则
1)单表数据量监控
一张表的记录数在一个已知的范围内,或者上下浮动不会超过某个阈值
n SQL结果:var 数据量 = select count()from 表where 时间等过滤条件
n 报警触发条件设置:如果数据量不在[数值下限, 数值上限], 则触发报警
n 同比增加:如果((本周的数据量 - 上周的数据量)/上周的数据量
100)不在 [比例下线,比例上限],则触发报警
n 环比增加:如果((今天的数据量 - 昨天的数据量)/昨天的数据量100)不在 [比例下线,比例上限],则触发报警
n 报警触发条件设置一定要有。如果没有配置的阈值,不能做监控
日活、周活、月活、留存(日周月)、转化率(日、周、月)GMV(日、周、月)
复购率(日周月) 30%
2)单表空值检测
某个字段为空的记录数在一个范围内,或者占总量的百分比在某个阈值范围内
n 目标字段:选择要监控的字段,不能选“无”
n SQL结果:var 异常数据量 = select count(
) from 表where 目标字段 is null
n 单次检测:如果(异常数据量)不在[数值下限, 数值上限],则触发报警
3)单表重复值检测
一个或多个字段是否满足某些规则
n 目标字段:第一步先正常统计条数;select count() form 表;
n 第二步,去重统计;select count(
) from 表 group by 某个字段
n 第一步的值和第二步不的值做减法,看是否在上下线阀值之内
n 单次检测:如果(异常数据量)不在[数值下限, 数值上限], 则触发报警
4)单表值域检测
一个或多个字段没有重复记录
n 目标字段:选择要监控的字段,支持多选
n 检测规则:填写“目标字段”要满足的条件。其中$1表示第一个目标字段,$2表示第二个目标字段,以此类推。上图中的“检测规则”经过渲染后变为“delivery_fee = delivery_fee_base+delivery_fee_extra”
n 阈值配置与“空值检测”相同
5)跨表数据量对比
主要针对同步流程,监控两张表的数据量是否一致
n SQL结果:count(本表) - count(关联表)
n 阈值配置与“空值检测”相同


2021-11-03 你有什么面试技巧?


【来自尚硅谷学员分享】
(1)你的优点是什么?
大胆的说出自己各个方面的优势和特长
(2)你的缺点是什么?
不要谈自己真实问题;用“缺点”衬托自己的优点
(3)你的离职原因是什么?
Ø 不说前东家坏话,哪怕被伤过
Ø 合情合理合法
Ø 不要说超过1个以上的原因
(4)您对薪资的期望是多少?
Ø 非终面不深谈薪资
Ø 只说区间,不说具体数字
Ø 底线是不低于当前薪资
Ø 非要具体数字,区间取中间值,或者当前薪资的+20%
(5)您还有什么想问的问题?
Ø 这是体现个人眼界和层次的问题
Ø 问题本身不在于面试官想得到什么样的答案,而在于你跟别的应聘者的对比
Ø 标准答案:
公司希望我入职后的3-6个月内,给公司解决什么样的问题
公司(或者对这个部门)未来的战略规划是什么样子的?
以你现在对我的了解,您觉得我需要多长时间融入公司?
(6)您最快多长时间能入职?
一周左右,如果公司需要,可以适当提前。

两个注意事项
1)职业化的语言
2)职业化的形象

自我介绍(控制在4分半以内,不超过5分钟)
1)个人基本信息
2)工作履历
时间、公司名称、任职岗位、主要工作内容、工作业绩、离职原因
3)深度沟通(也叫压力面试)
刨根问底下沉式追问(注意是下沉式,而不是发散式的)
基本技巧:往自己熟悉的方向说


2021-11-04 项目中遇到的问题怎么解决的?(思路,方法,解决途径等等方面都可以说)


**【技术-陈】

**
一般业务问题找导师,技术自己先百度或者看文档,感觉进了业务组,技术不难,业务难。
觉得要先了解大概业务,再去看,不然很懵。如果是不了解那业务的话。

【技术-~】
看什么问题了,是数据出问题还是程序报错。数据出问题可以进行回溯,是代码写错造成的还是业务上或者其他系统提供的数据不对造成,并及时跟进。如果是程序报错那就百度一下,或者问同事,一般都能解决的

【】
首先要找到报错信息,要精准的找到
写代码时候,一定要加上日志,把进来的数据打印出来,解决问题的时候好还原

【以下问题,可以在心中想象下】
Hadoop宕机
Hadoop解决数据倾斜方法
集群资源分配参数(项目中遇到的问题)
HDFS小文件处理
Hadoop优化

Flume挂掉
Flume优化

Kafka挂掉
Kafka丢失
Kafka数据重复
Kafka消息数据积压
Kafka优化
Kafka单条日志传输大小

自定义UDF、UDTF函数
Hive优化
Hive解决数据倾斜方法
7天内连续3次活跃

Sqoop空值、一致性、数据倾斜

Azkaban任务挂了怎么办?
Azkaban故障报警

数据倾斜
精确一次性消费


2021-11-08 分布式事务你了解多少?


分布式事务的目标和实际应用场景
分布式事务的目标: 解决多个独立事务一致性的问题。
比如说我们有一个功能,订单系统,横跨了多个微服务,由于每个微服务不在一个库,没法用数据库事务来保证事务,那么这个时候我们就可以使用分布式事务
例如: 商城项目,有用户支付了一个订单,在支付系统中,支付表进行了更新,在另一个订单系统中,订单库里面订单的状态就要变成已支付,那么在订单表和支付表,他们在不同的库,如何保证两个数据库之间的事务呢
支付操作:支付修改余额,修改订单状态

分布式事务解决方案

  1. 二阶段提交协议(2PC)
  2. 三阶段提交协议(3PC)
  3. 补偿事务(TCC)
  4. 消息中间件实现
  5. seata框架

2021-11-09 SQL笔试?


样例数据:
id time uid is_suc res
1 2020-01-01 1 1 Null
2 2020-01-02 1 0 1
3 2020-01-03 1 0 1
4 2020-01-04 1 1 1
5 2020-01-05 1 0 4
6 2020-01-06 2 0 Null
7 2020-01-07 2 1 Null
8 2020-01-08 2 0 7

要求:
用sql算出res列,即同一个uid下,上一次is_succ=1 时的 id是多少?

【来自-技术-闭口禅】
image.png

【技术-@,】
68543c95b8abacd8f6d10965cd87c13.jpg


2021-11-11 你看源码的方式与经验有哪些?


【技术-zhr】
1.idea工具看源码的快捷键,功能得先了解,比如怎么看一个类的继承图,一个方法的具体实现怎么看等等

2.看源码的时候,先建立一个整体的逻辑,不要深究细节实现。把握好整体之后,再看细节实现就会清晰很多

【技术-绝域时空】
我看源码,事先先了解这个模块是干啥的,然后点进去,一个模块一个模块看

有包括初始化的相关设置,相应的函数设置还有为了确保这些函数实行的预防机制


2021-11-12 如何保证写的sql正确性?


【技术-绝域时空】
可以自己制造几个工程数据跑几遍,看看数据是不是按照自己想走的流程,最后看结果是否正确
【技术-天外来物】
看数据量是否发散,如果不发散找几条有代表性的数据验正确性
【技术-雪中剑】
边界测试用最多
然后看数据结果跟业务是否有异常


2021-11-15 watermark 的作用是啥?如何保证数据不丢失?


【技术-爱静静】

image.png
【技术-Sxc】
通过窗口把流数据分块处理,用Watermark确定什么时候不再等待更早的数据,和什么时候触发计算,用allowLateNess 指定允许迟到的最大时间,用sideOutPut 把迟到的数据放到侧输出流

WaterMark + EventTimeWindow + Allowed Lateness 方案(侧道输出),可以做到数据不丢失。

【技术-小七】
把窗口看成一个一个桶,每个桶都有时间范围,来的数据根据她的时间去自己相应的桶里,当某一个桶里的我数据的时间达到桶的界限,这个窗口就触发了计算
桶比较好理解,关闭,触发计算,迟到,乱序数据都很形象
image.png

【技术-浅望】
水位线是可以设置事件时间,窗口开始向下取整,如果下一条数据到来超过上一个窗口结束时间+延迟,就触发窗口计算关闭


2021-11-16 mr, tez,spark作为hive计算引擎各自优缺点?实际生产中怎么选择?


【技术-謎士】
MR 慢 但是总归是能执行完 spark 快 但是大数据量容易崩

【技术-故意】
1.tez和spark对hive的兼容性比较差 如果用开源的部署时容易出现jar包冲突问题
2.tez和spark比的话 如果资源充足就选spark

https://www.jianshu.com/p/357fceaa4042
MapReduce
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。
Tez
Tez是Apache开源的支持DAG作业的计算框架,它直接源于MapReduce框架,核心思想是将Map和Reduce两个操作进一步拆分,即Map被拆分成Input、Processor、Sort、Merge和Output, Reduce被拆分成Input、Shuffle、Sort、Merge、Processor和Output等,这样,这些分解后的元操作可以任意灵活组合,产生新的操作,这些操作经过一些控制程序组装后,可形成一个大的DAG作业。
Spark
Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。
Tez和Mapreduce区别
核心思想:MapReduce将一个算法抽象成Map和Reduce两个阶段进行处理;Tez将Map和Reduce两个操作进一步拆分,即Map被拆分成Input、Processor、Sort、Merge和Output, Reduce被拆分成Input、Shuffle、Sort、Merge、Processor和Output等依赖DAG:Mapreduce没有DAG一说,Tez将map和reduce阶段拆分成多个阶段,分解后的元操作可以任意灵活组合,产生新的操作,这些操作经过一些控制程序组装后,可形成一个大的DAG作业落地磁盘:MapReduce会有多次落地磁盘;Tez可以将多个有依赖的作业转换为一个作业,这样只需写一次HDFS,且中间节点较少。
Tez和Spark区别
使用场景:spark更像是一个通用的计算引擎,提供内存计算,实时流处理,机器学习等多种计算方式,适合迭代计算;tez作为一个框架工具,特定为hive和pig提供批量计算运行模式:spark属于内存计算,支持多种运行模式,可以跑在standalone,yarn上;而tez只能跑在yarn上;虽然spark与yarn兼容,但是spark不适合和其他yarn应用跑在一起资源利用:tez能够及时的释放资源,重用container,节省调度时间,对内存的资源要求率不高; 而spark如果存在迭代计算时,container一直占用资源;
mr引擎在hive 2中将被弃用。官方推荐使用tez或spark等引擎。
选择
tez:使用有向无环图。内存式计算。
spark:可以同时作为批式和流式的处理引擎,减少学习成本。
2、Hive引擎选择
MapReduce: 是一种离线计算框架,将一个算法抽象成Map和Reduce两个阶段进行处理,每个阶段都是用键值对(key/value)作为输入和输出,非常适合数据密集型计算。Map/Reduce通过把对数据集的大规模操作分发给网络上的每个节点实现可靠性;每个节点会周期性地返回它所完成的工作和最新的状态。如果一个节点在设定的时间内没有进行心跳上报,主节点(可以理解为主服务器)就会认为这个节点down掉了,此时就会把分配给这个节点的数据发到别的节点上运算,这样可以保证系统的高可用性和稳定性。因此它是一个很好的计算框架。
TEZ:是基于Hadoop YARN之上的DAG(有向无环图,Directed Acyclic Graph)计算框架。核心思想是将Map和Reduce两个操作进一步拆分,即Map被拆分成Input、Processor、Sort、Merge和Output, Reduce被拆分成Input、Shuffle、Sort、Merge、Processor和Output等。这样,这些分解后的元操作可以任意灵活组合,产生新的操作,这些操作经过一些控制程序组装后,可形成一个大的DAG作业,从而可以减少Map/Reduce之间的文件存储,同时合理组合其子过程,也可以减少任务的运行时间。
Spark:Hive on Spark总体的设计思路是,尽可能重用Hive逻辑层面的功能;从生成物理计划开始,提供一整套针对Spark的实现,比如 SparkCompiler、SparkTask等,这样Hive的查询就可以作为Spark的任务来执行了
三者比较个人意见:
MR
计算,会对磁盘进行多次的读写操作,这样启动多轮job的代价略有些大,不仅占用资源,更耗费大量的时间 <–相比较–>
TEZ
计算,就会生成一个简洁的DAG作业,算子跑完不退出,下轮继续使用上一轮的算子,这样大大减少磁盘IO操作,从而计算速度更快。 TEZ比MR至少快5倍(约值,反正是快,不必较真0.0) <–相比较–>
Spark
计算,DAG生成,Stage划分,比MR快10倍(约值,反正是快,不必较真0.0)与TEZ相比我选择Spark,一来快,


2021-11-18 MR,Spark和Flink的Shuffle区别?


**【技术-浅望】

**
MR的shuffle

Map方法后,经过标记分区进入环形缓冲区,默认80%反向溢写快排。然后加载回到内存中归并,写入磁盘,等待reduce端拉取

Reduce端拉取指定分区数据到内存,内存不够溢写磁盘,再经过归并分组输出。
【技术-Sxc】

MR的Shuffle:从Map端输出到Reduce端输入的数据处理过程称为shuffle。
map端包含分区,排序,溢写磁盘,合并等步骤。将分区后的数据写入环形缓冲区Kvbuffer,Kvbuffer默认大小为100M,阈值为0.8,数据会在Kvbuffer中进行排序,当数据大小超过80M时,数据开始写入磁盘,每个分区中数据是有序的。由于缓冲区小,数据大,要进行多次排序,一次Kvbuffer中的排序过程至少生成一个spill.out文件,将所有的spill.out文件进行归并排序和压缩后写入磁盘

reduce端包含copy、sort、reduce。reduce将map输出的数据拷贝后进行处理,如果在内存中放得下这次数据的话就直接把数据写到内存中,如果写不下的话就启动merge,写入磁盘,最后进入reduce方法,对数据进行处理

https://zhuanlan.zhihu.com/p/70331869

什么是Spark Shuffle?
reduceByKey会将一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是对的形式,这样每一个key对应一个聚合起来的value。
问题:
每一个key对应的value不一定都是在一个partition中,也不太可能在同一个节点上,因为RDD是分布式的弹性的数据集,它的partition极有可能分布在各个节点上。
聚合形式:
如何聚合:
Shuffle Write:上一个stage的每一个map task就必须保证将自己处理的当前分区中的数据相同的key写入一个分区文件中,可能会写入多个不同的分区文件中。
Shuffle Read:reduce task就会从上一个stage的所有task所在的机器上寻找属于自己的那些分区文件,这样就可以保证每一个key所对应的value都会汇聚在同一个节点上去处理和聚合。
Spark中有两种Shuffle管理类型,HashShuffleManager和SortShuffleManager,Spark1.2之前是HashShuffleManager,Spark1.2引入SortShuffleManager,在Spark2.0+版本中已经将HashShuffleManager丢弃。

HashShuffleManager:
1).普通机制:M(map task的个数)R(reduce task的个数)
2).优化机制:C(core的个数)
R(Reduce的个数)
SortShuffleManager:
1).普通机制:2M
2).bypass机制,没有排序:2
M

图解Hash-Based Shuffle

image.png

但是Shuffle可能面临的问题的是:
1.小文件过多,耗时低效的IO操作
2.OOM,读写文件以及缓存过多

image.png

执行流程:
(a)每一个Map task会将结果写入到不同的buffer中,每个buffer的大小为32k,buffer起到数据缓存的作用。
(b)每个buffer文件最后对应一个磁盘小文件
(c)reduce task来拉取对应的磁盘小文件
总结:
(1)map task的计算结果会根据分区器(默认hashPartitioner)来决定写入到哪一个磁盘小文件中去。ReduceTask会去Map端拉取相应的磁盘小文件。
(2)产生的磁盘小文件的个数:
M(map task的个数)R(reduce task的个数)
存在的问题:
产生的磁盘小文件过多,会导致以下问题:
(a)在shuffle Write过程中会产生很多写磁盘小文件的对象
(b)在shuffle read过程中会产生很多读取磁盘小文件的对象
(c)在JVM堆内存中对象过多回来造成频繁的gc,gc还无法解决运行所需要的内存的话,就会OOM
(d)在数据传输过程中会有频繁的网络通信,频繁的网络通信出现通信故障的可能性大大增加,一旦网络通信出现了故障会导致shuffle file cannot find 由于这个错误导致的task失败,TaskScheduler不负责重试,由DAGScheduler负责重试Stage。
合并机制:
*HashShuffleManager原理:

image.png

总结:
产生磁盘小文件的个数:C(core的个数)*R(reduce的个数)

SortShuffle运行原理:
SortShuffle的运行机制主要分成两种:

普通运行机制
bypass运行机制

SortShuffle两种运行机制的区别:
SortShuffleManager普通运行机制:

image.png

执行流程:
a) map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是5M
b) 在shuffle的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过5M时,比如现在内存结构中的数据为5.01M,那么他会申请5.01*2-5=5.02M内存给内存数据结构。
c) 如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。
d) 在溢写之前内存结构中的数据会进行排序分区
e) 然后开始溢写磁盘,写磁盘是以batch的形式去写,一个batch是1万条数据,
f) map task执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引文件。
g) reduce task去map端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。

总结:
产生磁盘小文件的个数:2*M(map task的个数)

byPass运行机制

image.png

bypass运行机制的触发条件如下:
(1)shuffle reduce task数量小于spark.shuffle.sort.bypassMergeThreshold的参数值(默认是200)
(2)产生的磁盘的小文件为:2*M(map task的个数)

https://www.jianshu.com/p/bd5d33effb0a
stream在operator之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于operator的种类。
One-to-one:stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着map operator的subtask看到的元素的个数以及顺序跟source operator的subtask生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。
Redistributing:这种操作会改变数据的分区个数。每一个operator subtask依据所选择的transformation发送数据到不同的目标subtask。例如,keyBy() 基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。


2021-11-19 Left join 在MR的实现原理?


【大数据私房菜】
Hive中的Join可分为两种情况
Common Join(Reduce阶段完成join)
Map Join(Map阶段完成join)

Common Join

  1. 如果没开启hive.auto.convert.join=true或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,在Reduce阶段完成join。并且整个过程包含MapShuffleReduce阶段。

1Map阶段
读取表的数据,Map输出时候以 Join on 条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key;
Map输出的 value 为 join 之后需要输出或者作为条件的列;同时在value中还会包含表的 Tag 信息,用于标明此value对应的表;按照key进行排序

2Shuffle阶段
根据key取哈希值,并将key/value按照哈希值分发到不同的reduce中
3Reduce阶段
根据key的值完成join操作,并且通过Tag来识别不同表中的数据。在合并过程中,把表编号扔掉

2021年度 - 图30


2021-11-22 Linux你在使用过程中发现哪些好用的命令与功能?


Linux(vi/vim)
一般模式

语法 功能描述
yy 复制光标当前一行
y数字y 复制一段(从第几行到第几行)
p 箭头移动到目的行粘贴
u 撤销上一步
dd 删除光标当前行
d数字d 删除光标(含)后多少行
x 删除一个字母,相当于del
X 删除一个字母,相当于Backspace
yw 复制一个词
dw 删除一个词
shift+^ 移动到行头
shift+$ 移动到行尾
1+shift+g 移动到页头,数字
shift+g 移动到页尾
数字N+shift+g 移动到目标行

编辑模式

按键 功能
i 当前光标前
a 当前光标后
o 当前光标行的下一行
I 光标所在行最前
A 光标所在行最后
O 当前光标行的上一行

指令模式

命令 功能
:w 保存
:q 退出
:! 强制执行
/要查找的词 n 查找下一个,N 往上查找
? 要查找的词 n是查找上一个,shift+n是往下查找
:set nu 显示行号
:set nonu 关闭行号

压缩和解压
1. gzip/gunzip 压缩
(1)只能压缩文件不能压缩目录
(2)不保留原来的文件
gzip压缩:gzip hello.txt
gunzip解压缩文件:gunzip hello.txt.gz

  1. zip/unzip 压缩
    可以压缩目录且保留源文件
    zip压缩(压缩 1.txt 和2.txt,压缩后的名称为mypackage.zip):zip hello.zip hello.txt world.txt
    unzip解压:unzip hello.zip
    unzip解压到指定目录:unzip hello.zip -d /opt

  2. tar 打包
    tar压缩多个文件:tar -zcvf hello.txt world.txt
    tar压缩目录:tar -zcvf hello.tar.gz opt/
    tar解压到当前目录:tar -zxvf hello.tar.gz
    tar解压到指定目录:tar -zxvf hello.tar.gz -C /opt

RPM
RPM查询命令:rpm -qa |grep firefox
RPM卸载命令:
rpm -e xxxxxx
rpm -e —nodeps xxxxxx(不检查依赖)
RPM安装命令:
rpm -ivh xxxxxx.rpm
rpm -ivh —nodeps fxxxxxx.rpm(—nodeps,不检测依赖进度)

选项 功能
-i -i=install,安装
-v -v=verbose,显示详细信息
-h -h=hash,进度条
—nodeps —nodeps,不检测依赖进度

2021-11-24 用kylin干什么?cube太多怎么处理?


Apache Kylin是一个开源的分布式分析引擎,提供Hadoop/Spark之上的SQL查询接口及多维分析(OLAP)能力以支持超大规模数据,最初由eBay开发并贡献至开源社区。它能在亚秒内查询巨大的Hive表。

维度和度量
维度:即观察数据的角度。比如员工数据,可以从性别角度来分析,也可以更加细化,从入职时间或者地区的维度来观察。维度是一组离散的值,比如说性别中的男和女,或者时间维度上的每一个独立的日期。因此在统计时可以将维度值相同的记录聚合在一起,然后应用聚合函数做累加、平均、最大和最小值等聚合计算。
度量:即被聚合(观察)的统计值,也就是聚合运算的结果。比如说员工数据中不同性别员工的人数,又或者说在同一年入职的员工有多少。

1)Cube中的维度数量较多,且没有进行很好的Cuboid剪枝优化,导致Cuboid数量极多;
2)Cube中存在较高基数的维度,导致包含这类维度的每一个Cuboid占用的空间都很大,这些Cuboid累积造成整体Cube体积变大;
3)存在比较占用空间的度量,例如Count Distinct,因此需要在Cuboid的每一行中都为其保存一个较大的寄存器,最坏的情况将会导致Cuboid中每一行都有数十KB,从而造成整个Cube的体积变大;

使用聚合组
1)强制维度(Mandatory),如果一个维度被定义为强制维度,那么这个分组产生的所有Cuboid中每一个Cuboid都会包含该维度。每个分组中都可以有0个、1个或多个强制维度。如果根据这个分组的业务逻辑,则相关的查询一定会在过滤条件或分组条件中,因此可以在该分组中把该维度设置为强制维度。
2)层级维度(Hierarchy),每个层级包含两个或更多个维度。假设一个层级中包含D1,D2…Dn这n个维度,那么在该分组产生的任何Cuboid中, 这n个维度只会以(),(D1),(D1,D2)…(D1,D2…Dn)这n+1种形式中的一种出现。每个分组中可以有0个、1个或多个层级,不同的层级之间不应当有共享的维度。如果根据这个分组的业务逻辑,则多个维度直接存在层级关系,因此可以在该分组中把这些维度设置为层级维度。
3)联合维度(Joint),每个联合中包含两个或更多个维度,如果某些列形成一个联合,那么在该分组产生的任何Cuboid中,这些联合维度要么一起出现,要么都不出现。每个分组中可以有0个或多个联合,但是不同的联合之间不应当有共享的维度(否则它们可以合并成一个联合)。如果根据这个分组的业务逻辑,多个维度在查询中总是同时出现,则可以在该分组中把这些维度设置为联合维度。

聚合组的设计非常灵活,甚至可以用来描述一些极端的设计。假设我们的业务需求非常单一,只需要某些特定的Cuboid,那么可以创建多个聚合组,每个聚合组代表一个Cuboid。具体的方法是在聚合组中先包含某个Cuboid所需的所有维度,然后把这些维度都设置为强制维度。这样当前的聚合组就只能产生我们想要的那一个Cuboid了。
再比如,有的时候我们的Cube中有一些基数非常大的维度,如果不做特殊处理,它就会和其他的维度进行各种组合,从而产生一大堆包含它的Cuboid。包含高基数维度的Cuboid在行数和体积上往往非常庞大,这会导致整个Cube的膨胀率变大。如果根据业务需求知道这个高基数的维度只会与若干个维度(而不是所有维度)同时被查询到,那么就可以通过聚合组对这个高基数维度做一定的“隔离”。我们把这个高基数的维度放入一个单独的聚合组,再把所有可能会与这个高基数维度一起被查询到的其他维度也放进来。这样,这个高基数的维度就被“隔离”在一个聚合组中了,所有不会与它一起被查询到的维度都没有和它一起出现在任何一个分组中,因此也就不会有多余的Cuboid产生。这点也大大减少了包含该高基数维度的Cuboid的数量,可以有效地控制Cube的膨胀率。


2021-11-25 面试怎么优雅地吹牛皮?


这里就不在记录了


2021-11-26 请指出下面sql语句的区别?


  1. select a. from a left Join b on a.key = b.key and a.ds=xxx *and b.ds=xxx
  2. sclect a. from a lef Join b on a.key = b.key *and b.ds=xxx
  3. select a. from a lef Join b on a.key = b.key and b.ds=xxx *where a.ds=xxx
  4. Select a. from a left Join b on a.key = b.key where a.ds=xxx *and b.ds=xxx

【技术-~】
1.a表不会丢数据,a表符合条件的才参与关联,否则对应b的记录为null,选取b表符合条件的跟a表关联
2.a不会丢数据,选取b表符合条件的跟a关联,关键不上为null
3.选取b表符合条件的关联,在结果中对a表过滤
4.ab左连接,对结果的ab表记录都过滤

【技术-cfc】
1比2 多了个 筛选条件 a.ds=xxx 我认为1是先筛选了 a.ds=xxx and b.ds=xxx 再 进行 join

where 是 前面的 jion 执行完了 在条件筛选 +where 的速度 应该没有 不加 执行的快

【技术-春风化雨】
左关联的时候,where不能过滤右表,不然就变成强关联了,和join一样了

where左表可以减少数据量,除非是想要左表有,右表没有的,where b.id is null

【技术-倒走的鹿】

1.对于内连接inner join,两个表的谓词条件放在on与where后面相同。
2.对于left join:
左表谓词放在on后不会对左表数据进行过滤,依然显示左表全部数据,放在where后面才会对左表进行过滤
右表谓词不管放在on后还是where后都会对右表先过滤再连接,但是放在where后left join会转换为inner join。
3.对于外连接,谓词条件放的位置不同,结果集也不同,可以根据自己的需求斟酌使用。

【总结】
数据库在通过连接两张或多张表来返回记录时,都会生成一张中间的临时表。

on条件是在生成临时表时使用的条件,它不管on中的条件是否为真,都会返回左边表中的记录。

where条件是在临时表生成好后,再对临时表进行过滤的条件。条件不为真的就全部过滤掉。


2021-11-29 Yarn 资源调度器怎么选择?


【技术-十二】
有三种,FIFO Scheduler ,Capacity Scheduler,FairS cheduler,FIFO很简单,也不用配置,适合小项目,现在的应用一般都是比较大型的,会占更多的资源,会造成其他应用阻塞,所以现在已经很少用了

Capacity 中有一个队列来专门运行小文件,但是这样就会占一些集群的资源,导致大文件执行的时候时间就会变慢。Fair是最好用的一个调度器,不管大小都可以公平的共享集群资源

【技术-云哥】
核心调度策略不同,容量调度器的调度策略是,先选择资源利用率低的队列,然后在队列中同时考虑 FIFO 和内存因素;而公平调度器仅考虑公平,而公平是通过任务缺额体现的,调度器每次选择缺额最大的任务(队列的资源量,任务的优先级等仅用于计算任务缺额)。

具体选用哪种调度算法,可根据实际应用需求而定。一个基本的经验是,小型 Yarn 集群(100 个节点以内),可考虑使用公平调度器,而大型 Yarn 集群(超过 100 个节点)可采用容量调度器效果会更好。

【技术-贾彤】
Apache默认是容量调度器,CDH默认是公平调度器


2021-11-30 Redis跳表了解多少?


【技术-贾彤】
redis使用跳表存在前提是处理的数据链表必须是有序的,本质就是二分查找法,在原有的有序链表上面增加多级索引,通过索引来实现快速查找。不仅能提高搜索性能,同时也可以提高插入和删除操作的性能。

在每个节点中维持多个指向其他的几点指针,从而达到快速访问队尾目的

对于一个单链表来讲,即便链表中存储的数据是有序的,如果我们要想在其中查找某个数据,也只能从头到尾遍历链表

【Redis数据结构-跳跃表】
https://www.cnblogs.com/hunternet/p/11248192.html


2021-12-01 SQL题


image.png
CREATE TABLE default.test_tel (
id bigint ,
tel string,
id_new bigint,
tel_new string
) . . . . . .

insert into test_tel
values(
(1,’123’,2,’234’),
(2,’234’,3,’345’),
(3,’345’,4,’456’),
(5,’666’,7,’888’)
)

select aa.*,bb.id_new
from (
select id,tel from default.test_tel
union
select id_new,tel_new from default.test_tel
) aa
left join (
select
a.id_new,
ifnull(lag(a.id_new) over(order by id_new),0) id2
from default.test_tel a
where not exists (select 1 from default.test_tel b where a.id_new=b.id)
) bb on aa.id>bb.id2 and aa.id<=bb.id_new
order by aa.id


2021-12-02 你为什么要离职?


绝对不要说前东家不好,领导不好,任何负面评价,也不要体现出工作太累,加班多,尽量往自己职业规划上靠,想去一个更好团队,感兴趣的项目,或者个人成就感方向去说。

尽量不要说薪资太少之类的,这个后续可能会被压薪资。

话术例举
首先是因为今年一整年公司组织架构一直在调整,当前公司的业务方向及主要的项目已经和我定的职业规划有了较大的偏离,
其次是我在这家公司工作了三年,设计能力与经验都有较大的积累,所以我希望有更大,更专业的平台与更强的人做事,并且我个人会喜欢比较有挑战性一点的工作。


2021-12-03 Flink怎么实现广播变量?


【技术-博文】
datasteam.broadcast()

广播流会向每个下游通道都发送一份数据,这种类似于咱们的微信大群聊天,我发送的消息可以被群里440人都能看见,说明群里440人都接收到了我这条消息。

另外官网介绍了广播状态是纯使用内存,不适合广播特别大的数据量,即使你配置的是rocksdb,所有的广播状态依然只会存在于内存中

【】
environment.addSource(new xxxx())
.broadcast(xxx)


2021-12-06 Flink hbase connector 实操遇到什么困难问题?怎么解决的?


【依赖问题】
https://www.cnblogs.com/jiuyang/p/10782636.html

https://blog.csdn.net/TreeCode/article/details/120185674
【插入数据过慢,出现背压】
批量
https://blog.csdn.net/Liu_RS/article/details/104634731

随后网友遇到其他问题,可以提给我,随时补充


2021-12-07 HDFS 中10台datanode,三个副本,允许挂几台datanode不影响读?允许挂几台不影响写?


【写数据】
安全模式是hdfs所处的一种特殊状态,在这种状态下,文件系统只接受读数据的请求,而不接受删除修改等变更请求。

DataNode在启动的时候会向namenode汇报可用的block等状态,在整个系统达到安全标准时(最小副本数不小于99.9%,小于则一直处于安全模式)

假设我们设置的副本数(即参数dfs.replication)是3,那么在DataNode上就应该有三个副本,假设只有两个副本,那么副本率=2/3=0.67,hdfs默认副本率(有多少比例的数据块满足最小副本数要求)是0.99.不符合,所以会自动复制副本到其他的DataNode,使得最小副本率不小于0.99
设置副本率: dfs.namenode.safemode.threshold-pct 0.99f(指定应有多少比例的数据块满足最小副本数要求)

所以:对于能写数据,目前考虑,最少需要三台DataNode(还有别的影响再考虑)
要离开安全模式,需要满足以下条件:
(1)达到副本数量要求的block比例满足要求
设置副本率: dfs.namenode.safemode.threshold-pct 0.99f
(2)可用的datanode节点数满足配置的数量要求
最小存活的DataNode数,默认0: dfs.namenode.safemode.min.datanodes:0
(3) 1、2 两个条件满足后维持的时间达到配置的要求。
维持时间,单位毫秒: dfs.namenode.safemode.extension:30000

【读数据】
如果数据块正好在挂掉的DataNode上,那直接报错。(个人看法,后待各位补充)


2021-12-08 Kafka消费方式?


一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个partition由哪个consumer来消费 Kafka有三种分配策略,一是RoundRobin,一是Range。高版本还有一个StickyAssignor策略 将分区的所有权从一个消费者移到另一个消费者称为重新平衡(rebalance)。当以下事件发生时,Kafka 将会进行一次分区分配:
同一个 Consumer Group 内新增消费者。
消费者离开当前所属的Consumer Group,包括shuts down或crashes。
Range分区分配策略
Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个分区。假如有10个分区,3个消费者线程,把分区按照序号排列
0,1,2,3,4,5,6,7,8,9
消费者线程为
C1-0,C2-0,C2-1
那么用partition数除以消费者线程的总数来决定每个消费者线程消费几个partition,如果除不尽,前面几个消费者将会多消费一个分区。在我们的例子里面,我们有10个分区,3个消费者线程,10/3 = 3,而且除除不尽,那么消费者线程C1-0将会多消费一个分区,所以最后分区分配的结果看起来是这样的:
C1-0:0,1,2,3
C2-0:4,5,6
C2-1:7,8,9
如果有11个分区将会是:
C1-0:0,1,2,3
C2-0:4,5,6,7
C2-1:8,9,10
假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:
C1-0:T1(0,1,2,3) T2(0,1,2,3)
C2-0:T1(4,5,6) T2(4,5,6)
C2-1:T1(7,8,9) T2(7,8,9)
RoundRobinAssignor分区分配策略
RoundRobinAssignor策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者. 使用RoundRobin策略有两个前提条件必须满足:
同一个消费者组里面的所有消费者的num.streams(消费者消费线程数)必须相等;每个消费者订阅的主题必须相同。加入按照 hashCode 排序完的topic-partitions组依次为
T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9
我们的消费者线程排序为
C1-0, C1-1, C2-0, C2-1
最后分区分配的结果为:
C1-0 将消费 T1-5, T1-2, T1-6 分区
C1-1 将消费 T1-3, T1-1, T1-9 分区
C2-0 将消费 T1-0, T1-4 分区
C2-1 将消费 T1-8, T1-7 分区
StickyAssignor分区分配策略
Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:
分区的分配要尽可能的均匀,分配给消费者者的主题分区数最多相差一个 分区的分配尽可能的与上次分配的保持相同。当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目的,StickyAssignor策略的具体实现要比RangeAssignor和RoundRobinAssignor这两种分配策略要复杂很多。
假设消费组内有3个消费者
C0、C1、C2
它们都订阅了4个主题:
t0、t1、t2、t3
并且每个主题有2个分区,也就是说整个消费组订阅了
t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1这8个分区
最终的分配结果如下:
消费者C0:t0p0、t1p1、t3p0
消费者C1:t0p1、t2p0、t3p1
消费者C2:t1p0、t2p1
这样初看上去似乎与采用RoundRobinAssignor策略所分配的结果相同
此时假设消费者C1脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配。如果采用RoundRobinAssignor策略,那么此时的分配结果如下:
消费者C0:t0p0、t1p0、t2p0、t3p0
消费者C2:t0p1、t1p1、t2p1、t3p1
如分配结果所示,RoundRobinAssignor策略会按照消费者C0和C2进行重新轮询分配。而如果此时使用的是StickyAssignor策略,那么分配结果为:
消费者C0:t0p0、t1p1、t3p0、t2p0
消费者C2:t1p0、t2p1、t0p1、t3p1
可以看到分配结果中保留了上一次分配中对于消费者C0和C2的所有分配结果,并将原来消费者C1的“负担”分配给了剩余的两个消费者C0和C2,最终C0和C2的分配还保持了均衡。
如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。StickyAssignor策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。
到目前为止所分析的都是消费者的订阅信息都是相同的情况,我们来看一下订阅信息不同的情况下的处理。
举例,同样消费组内有3个消费者:
C0、C1、C2
集群中有3个主题:
t0、t1、t2
这3个主题分别有
1、2、3个分区
也就是说集群中有
t0p0、t1p0、t1p1、t2p0、t2p1、t2p2这6个分区
消费者C0订阅了主题t0
消费者C1订阅了主题t0和t1
消费者C2订阅了主题t0、t1和t2
如果此时采用RoundRobinAssignor策略:
消费者C0:t0p0
消费者C1:t1p0
消费者C2:t1p1、t2p0、t2p1、t2p2
如果此时采用的是StickyAssignor策略:
消费者C0:t0p0
消费者C1:t1p0、t1p1
消费者C2:t2p0、t2p1、t2p2
此时消费者C0脱离了消费组,那么RoundRobinAssignor策略的分配结果为:
消费者C1:t0p0、t1p1
消费者C2:t1p0、t2p0、t2p1、t2p2
StickyAssignor策略,那么分配结果为:
消费者C1:t1p0、t1p1、t0p0
消费者C2:t2p0、t2p1、t2p2
可以看到StickyAssignor策略保留了消费者C1和C2中原有的5个分区的分配:
t1p0、t1p1、t2p0、t2p1、t2p2。
从结果上看StickyAssignor策略比另外两者分配策略而言显得更加的优异,这个策略的代码实现也是异常复杂。


2021-12-09 SQL题(脉脉笔试)


9851ee9e229b90728d81b79489324b1.jpg
以下仅供参考

【技术-Sxc】

1、9月 每日活跃用户数
select date_format(d,’yyyy-MM’) Month,count(*) from maimai.dau
group by date_format(d,’yyyy-MM’) where date_format(d,’yyyy-MM’) = ‘2020-09’

2、九月中 对于每日活跃用户,其当日活跃时长最大的模块
select d,uid,module,rk from
(select d,uid,module,rank() over(partition by d ,uid order by active_duration desc) as rk
from maimai.dau where d between ‘2020-09-01’ and ‘2020-09-30’)t1
where rk = 1

3、十一期间活跃用户中,jobs模块活跃超过100s的用户
select uid from maimai.dau where d between ‘2020-10-01’ and ‘2020-10-07’
and module = ‘jobs’ and active_duration > 100s

1、昨日的活跃用户中,各个级别人才,当日最大活跃时长模块的活跃时长
select u.career_level,d.module max(sum_duration) max_duration
from (select u.career_level,d.module sum(d.active_duration) sum_duration
from maimai.dau d join maimai.users u on d.uid = u.uid
group by u.career_level , d.module #按照级别和模块分组,对active_duration求和 , 取max
)t1 group by u.career_level,d.module

2、过去一周内,北京用户的活跃时长的均值
select u.city,avg(active_duration) avg_duration from maimai.dau da
join maimai.users u on d.uid = u.uid group by u.city ,
where u.city = ‘北京’ and da.d > date_sub(da.d , interval 7 day)

1、在过去一个月内,曾连续两天活跃的用户
select uid,flag,count(*) ct from
(select uid,d,rk,d-rk flagfrom(
select d,uid,row_number() over(partition by uid order by d) rk
from maimai.dau where d>date_sub(d,interval 30 day)
)t1)t2 group by uid,flag having ct >=2

【技术-完美答案】

  1. select d,count(*) from dau where d between ‘2020-09-01’ and ‘2020-09-30’ group by d;
  2. select d, module, dense_rank() over (partition by module order by activie_duration desc ) as ‘rank’ from dau where d between ‘2020-09-01’ and ‘2020-09-30’ and rank =1;
  3. select uid from dau where d between ‘2020-10-01’ and ‘2020-10-07’ and module = ‘jobs’ and active_duration > 100;

【技术-nor】

—中级题一
select u.career_level,avg(a2.rate1)from (
select uid,rate1 from (
select a.uid,a.module ,a.active_duration,row_number() over(partition by a.uid order by a.active_duration desc) rn
,a.active_duration/sum(a.active_duration) over(partition by a.uid) rate1
from maimal.dua a inner join (
select distinct uid from maimal.dua where d = ‘${yesterday}’) b on a.uid = b.uid
where a.d= ‘${today}’ )a1 where rn =1) a2 left join maimal.users u on a2.uid= u.uid
group by u.career_level ;

—中级题二
select avg(a.s1) from (select uid ,sum(active_duration) s1from maimal.dau
where d >= date_sub(next_day(‘${today}’,’MO’),-14) and d group by uid) a inner join maimal.users u on a.uid = u.uid and u.city = ‘北京’;

—高一
select distinct uid from (
select uid,count() over(partition by uid,c1) c1
from (
select uid,cast(substr(cast(d as string ),9,10) as int)+1-rank_dense() over(partition by uid order by d ) as rd
from maimal.dau where between ‘2020-09-01’ and ‘2021-09-30’) a ) b where c1 = 2;


2021-12-10 Doris用过吗?相比其他OLAP引擎怎么样?


https://tech.meituan.com/2020/04/09/doris-in-meituan-waimai.html
目前开源的比较受关注的OLAP引擎很多,比如Greenplum、Apache Impala、Presto、Doris、ClickHouse、Druid、TiDB等等,但缺乏实践案例的介绍,所以我们也没有太多的经验可以借鉴。于是,我们就结合自身业务的需求,从引擎建设成本出发,并立足于公司技术生态融合、集成、易用性等维度进行综合考虑,作为选型依据,最终我们平台部门选择了2018年刚进入Apache社区的Doris。
image.png
Doris的特点:

  • 同时支持高并发点查询和高吞吐的Ad-hoc查询。
  • 同时支持离线批量导入和实时数据导入。
  • 同时支持明细和聚合查询。
  • 兼容MySQL协议和标准SQL。
  • 支持Rollup Table和Rollup Table的智能查询路由。
  • 支持较好的多表Join策略和灵活的表达式查询。
  • 支持Schema在线变更。
  • 支持Range和Hash二级分区。

【官网:https://doris.apache.org/master/zh-CN/installing/compilation.html


2021-12-13 为什么 Kafka 不支持读写分离?


【技术-毛毛鸦】
1.主写从读是为了减轻leader节点压力,kafka的分区已经让读是从多个broker读从而负载均衡,如果允许主写从读,Kafka 副本机制使用的是异步消息拉取,可能会存在数据一致性的问题。
2.kafka使用场景并于属于读多写少的场景,读多写少更合适采用读写分离,kafka更多的是生产和消费消息,因此读写分离不太合适。

【】
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:

  1. 数据一致性问题:数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
  2. 延时问题:类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历 网络 → 主节点内存 → 网络 → 从节点内存 这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历 网络 → 主节点内存 → 主节点磁盘 → 网络 → 从节 点内存 → 从节点磁盘 这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

而 kafka 的主写主读的优点就很多了:

  1. 可以简化代码的实现逻辑,减少出错的可能;
  2. 将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而且对用户可控;
  3. 没有延时的影响;
  4. 在副本稳定的情况下,不会出现数据不一致的情况。

2021-12-20 jvm有没有调参的经验?


【技术-…】
GC一般要考虑的指标是
吞吐量: 应用耗时和实际耗时的比值;
停顿时间: 垃圾回收的时候,由于Stop the World,应用程序的所有线程会挂起,造成应用停顿。

然后根据实际场景 设置新生代 老年代的 堆内存大小

比如说 大对象产生比较多, 由于内存担保机制:大对象可能会直接到老年代,导致老年代回收时间较长, 就要把堆设置大一点

如果 一味的设置大内存 那么如果fgc停顿时间 就会很长,所以要按需调优。

https://blog.csdn.net/weixin_38106322/article/details/121147013
一.何时需要调优
堆中老年代内存持续上涨,达到设置的最大内存上限
Full GC次数频繁,导致STW次数过多,影响系统性能
GC停顿时间过长,超过1秒就得注意了
应用直接报出OutOfMemory 等内存异常提示
应⽤中有使⽤本地缓存且该缓存占⽤了大量的内存空间
参考监控数据,发现系统吞吐量与响应性能不高甚至出现下降情况

二.调优原则
大多数的java应用不需要在服务器上进行jvm优化
要深知多数出现GC问题的java应用,可能不是因为我们jvm参数设置错误,反而可能是代码问题引起的
尽量减少创建的对象数量
减少使用全局变量和大对象
要明白jvm优化是最后不得已才会使用的手段,我们更应该根据GC情况结合工具,分析出代码问题,优先优化代码
在实际情况中,通过分析GC情况来优化代码反而比直接优化JVM参数更好

三.调优目标
减少GC频率
减少GC时,STW的时间
低内存占用
高吞吐量

四.调优步骤
先分析GC日志以及dump文件,分析dump有很多工具可供使用,然后找到问题点,决定是否要优化
确定jvm调优量化目标
以旧的jvm参数模版为依据,调整jvm调优参数
调优一台服务器,对于调优前后的差异
然后通过命令行查看GC输出结果,不断调整尝试,找到合适的jvm参数配置
将目前调整到合适的参数应用到所有服务器上,跟踪之后的情况,看下是否还需要再次调整


2021-12-22 Flink 分区策略?


2021年度 - 图34


2021-12-23 Yarn底层原理,执行流程与相关算法?


YARN的基本架构
2021年度 - 图35
从YARN的架构图来看,它主要由ResourceManager和NodeManager、ApplicationMaster和Container等组件组成。
ResourceManager(RM)
YARN分层结构的本质是ResourceManager。这个实体控制整个集群并管理应用程序向基础计算资源的分配。ResourceManager 将各个资源部分(计算、内存、带宽等)精心安排给基础NodeManager(YARN 的每节点代理)。ResourceManager还与 ApplicationMaster 一起分配资源,与NodeManager 一起启动和监视它们的基础应用程序。在此上下文中,ApplicationMaster 承担了以前的 TaskTracker 的一些角色,ResourceManager 承担了 JobTracker 的角色。
1)处理客户端请求;
2)启动或监控ApplicationMaster;
3)监控NodeManager;
4)资源的分配与调度。
NodeManager(NM)
NodeManager管理一个YARN集群中的每个节点。NodeManager提供针对集群中每个节点的服务,从监督对一个容器的终生管理到监视资源和跟踪节点健康。MRv1通过插槽管理Map和Reduce任务的执行,而NodeManager 管理抽象容器(container),这些容器代表着可供一个特定应用程序使用的针对每个节点的资源。YARN继续使用HDFS层。它的主要 NameNode用于元数据服务,而DataNode用于分散在一个集群中的复制存储服务。**

NodeManager是驻留在一个YARN集群中的每个节点上的代理,主要负责:

  • 容器生命周期管理。
  • 监控每个容器的资源(CPU、内存等)使用情况。
  • 跟踪节点健康状况。
  • 以“心跳”的方式与ResourceManager保持通信。
  • 向ResourceManager汇报作业的资源使用情况和每个容器的运行状态。
  • 接收来自ApplicationMaster的启动/停止容器的各种请求 。

需要说明的是,NodeManager主要负责管理抽象的容器(container),只处理与容器相关的事情,而不具体负责每个任务(Map任务或Reduce任务)自身状态的管理,因为这些管理工作是由ApplicationMaster完成的,ApplicationMaster会通过不断与NodeManager通信来掌握各个任务的执行状态。
ApplicationMaster(AM)
ApplicationMaster管理一个在YARN内运行的应用程序的每个实例。ApplicationMaster 负责协调来自 ResourceManager 的资源,并通过 NodeManager 监视容器的执行和资源使用(CPU、内存等的资源分配)。请注意,尽管目前的资源更加传统(CPU 核心、内存),但未来会带来基于手头任务的新资源类型(比如图形处理单元或专用处理设备)。从 YARN 角度讲,ApplicationMaster 是用户代码,因此存在潜在的安全问题。YARN 假设 ApplicationMaster 存在错误或者甚至是恶意的,因此将它们当作无特权的代码对待。**

1)负责数据的切分;**

2)为应用程序申请资源并分配给内部的任务;
3)任务的监控与容错。
Container
对任务运行环境进行抽象,封装CPU、内存等多维度的资源以及环境变量、启动命令等任务运行相关的信息。比如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。YARN会为每个任务分配一个Container且该任务只能使用该Container中描述的资源。
要使用一个YARN集群,首先需要来自包含一个应用程序的客户的请求。ResourceManager 协商一个容器的必要资源,启动一个ApplicationMaster 来表示已提交的应用程序。通过使用一个资源请求协议,ApplicationMaster协商每个节点上供应用程序使用的资源容器。执行应用程序时,ApplicationMaster 监视容器直到完成。当应用程序完成时,ApplicationMaster 从 ResourceManager 注销其容器,执行周期就完成了。
YARN的原理
YARN 的作业运行,主要由以下几个步骤组成:
2021年度 - 图36
1)作业提交
client调用job.waitForCompletion方法,向整个集群提交MapReduce作业 (第1步) 。 新的作业ID(应用ID)由资源管理器分配(第2步). 作业的client核实作业的输出, 计算输入的split,将作业的资源(包括Jar包, 配置文件, split信息)拷贝给HDFS(第3步). 最后, 通过调用资源管理器的submitApplication()来提交作业(第4步).
2)作业初始化
当资源管理器收到submitApplication()的请求时, 就将该请求发给调度器(scheduler), 调度器分配container, 然后资源管理器在该container内启动应用管理器进程, 由节点管理器监控(第5a和5b步)。
MapReduce作业的应用管理器是一个主类为MRAppMaster的Java应用。其通过创造一些bookkeeping对象来监控作业的进度, 得到任务的进度和完成报告(第6步)。然后其通过分布式文件系统得到由客户端计算好的输入split(第7步)。然后为每个输入split创建一个map任务, 根据mapreduce.job.reduces创建reduce任务对象。
3)任务分配
如果作业很小,应用管理器会选择在其自己的JVM中运行任务。如果不是小作业, 那么应用管理器向资源管理器请求container来运行所有的map和reduce任务(第8步). 这些请求是通过心跳来传输的, 包括每个map任务的数据位置, 比如存放输入split的主机名和机架(rack). 调度器利用这些信息来调度任务, 尽量将任务分配给存储数据的节点, 或者退而分配给和存放输入split的节点相同机架的节点.
4)任务运行
当一个任务由资源管理器的调度分配给一个container后, 应用管理器通过联系节点管理器来启动container(第9a步和9b步). 任务由一个主类为YarnChild的Java应用执行. 在运行任务之前首先本地化任务需要的资源, 比如作业配置, JAR文件, 以及分布式缓存的所有文件(第10步). 最后, 运行map或reduce任务(第11步).
YarnChild运行在一个专用的JVM中, 但是YARN不支持JVM重用.
5)进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器,客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新,展示给用户。
6)作业完成
除了向应用管理器请求作业进度外,客户端每5分钟都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion. pollinterval来设置。作业完成之后, 应用管理器和container会清理工作状态, OutputCommiter的作业清理方法也会被调用。作业的信息会被作业历史服务器存储以备之后用户核查。

算法
DRF是一种通用的多资源 最大最小公平分配策略(Max-Min Fairness Strategy),其核心思想是在多环境下一个用户的资源分配应该由用户的 主导份额的资源决定。主导份额的资源是在所有已经分配给用户的多种资源中,占据最大份额的一种资源。简而言之,DRF试图最大化所有用户中最小的主导份额。

下面我们看一个实例。假设系统中共有9个CPU和18GB RAM,有两个用户(或者框架)分别运行了两种任务,需要的资源量分别为<1CPU,4GB>和<3CPU,1GB>。对于用户A,每个任务要消耗总CPU的1/9(份额)和总内存的2/9,因而A的主资源为内存;对于用户B,每个任务要消耗总CPU的1/3和总内存的1/18,因而B的主资源为CPU。DRF将最大化所有用户的主资源,具体分配过程如表6-1所示。最终,A获取的资源量为<3CPU,12GB>,可运行3个任务;而B获取的资源量为<6CPU,2GB>,可运行2个任务。
image.png


2021-12-24 Flink 状态编程了解多少?


算子状态的数据结构(non-keyed state)
1、列表状态(list state)
将状态表示成一组数据的列表

2、联合列表状态(union list state)
将状态表示成一组数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)进行启动应用程序时如何恢复

3、广播状态(broadcast state)
如果一个算子有多个子任务,而他的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态

键控状态(keyed State)
flink为每个key维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key的对应状态;

当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key
[

](https://blog.csdn.net/shufangreal/article/details/104686064)

键控状态的数据结构
1、值状态(value state)
将状态表示单个的值,通过getState()获取状态值

2、列表状态(list state)
将状态表示为一组数据的列表,通过getListState()获取状态值

3、映射状态(map state)
将状态表示成一组key-value对,通过getMapState()获取状态值

4、聚合状态(reducing state & aggregating state)
将状态表示为一个用于聚合操作的列表


2021-12-27 Kafka 为什么可以这么快?


  1. Kafka本身是分布式集群,同时采用分区技术,并发度高。
  2. 顺序写磁盘

Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。

  1. 零拷贝技术

零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数。通常是说在IO读写过程中。
2021年度 - 图38
传统IO流程:
第一次:将磁盘文件,读取到操作系统内核缓冲区。
第二次:将内核缓冲区的数据,copy到application应用程序的buffer。
第三步:将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于操作系统内核的缓冲区)
第四次:将socket buffer的数据,copy到网卡,由网卡进行网络传输。
传统方式,读取磁盘文件并进行网络发送,经过的四次数据copy是非常繁琐的。实际IO读写,需要进行IO中断,需要CPU响应中断(带来上下文切换),尽管后来引入DMA来接管CPU的中断请求,但四次copy是存在“不必要的拷贝”的。
重新思考传统IO方式,会注意到实际上并不需要第二个和第三个数据副本。应用程序除了缓存数据并将其传输回套接字缓冲区之外什么都不做。相反,数据可以直接从读缓冲区传输到套接字缓冲区。
显然,第二次和第三次数据copy 其实在这种场景下没有什么帮助反而带来开销,这也正是零拷贝出现的意义。
所以零拷贝是指读取磁盘文件后,不需要做其他处理,直接用网络发送出去。

我的理解 Linux体系架构分用户态和内核态 这两部分有各自的权限等级,底层硬件相关的调用需要内核来完成,但平时我们运行的程序在用户态空间,所以涉及到底层硬件调用要切到内核空间执行,执行结束再切回用户空间继续跑,切换的过程涉及到数据复制且都是由cpu完成的开销比较大。零拷贝技术就是为了减少或避免调用底层硬件时cpu需要将数据从某处内存复制到另一个区域,减少内核和用户态的切换进而提高效率


2021-12-28 Flink重启策略?


【技术-普敬】
1.固定延迟重启策略
固定延迟重启策略是尝试给定次数重新启动作业。如果超过最大尝试次数,则作业失败。在两次连续重启尝试之间,会有一个固定的延迟等待时间。
通过在flink-conf.yaml中配置参数:
# fixed-delay:固定延迟策略
restart-strategy: fixed-delay
# 尝试5次,默认Integer.MAX_VALUE
restart-strategy.fixed-delay.attempts: 5
# 设置延迟时间10s,默认为 akka.ask.timeout时间
restart-strategy.fixed-delay.delay: 10s
2.故障率重启策略
故障率重启策略在故障后重新作业,当设置的故障率(failure rate)超过每个时间间隔的故障时,作业最终失败。在两次连续重启尝试之间,重启策略延迟等待一段时间。
在flink-conf.yaml文件配置
# 设置重启策略为failure-rate
restart-strategy: failure-rate
# 失败作业之前的给定时间间隔内的最大重启次数,默认1
restart-strategy.failure-rate.max-failures-per-interval: 3
# 测量故障率的时间间隔。默认1min
restart-strategy.failure-rate.failure-rate-interval: 5min
# 两次连续重启尝试之间的延迟,默认akka.ask.timeout时间
restart-strategy.failure-rate.delay: 10s
在代码中设置:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 3为最大失败次数;5min为测量的故障时间;10s为2次间的延迟时间
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5, TimeUnit.MINUTES),Time.of(10, TimeUnit.SECONDS)));
3.无重启策略
作业直接失败,不尝试重启。
在flink-conf.yaml中配置:
restart-strategy: none
在代码中实现:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
4.后备重启策略
使用群集定义的重新启动策略。这对于启用检查点的流式传输程序很有帮助。默认情况下,如果没有定义其他重启策略,则选择固定延迟重启策略。

【技术-贾彤】
Flink 支持不同的重启策略,这些重启策略控制着 job 失败后如何重启,默认重启策略是通过Flink的配置文件设置的flink-conf.yaml,配置参数restart-strategy定义采用的策略。
固定延迟重启策略: 固定延迟重启策略会尝试一个给定的次数来重启 Job,如果超过了最大的重启次数,Job 最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 5表示最大重试次数为5次,10s为延迟时间 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,Time.of(10, TimeUnit.SECONDS)));
失败率重启策略:失败率重启策略在 Job 失败后会重启,但是超过失败率后,Job 会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 3为最大失败次数;5min为测量的故障时间;10s为2次间的延迟时间 env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5, TimeUnit.MINUTES),Time.of(10, TimeUnit.SECONDS)));
无重启策略:Job 直接失败,不会尝试进行重启。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.noRestart());


2021-12-29 码农搬砖人的未来在哪里?


2021-12-30 职场上的细节怎么去把握(加薪,升职,转业. . . . . ),你怎么看?


【技术-十二】
1.在其位谋其政
2.给多少钱干多少事儿
3.努力提升自己,趁年轻,多考几个证,技多不压身
4.没有挑战的工作趁早放弃,容易磨人意志,让人颓废
5.尝试上下级搞好关系,多去找领导提加薪,不提永远不会考虑你
6.业务和技术两路,喜欢哪个就去干,犹豫不决只会错失良机
7.想转业不分时间,路有千万条,可能不擅长计算机行业,那就多去看看别的行业,总有自己适合的
8.一定要攒点钱,以备不时之需
9.职场如战场,不要乱吐槽,有些人可能不是真的人


2021-12-31 对2021的完成了什么目标和对2022有什么想做是事情?