尚硅谷大数据技术之Hadoop(优化&新特性)
(作者:尚硅谷大数据研发部)

版本:V3.0

第1章 Hadoop数据压缩

1.1 概述

1.2 MR支持的压缩编码

压缩格式 hadoop自带? 算法 文件扩展名 是否可切分 换成压缩格式后,原来的程序是否需要修改
DEFLATE 是,直接使用 DEFLATE .deflate 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFLATE .gz 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定输入格式
Snappy 否,需要安装 Snappy .snappy 和文本处理一样,不需要修改

为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示。

压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

压缩性能的比较

压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s

http://google.github.io/snappy/
On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.

1.3 压缩方式选择

1.3.1 Gzip压缩

1.3.2 Bzip2压缩

1.3.3 Lzo压缩

1.3.4 Snappy压缩

1.4 压缩位置选择

压缩可以在MapReduce作用的任意阶段启用。

图 MapReduce数据压缩

1.5 压缩参数配置

要在Hadoop中启用压缩,可以配置如下参数:

参数 默认值 阶段 建议
io.compression.codecs
(在core-site.xml中配置)
org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec 输入压缩 Hadoop使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress(在mapred-site.xml中配置) false mapper输出 这个参数设为true启用压缩
mapreduce.map.output.compress.codec(在mapred-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec mapper输出 企业多使用LZO或Snappy编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) false reducer输出 这个参数设为true启用压缩
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec reducer输出 使用标准工具或者编解码器,如gzip和bzip2
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置) RECORD reducer输出 SequenceFile输出使用的压缩类型:NONE和BLOCK

1.6 压缩实操案例

1.6.1 数据流的压缩和解压缩

测试一下如下压缩方式:

DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec

package com.atguigu.mapreduce.compress;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class TestCompress {

  1. public static void main(String[] args) throws Exception {<br /> compress("e:/hello.txt","org.apache.hadoop.io.compress.BZip2Codec");<br />// decompress("e:/hello.txt.bz2");<br /> }
  2. // 1、压缩<br /> private static void compress(String filename, String method) throws Exception {
  3. // (1)获取输入流<br /> FileInputStream fis = new FileInputStream(new File(filename));
  4. Class codecClass = Class.forName(method);
  5. CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
  6. // (2)获取输出流<br /> FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));<br /> CompressionOutputStream cos = codec.createOutputStream(fos);
  7. // (3)流的对拷<br /> IOUtils.copyBytes(fis, cos, 1024*1024*5, false);

// (4)关闭资源
cos.close();
fos.close();
fis.close();
}

  1. // 2、解压缩<br /> private static void decompress(String filename) throws FileNotFoundException, IOException {
  2. // (0)校验是否能解压缩<br /> CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
  3. CompressionCodec codec = factory.getCodec(new Path(filename));
  4. if (codec == null) {<br /> System.out.println("cannot find codec for file " + filename);<br /> return;<br /> }
  5. // (1)获取输入流<br /> CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
  6. // (2)获取输出流<br /> FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));
  7. // (3)流的对拷<br /> IOUtils.copyBytes(cis, fos, 1024*1024*5, false);
  8. // (4)关闭资源<br /> cis.close();<br /> fos.close();<br /> }<br />}

1.6.2 Map输出端采用压缩

即使你的MapReduce的输入输出文件都是未压缩的文件,你仍然可以对Map任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到Reduce节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可,我们来看下代码怎么设置。
1)给大家提供的Hadoop源码支持的压缩格式有:BZip2Codec 、DefaultCodec
package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

  1. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  2. Configuration configuration = new Configuration();
  3. // 开启map端输出压缩<br /> configuration.setBoolean("mapreduce.map.output.compress", true);<br /> // 设置map端输出压缩方式<br /> configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
  4. Job job = Job.getInstance(configuration);
  5. job.setJarByClass(WordCountDriver.class);
  6. job.setMapperClass(WordCountMapper.class);<br /> job.setReducerClass(WordCountReducer.class);
  7. job.setMapOutputKeyClass(Text.class);<br /> job.setMapOutputValueClass(IntWritable.class);
  8. job.setOutputKeyClass(Text.class);<br /> job.setOutputValueClass(IntWritable.class);
  9. FileInputFormat.setInputPaths(job, new Path(args[0]));<br /> FileOutputFormat.setOutputPath(job, new Path(args[1]));
  10. boolean result = job.waitForCompletion(true);
  11. System.exit(result ? 1 : 0);<br /> }<br />}<br />2Mapper保持不变<br />package com.atguigu.mapreduce.compress;<br />import java.io.IOException;<br />import org.apache.hadoop.io.IntWritable;<br />import org.apache.hadoop.io.LongWritable;<br />import org.apache.hadoop.io.Text;<br />import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper{

Text k = new Text();
IntWritable v = new IntWritable(1);

  1. @Override<br /> protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
  2. // 1 获取一行<br /> String line = value.toString();
  3. // 2 切割<br /> String[] words = line.split(" ");
  4. // 3 循环写出<br /> for(String word:words){<br />k.set(word);<br /> context.write(k, v);<br /> }<br /> }<br />}<br />3)Reducer保持不变<br />package com.atguigu.mapreduce.compress;<br />import java.io.IOException;<br />import org.apache.hadoop.io.IntWritable;<br />import org.apache.hadoop.io.Text;<br />import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer{

  1. IntWritable v = new IntWritable();
  2. @Override<br /> protected void reduce(Text key, Iterable<IntWritable> values,<br /> Context context) throws IOException, InterruptedException {
  3. int sum = 0;
  4. // 1 汇总<br /> for(IntWritable value:values){<br /> sum += value.get();<br /> }
  5. v.set(sum);
  6. // 2 输出<br /> context.write(key, v);<br /> }<br />}

1.6.3 Reduce输出端采用压缩

基于WordCount案例处理。
1)修改驱动
package com.atguigu.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

  1. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  2. Configuration configuration = new Configuration();
  3. Job job = Job.getInstance(configuration);
  4. job.setJarByClass(WordCountDriver.class);
  5. job.setMapperClass(WordCountMapper.class);<br /> job.setReducerClass(WordCountReducer.class);
  6. job.setMapOutputKeyClass(Text.class);<br /> job.setMapOutputValueClass(IntWritable.class);
  7. job.setOutputKeyClass(Text.class);<br /> job.setOutputValueClass(IntWritable.class);
  8. FileInputFormat.setInputPaths(job, new Path(args[0]));<br /> FileOutputFormat.setOutputPath(job, new Path(args[1]));
  9. // 设置reduce端输出压缩开启<br /> FileOutputFormat.setCompressOutput(job, true);
  10. // 设置压缩的方式<br /> FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); <br />// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); <br />// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
  11. boolean result = job.waitForCompletion(true);
  12. System.exit(result?1:0);<br /> }<br />}<br />2MapperReducer保持不变(详见4.6.2

第2章Hadoop企业优化

2.1 MapReduce 跑的慢的原因

2.2 MapReduce优化方法

MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。

2.2.1 数据输入

2.2.2 Map阶段

2.2.3 Reduce阶段

2.2.4 I/O传输

2.2.5 数据倾斜问题

2.2.6 常用的调优参数

1)资源相关参数
(1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml)

配置参数 参数说明
mapreduce.map.memory.mb 一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。
mapreduce.reduce.memory.mb 一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。
mapreduce.map.cpu.vcores 每个MapTask可使用的最多cpu core数目,默认值: 1
mapreduce.reduce.cpu.vcores 每个ReduceTask可使用的最多cpu core数目,默认值: 1
mapreduce.reduce.shuffle.parallelcopies 每个Reduce去Map中取数据的并行数。默认值是5
mapreduce.reduce.shuffle.merge.percent Buffer中的数据达到多少比例开始写入磁盘。默认值0.66
mapreduce.reduce.shuffle.input.buffer.percent Buffer大小占Reduce可用内存的比例。默认值0.7
mapreduce.reduce.input.buffer.percent 指定多少比例的内存用来存放Buffer中的数据,默认值是0.0

(2)应该在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)

配置参数 参数说明
yarn.scheduler.minimum-allocation-mb 给应用程序Container分配的最小内存,默认值:1024
yarn.scheduler.maximum-allocation-mb 给应用程序Container分配的最大内存,默认值:8192
yarn.scheduler.minimum-allocation-vcores 每个Container申请的最小CPU核数,默认值:1
yarn.scheduler.maximum-allocation-vcores 每个Container申请的最大CPU核数,默认值:32
yarn.nodemanager.resource.memory-mb 给Containers分配的最大物理内存,默认值:8192

(3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml)

配置参数 参数说明
mapreduce.task.io.sort.mb Shuffle的环形缓冲区大小,默认100m
mapreduce.map.sort.spill.percent 环形缓冲区溢出的阈值,默认80%

2)容错相关参数(MapReduce性能优化)

配置参数 参数说明
mapreduce.map.maxattempts 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.reduce.maxattempts 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.task.timeout Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

2.3 HDFS小文件优化方法

2.3.1 HDFS小文件弊端

HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢。

2.3.2 HDFS小文件解决方案

小文件的优化无非以下几种方式:

  1. 1. 在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
  2. 1. 在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。
  3. 1. MapReduce处理时,可采用CombineTextInputFormat提高效率。

第3章 Hadoop新特性

3.1 2.x新特性

3.1.1 集群间数据拷贝

1)scp实现两个远程主机之间的文件复制
scp -r hello.txt root@hadoop103:/user/atguigu/hello.txt // 推 push
scp -r root@hadoop103:/user/atguigu/hello.txt hello.txt // 拉 pull
scp -r root@hadoop103:/user/atguigu/hello.txt root@hadoop104:/user/atguigu //是通过本地主机中转实现两个远程主机的文件复制;如果在两个远程主机之间ssh没有配置的情况下可以使用该方式。
2)采用distcp命令实现两个Hadoop集群之间的递归数据复制
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hadoop distcp hdfs://hadoop102:9820/user/atguigu/hello.txt hdfs://hadoop105:9820/user/atguigu/hello.txt

3.1.2 小文件存档

1)案例实操
(1)需要启动YARN进程
[atguigu@hadoop102 hadoop-2.7.2]$ start-yarn.sh
(2)归档文件
把/user/atguigu/input目录里面的所有文件归档成一个叫input.har的归档文件,并把归档后文件存储到/user/atguigu/output路径下。
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hadoop archive -archiveName input.har -p /user/atguigu/input /user/atguigu/output
(3)查看归档
[atguigu@hadoop102 hadoop-2.7.2]$ hadoop fs -ls -r /user/atguigu/output/input.har
[atguigu@hadoop102 hadoop-2.7.2]$ hadoop fs -ls -r har:///user/atguigu/output/input.har
(4)解归档文件
[atguigu@hadoop102 hadoop-2.7.2]$ hadoop fs -cp har:/// user/atguigu/output/input.har/* /user/atguigu

3.1.3 回收站

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。
1)回收站参数设置及工作机制

图 回收站
2)启用回收站
修改core-site.xml,配置垃圾回收时间为1分钟。

fs.trash.interval
1

3)查看回收站
回收站在集群中的路径:/user/atguigu/.Trash/….
4)修改访问垃圾回收站用户名称
进入垃圾回收站用户名称,默认是dr.who,修改为atguigu用户
[core-site.xml]

hadoop.http.staticuser.user
atguigu

5)通过程序删除的文件不会经过回收站,需要调用moveToTrash()才进入回收站
Trash trash = New Trash(conf);
trash.moveToTrash(path);
6)恢复回收站数据
[atguigu@hadoop102 hadoop-2.7.2]$ hadoop fs -mv
/user/atguigu/.Trash/Current/user/atguigu/input /user/atguigu/input
7)清空回收站
[atguigu@hadoop102 hadoop-2.7.2]$ hadoop fs -expunge

3.2 3.x新特性

3.2.1 多NN的HA架构

  1. HDFS NameNode高可用性的初始实现为单个活动NameNode和单个备用NameNode,将edits复制到三个JournalNode。该体系结构能够容忍系统中一个NN或一个JN的故障。<br />但是,某些部署需要更高程度的容错能力。Hadoop3.x允许用户运行多个备用NameNode。例如,通过配置三个NameNode和五个JournalNode,群集能够容忍两个节点而不是一个节点的故障。

3.2.2 纠删码

  1. HDFS中的默认3副本方案在存储空间和其他资源(例如,网络带宽)中具有200%的开销。但是,对于I / O活动相对较低暖和冷数据集,在正常操作期间很少访问其他块副本,但仍会消耗与第一个副本相同的资源量。<br /> 纠删码(Erasure Coding)能够在不到50% 的数据冗余情况下提供和3副本相同的容错能力,因此,使用纠删码作为副本机制的改进是自然而然的。

第4章 Hadoop HA高可用

4.1 HA概述

(1)所谓HA(High Availablity),即高可用(7*24小时不中断服务)。
(2)实现高可用最关键的策略是消除单点故障。HA严格来说应该分成各个组件的HA机制:HDFS的HA和YARN的HA。
(3)Hadoop2.0之前,在HDFS集群中NameNode存在单点故障(SPOF)。
(4)NameNode主要在以下两个方面影响HDFS集群

  • NameNode机器发生意外,如宕机,集群将无法使用,直到管理员重启
  • NameNode机器需要升级,包括软件、硬件升级,此时集群也将无法使用

HDFS HA功能通过配置Active/Standby两个NameNodes实现在集群中对NameNode的热备来解决上述问题。如果出现故障,如机器崩溃或机器需要升级维护,这时可通过此种方式将NameNode很快的切换到另外一台机器。

4.2 HDFS-HA工作机制

通过双NameNode消除单点故障

4.2.1 HDFS-HA工作要点

1)元数据管理方式需要改变
内存中各自保存一份元数据;
Edits日志只有Active状态的NameNode节点可以做写操作;
两个NameNode都可以读取Edits;
共享的Edits放在一个共享存储中管理(qjournal和NFS两个主流实现);
2)需要一个状态管理功能模块
实现了一个zkfailover,常驻在每一个namenode所在的节点,每一个zkfailover负责监控自己所在NameNode节点,利用zk进行状态标识,当需要进行状态切换时,由zkfailover来负责切换,切换时需要防止brain split现象的发生。
3)必须保证两个NameNode之间能够ssh无密码登录
4)隔离(Fence),即同一时刻仅仅有一个NameNode对外提供服务

4.2.2 HDFS-HA自动故障转移工作机制

在该模式下,即使现役NameNode已经失效,系统也不会自动从现役NameNode转移到待机NameNode,下面学习如何配置部署HA自动进行故障转移。自动故障转移为HDFS部署增加了两个新组件:ZooKeeper和ZKFailoverController(ZKFC)进程,如图3-20所示。ZooKeeper是维护少量协调数据,通知客户端这些数据的改变和监视客户端故障的高可用服务。HA的自动故障转移依赖于ZooKeeper的以下功能:
1)故障检测
集群中的每个NameNode在ZooKeeper中维护了一个持久会话,如果机器崩溃,ZooKeeper中的会话将终止,ZooKeeper通知另一个NameNode需要触发故障转移。
2)现役NameNode选择
ZooKeeper提供了一个简单的机制用于唯一的选择一个节点为active状态。如果目前现役NameNode崩溃,另一个节点可能从ZooKeeper获得特殊的排外锁以表明它应该成为现役NameNode。
ZKFC是自动故障转移中的另一个新组件,是ZooKeeper的客户端,也监视和管理NameNode的状态。每个运行NameNode的主机也运行了一个ZKFC进程,ZKFC负责:
3)健康监测
ZKFC使用一个健康检查命令定期地ping与之在相同主机的NameNode,只要该NameNode及时地回复健康状态,ZKFC认为该节点是健康的。如果该节点崩溃,冻结或进入不健康状态,健康监测器标识该节点为非健康的。
4)ZooKeeper会话管理
当本地NameNode是健康的,ZKFC保持一个在ZooKeeper中打开的会话。如果本地NameNode处于active状态,ZKFC也保持一个特殊的znode锁,该锁使用了ZooKeeper对短暂节点的支持,如果会话终止,锁节点将自动删除。
5)基于ZooKeeper的选择
如果本地NameNode是健康的,且ZKFC发现没有其它的节点当前持有znode锁,它将为自己获取该锁。如果成功,则它已经赢得了选择,并负责运行故障转移进程以使它的本地NameNode为Active。故障转移进程与前面描述的手动故障转移相似,首先如果必要保护之前的现役NameNode,然后本地NameNode转换为Active状态。

4.3 HDFS-HA集群配置

4.3.1 环境准备

(1)修改IP
(2)修改主机名及主机名和IP地址的映射
(3)关闭防火墙
(4)ssh免密登录
(5)安装JDK,配置环境变量等

4.3.2 规划集群

hadoop102 hadoop103 hadoop104
NameNode NameNode NameNode
ZKFC ZKFC ZKFC
JournalNode JournalNode JournalNode
DataNode DataNode DataNode
ZK ZK ZK
ResourceManager
NodeManager NodeManager NodeManager

4.3.3 配置Zookeeper集群

1)集群规划
在hadoop102、hadoop103和hadoop104三个节点上部署Zookeeper。
2)解压安装
(1)解压Zookeeper安装包到/opt/module/目录下
[atguigu@hadoop102 software]$ tar -zxvf zookeeper-3.4.14.tar.gz -C /opt/module/
(2)在/opt/module/zookeeper-3.4.14/这个目录下创建zkData
mkdir -p zkData
(3)重命名/opt/module/zookeeper-3.4.14/conf这个目录下的zoo_sample.cfg为zoo.cfg
mv zoo_sample.cfg zoo.cfg
3)配置zoo.cfg文件
(1)具体配置
dataDir=/opt/module/zookeeper-3.4.14/zkData
增加如下配置
#######################cluster##########################
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
(2)配置参数解读
Server.A=B:C:D。
A是一个数字,表示这个是第几号服务器;
B是这个服务器的IP地址;
C是这个服务器与集群中的Leader服务器交换信息的端口;
D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
4)集群操作
(1)在/opt/module/zookeeper-3.4.14/zkData目录下创建一个myid的文件
touch myid
添加myid文件,注意一定要在linux里面创建,在notepad++里面很可能乱码
(2)编辑myid文件
vi myid
在文件中添加与server对应的编号:如2
(3)拷贝配置好的zookeeper到其他机器上
scp -r zookeeper-3.4.14/ root@hadoop103.atguigu.com:/opt/app/
scp -r zookeeper-3.4.14/ root@hadoop104.atguigu.com:/opt/app/
并分别修改myid文件中内容为3、4
(4)分别启动zookeeper
[root@hadoop102 zookeeper-3.4.14]# bin/zkServer.sh start
[root@hadoop103 zookeeper-3.4.14]# bin/zkServer.sh start
[root@hadoop104 zookeeper-3.4.14]# bin/zkServer.sh start
(5)查看状态
[root@hadoop102 zookeeper-3.4.14]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower
[root@hadoop103 zookeeper-3.4.14]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: leader
[root@hadoop104 zookeeper-3.4.5]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower

4.3.4 配置HDFS-HA集群

1)官方地址:http://hadoop.apache.org/
2)在opt目录下创建一个ha文件夹
sudo mkdir ha
sudo chown atguigu:atguigu /opt/ha
3)将/opt/module/下的 hadoop-3.1.3拷贝到/opt/ha目录下
cp -r /opt/module/hadoop-3.1.3 /opt/ha/
4)配置hadoop-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
5)配置core-site.xml


fs.defaultFS
hdfs://mycluster


hadoop.data.dir
/opt/ha/hadoop-3.1.3/data


6)配置hdfs-site.xml


dfs.namenode.name.dir
file://${hadoop.data.dir}/name


dfs.datanode.data.dir
file://${hadoop.data.dir}/data


dfs.nameservices
mycluster


dfs.ha.namenodes.mycluster
nn1,nn2,nn3


dfs.namenode.rpc-address.mycluster.nn1
hadoop102:9820


dfs.namenode.rpc-address.mycluster.nn2
hadoop103:9820


dfs.namenode.rpc-address.mycluster.nn3
hadoop104:9820


dfs.namenode.http-address.mycluster.nn1
hadoop102:9870


dfs.namenode.http-address.mycluster.nn2
hadoop103:9870


dfs.namenode.http-address.mycluster.nn3
hadoop104:9870


dfs.namenode.shared.edits.dir
qjournal://hadoop102:8485;hadoop103:8485;hadoop104:8485/mycluster



dfs.client.failover.proxy.provider.mycluster
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider



dfs.ha.fencing.methods
sshfence



dfs.ha.fencing.ssh.private-key-files
/home/atguigu/.ssh/id_rsa



dfs.journalnode.edits.dir
${hadoop.data.dir}/jn


7)拷贝配置好的hadoop环境到其他节点

4.3.5 启动HDFS-HA集群

1)将HADOOP_HOME环境变量更改到HA目录
sudo vim /etc/profile.d/my_env.sh
将HADOOP_HOME部分改为如下
##HADOOP_HOME
export HADOOP_HOME=/opt/ha/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
2)在各个JournalNode节点上,输入以下命令启动journalnode服务
注意:先将/tmp/下的内容清空
hdfs —daemon start journalnode
3)在[nn1]上,对其进行格式化,并启动
hdfs namenode -format
hdfs —daemon start namenode
4)在[nn2]和[nn3]上,同步nn1的元数据信息
hdfs namenode -bootstrapStandby
5)启动[nn2]和[nn3]
hdfs —daemon start namenode
6)查看web页面显示
Hadoop(HA&新特性)V3.0 - 图1
图 hadoop102(standby)
Hadoop(HA&新特性)V3.0 - 图2
图 hadoop103(standby)
Hadoop(HA&新特性)V3.0 - 图3
图 hadoop104(standby)
7)在所有节点上上,启动datanode
hdfs —daemon start datanode
8)将[nn1]切换为Active
hdfs haadmin -transitionToActive nn1
9)查看是否Active
hdfs haadmin -getServiceState nn1

4.3.6 配置HDFS-HA自动故障转移

1)具体配置
(1)在hdfs-site.xml中增加

dfs.ha.automatic-failover.enabled
true

(2)在core-site.xml文件中增加

ha.zookeeper.quorum
hadoop102:2181,hadoop103:2181,hadoop104:2181

2)启动
(1)关闭所有HDFS服务:
stop-dfs.sh
(2)启动Zookeeper集群:
zkServer.sh start
(3)初始化HA在Zookeeper中状态:
hdfs zkfc -formatZK
(4)启动HDFS服务:
start-dfs.sh
3)验证
(1)将Active NameNode进程kill
kill -9 namenode的进程id
(2)将Active NameNode机器断开网络
service network stop

4.4 YARN-HA配置

4.4.1 YARN-HA工作机制

1)官方文档:
http://hadoop.apache.org/docs/r3.1.3/hadoop-yarn/hadoop-yarn-site/ResourceManagerHA.html
2)YARN-HA工作机制
Hadoop(HA&新特性)V3.0 - 图4

4.4.2 配置YARN-HA集群

1)环境准备
(1)修改IP
(2)修改主机名及主机名和IP地址的映射
(3)关闭防火墙
(4)ssh免密登录
(5)安装JDK,配置环境变量等
(6)配置Zookeeper集群
2)规划集群

hadoop102 hadoop103 hadoop104
NameNode NameNode
JournalNode JournalNode JournalNode
DataNode DataNode DataNode
ZK ZK ZK
ResourceManager ResourceManager
NodeManager NodeManager NodeManager

3)具体配置
(1)yarn-site.xml

  1. <property><br /> <name>yarn.nodemanager.aux-services</name><br /> <value>mapreduce_shuffle</value><br /> </property>
  2. <!--启用resourcemanager ha--><br /> <property><br /> <name>yarn.resourcemanager.ha.enabled</name><br /> <value>true</value><br /> </property><br /> <!--声明两台resourcemanager的地址--><br /> <property><br /> <name>yarn.resourcemanager.cluster-id</name><br /> <value>cluster-yarn1</value><br /> </property>
  3. <property><br /> <name>yarn.resourcemanager.ha.rm-ids</name><br /> <value>rm1,rm2</value><br /> </property>
  4. <property><br /> <name>yarn.resourcemanager.hostname.rm1</name><br /> <value>hadoop102</value><br /> </property>
  5. <property><br /> <name>yarn.resourcemanager.hostname.rm2</name><br /> <value>hadoop103</value><br /> </property><br /> <!--指定zookeeper集群的地址--> <br /> <property><br /> <name>yarn.resourcemanager.zk-address</name><br /> <value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value><br /> </property>
  6. <!--启用自动恢复--> <br /> <property><br /> <name>yarn.resourcemanager.recovery.enabled</name><br /> <value>true</value><br /> </property><br /> <!--指定resourcemanager的状态信息存储在zookeeper集群--> <br /> <property><br /> <name>yarn.resourcemanager.store.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value><br /></property>


(2)同步更新其他节点的配置信息
4)启动hdfs
(1)在各个JournalNode节点上,输入以下命令启动journalnode服务:
hdfs —daemon start journalnode
(2)在[nn1]上,对其进行格式化,并启动:
hdfs namenode -format
hdfs —daemon start namenode
(3)在[nn2]上,同步nn1的元数据信息:
hdfs namenode -bootstrapStandby
(4)启动[nn2]:
hdfs —daemon start namenode
(5)启动所有DataNode
hdfs –-daemon start datanode
(6)将[nn1]切换为Active
hdfs haadmin -transitionToActive nn1
5)启动YARN
(1)在hadoop102中执行:
start-yarn.sh
(2)在hadoop103中执行:
yarn —daemon start resourcemanager
(3)查看服务状态
yarn rmadmin -getServiceState rm1
Hadoop(HA&新特性)V3.0 - 图5

4.5 HDFS Federation架构设计

4.5.1 NameNode架构的局限性

1)Namespace(命名空间)的限制
由于NameNode在内存中存储所有的元数据(metadata),因此单个NameNode所能存储的对象(文件+块)数目受到NameNode所在JVM的heap size的限制。50G的heap能够存储20亿(200million)个对象,这20亿个对象支持4000个DataNode,12PB的存储(假设文件平均大小为40MB)。随着数据的飞速增长,存储的需求也随之增长。单个DataNode从4T增长到36T,集群的尺寸增长到8000个DataNode。存储的需求从12PB增长到大于100PB。
2)隔离问题
由于HDFS仅有一个NameNode,无法隔离各个程序,因此HDFS上的一个实验程序就很有可能影响整个HDFS上运行的程序。
3)性能的瓶颈
由于是单个NameNode的HDFS架构,因此整个HDFS文件系统的吞吐量受限于单个NameNode的吞吐量。

4.5.2 HDFS Federation架构设计

能不能有多个NameNode

NameNode NameNode NameNode
元数据 元数据 元数据
Log machine 电商数据/话单数据

图 HDFS Federation架构设计

4.5.3 HDFS Federation应用思考

不同应用可以使用不同NameNode进行数据管理图片业务、爬虫业务、日志审计业务。Hadoop生态系统中,不同的框架使用不同的NameNode进行管理NameSpace。(隔离性)