.

…大数据:巨量数据,海量数据

1byte= 8位

1024byte= 1KB

1024KB=1MB

1024MB=1GB

1024G=1TB

1024TB=1PB

1024PB=EB

1024EB=1ZB

大数据有4个特点:

为别为:Volume(大量)、Variety(多样)、Velocity(高速)、Value(价值),一般我们称之为4V。

Hadoop生态圈

hadoop基础 - 图1

hadoop版本

Apache 免费版
Cloudera 商业版
Hortonworks 雅虎的

Hadoop的特点:

1.高可靠性 数据副本
2.高扩展性 在集群间分配任务数据,可以方便扩展数以千计的节点
3.高效性 处理速度快 数据中心
4.高容错性

Hadoop的核心组成:

1)HDFS:一个高可靠、高吞吐量的分布式文件系统。

2)MapReduce:一个分布式的离线并行计算框架。

3)YARN:作业调度与集群资源管理的框架。

4)Hadoop Common:支持其他模块的工具模块(Configuration、RPC、序列化机制、日志操作)。

HDFS基本原理:

l 客户上传的文件首先被切分成等大的数据块,存储到多台机器上,负载均衡存储

l 将数据切分、容错、负载均衡等功能透明化,用户就像操作个容量巨大、具有高容错性的磁盘那样简单

HDFS基本架构:

1)NameNode(nn):存储文件的元数据,如文件名,文件目录结构,文件属性(生成时间、副本数、文件权限),以及每个文件的块列表和块所在的DataNode等。

2)DataNode(dn):在本地文件系统存储文件块数据,以及块数据的校验和。

3)Secondary NameNode(2nn):用来监控HDFS状态的辅助后台程序,每隔一段时间获取HDFS元数据的快照。

Hadoop安装前期准备

  1. 安装虚拟机 Linux安装
  2. 配置网络地址(NAT)
  1. [root@hadoopNode1 ~]# vi /etc/sysconfig/network-scripts/ifcfg-ens33
  1. 注:确定IP地址,网段
  1. 修改hostname配置
  1. [root@hadoopNode1 ~]# vi /etc/hostname
  1. 注:主算机名使用英文字母组成, 设置好后就不能随意的修改
  1. 修改hosts映射配置
  1. [root@hadoopNode1 ~]# vi /etc/hosts
  1. 关闭防火墙
    关闭:
    开机禁用: ```shell [root@hadoopNode1 ~]#systemctl stop firewalld

[root@hadoopNode1 ~]#systemctl disable firewalld

  1. 5. 重起操作系统生效

创建用户ambow,并创建密码ambow(略)

  1. ```shell
  2. [root@hadoopNode1 ~]# useradd ambow
  3. [root@hadoopNode1 ~]# passwd ambow
  1. 设置ambow用户具有root权限 sudo

使用root用户,修改 /etc/sudoers 文件,找到下面一行,在root下面添加一行,如下所示:

  1. [ambow@ master soft]# vi /etc/sudoers
  2. ## Allow root to run any commands anywhere
  3. root ALL=(ALL) ALL
  4. ambow ALL=(ALL) ALL

修改完毕,现在可以用ambow帐号登录,然后用命令 su - ambow,即可获得root权限进行操作。

  1. 安装JDK
    安装命令tree
    sudo yum install -y treetar包: ```shell [ambow@hadoopNode1 ~]$ pwd /home/ambow [ambow@hadoopNode1 ~]$ mkdir soft [ambow@hadoopNode1 ~]$ mkdir app [ambow@hadoopNode1 ~]$ ls app soft [ambow@hadoopNode1 ~]$ tree . . ├── app └── soft ├── hadoop-2.6.0-cdh5.14.2.tar ├── jdk-8u121-linux-x64.tar.gz └── zookeeper-3.4.6.tar.gz

2 directories, 3 files [ambow@hadoopNode1 ~]$ pwd /home/ambow [ambow@hadoopNode1 ~]$ tar -zxvf ./soft/jdk-8u112-linux-x64.tar.gz -C ./app/

  1. 配置JDK:
  2. ```shell
  3. [ambow@hadoopNode1 jdk1.8.0_112]$ vi ~/.bash_profile
  4. [ambow@hadoopNode1 jdk1.8.0_112]$ cat ~/.bash_profile
  5. # .bash_profile
  6. # Get the aliases and functions
  7. if [ -f ~/.bashrc ]; then
  8. . ~/.bashrc
  9. fi
  10. # User specific environment and startup programs
  11. JAVA_HOME=/home/ambow/app/jdk1.8.0_112
  12. PATH=$PATH:$HOME/.local/bin:$HOME/bin:$JAVA_HOME/bin
  13. export PATH
  14. export JAVA_HOME
  15. [ambow@hadoopNode1 jdk1.8.0_121]$
  16. [ambow@hadoopNode1 jdk1.8.0_121]$ source ~/.bash_profile

source ~/.bash_profile 让配置文件生效

  1. 重启操作系统
    rebootHadoop 三种模式安装

1.本地模式 用于开发和调式

2.伪分布式 模拟一个小规模的集群
一台主机模拟多主机
启动 NameNode DataNode ResouceManger,nodeManager
3.集群模式:(生产环境)
多台主机,分别充当NaameNode,DataNode 。。。。
Hadoop本地模式安装:

  1. 解压Hadoop软件

    1. [ambow@hadoopNode1 sbin]$ tar -zxvf ~/soft/hadoop-2.6.0-cdh5.14.2.tar.gz -C ~/app/
  2. 配置Hadoop环境变量 ```shell [ambow@hadoopNode1 hadoop-2.7.3]$ vi ~/.bash_profile [ambow@hadoopNode1 hadoop-2.7.3]$ cat ~/.bash_profile

    .bash_profile

Get the aliases and functions

if [ -f ~/.bashrc ]; then . ~/.bashrc fi

User specific environment and startup programs

JAVA_HOME=/home/ambow/app/jdk1.8.0_112

HADOOP_HOME=/home/ambow/app/hadoop-2.6.0-cdh5.14.2

PATH=$PATH:$HOME/.local/bin:$HOME/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

export PATH export JAVA_HOME export HADOOP_HOME

  1. 1. 环境变量生效
  2. ```shell
  3. [ambow@hadoopNode1 hadoop-2.7.3]$ source ~/.bash_profile
  1. 测试

    新建测试的数据文件: touch ~/mydata.txt
    测试语法格式:

    1. [ambow@hadoopNode1 mydata.out]$ hadoop jar ~/app/hadoop-2.6.0-cdh5.14.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.14.2.jar wordcount ~/mydata.txt ~/mydata.out2

hadoop基础 - 图2

伪分布模式配置:

  1. JDK安装
  2. Hadoop安装
  3. 配置Hadoop的$HADOOP_HOME/etc/hadoop/core-site.xml
    fs.defaultFS
    hadoop.tmp.dir

    1. [ambow@hadoopNode1 hadoop]$ vi $HADOOP_HOME/etc/hadoop/core-site.xml

    ```xml

    fs.defaultFS hdfs://hadoop1:8020 hadoop.tmp.dir /home/ambow/hdfs/data

  1. 3. 配置java_home 修改文件/home/ambow/app/hadoop-2.6.0-cdh5.14.2/etc/hadoop/hadoop-env.sh
  2. export JAVA_HOME =/home/ambow/app/jdk1.8.0_112/
  1. ```
  2. 4. 配置 hdfs-site.xml<br />dfs.replication 设置块的副本个数: 伪分布模式只能设置为**1** 默认为3
  3. ```shell
  4. [ambow@hadoopNode1 hadoop]$ vi $HADOOP_HOME/etc/hadoop/hdfs-site.xml
  1. <configuration>
  2. <property>
  3. <!-- 配置每个block的副本个数 默认3个 当是单节点时配置为1 不能配置态多,态多反而降低效果 -->
  4. <name>dfs.replication</name>
  5. <!-- 伪分布式只能配1个副本 -->
  6. <value>1</value>
  7. </property>
  8. </configuration>
  1. 格式化
  1. [ambow@hadoopNode1 ~]$ hadoop namenode -format

一般只格式化一次,如果要再格式化,建议要把各dataNode节点的数据删除要,防止DataNode和NameNode的集群ID号不一致而无法启动

hadoop基础 - 图3

6.启动hadoop
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode

hadoop-daemon.sh stop namenode
hadoop-daemon.sh stop datanode

7.查看进程
jps

  1. logs日志文件
    cat ~/soft/hadoop/logs

9:WEB访问查看
http://192.168.100.100:50070/

  1. YARN上运行MR 要配置两个配置文件
    配置mapred-site.xml.template

    1. [ambow@hadoopNode1 hadoop]$ vi $HADOOP_HOME/etc/hadoop/mapred-site.xml.template
    1. <configuration>
    2. <property>
    3. <!-- 指定MapReduce使用Yarn资源管理框架 -->
    4. <name>mapreduce.framework.name</name>
    5. <value>yarn</value>
    6. </property>
    7. </configuration>
  2. 配置yarn-site.xml

    yarn.resourcemanger.hostname
    yarn.nodemaager.aux-service

  1. <configuration>
  2. <property>
  3. <!-- 指定yaran主要管理一个机节点 主机名 -->
  4. <name>yarn.resourcemanager.hostname</name>
  5. <value>hadoop1</value>
  6. </property>
  7. <property>
  8. <!-- 使用mapreduce_shuffle服务 -->
  9. <name>yarn.nodemanager.aux-services</name>
  10. <value>mapreduce_shuffle</value>
  11. </property>
  12. </configuration>
  1. 启动 yarn
  1. [ambow@hadoopNode1 data]$ yarn-daemon.sh start resourcemanager
  2. [ambow@hadoopNode1 data]$ yarn-daemon.sh start nodemanager
  1. 测试MR操作

上传Linux系统中的 ~/mydata.txt文件 至 HDFS文件系统/user/ambow目录中去

  1. hadoop fs -mkdir -p /user/ambow
  2. [ambow@hadoopNode1 data]$ hadoop fs -put ~/mydata.txt /user/ambow

对hdfs的文件使用yarn来进行wordcount操作:

  1. hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.14.2.jar wordcount /user/ambow/mydata.txt /user/ambow/output/wc/

hadoop基础 - 图4

hadoop基础 - 图5

分布式集群安装:

hadoop基础 - 图6

1.修/etc/hosts

  1. 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
  2. ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
  3. 192.168.100.200 hadoop1
  4. 192.168.100.201 hadoop2
  5. 192.168.100.202 hadoop3

2.克隆虚拟机

3.分别配置各虚拟机节点的:IP地址,主机名,映射文件

  1. [root@hadoopNode2 ~]# vi /etc/hostname
  2. [root@hadoopNode2 ~]# vi /etc/sysconfig/network-scripts/ifcfg-ens33
  3. [root@hadoopNode2 ~]# vi /etc/hosts

4.验证配置

  1. [root@hadoopNode2 ~]# ping hadoop1
  2. PING hadoopNode1 (192.168.100.200) 56(84) bytes of data.
  3. 64 bytes from hadoopNode1 (192.168.100.200): icmp_seq=1 ttl=64 time=0.190 ms
  4. 64 bytes from hadoopNode1 (192.168.100.200): icmp_seq=2 ttl=64 time=0.230 ms
  5. 64 bytes from hadoopNode1 (192.168.100.200): icmp_seq=3 ttl=64 time=0.263 ms
  6. 64 bytes from hadoopNode1 (192.168.100.200): icmp_seq=4 ttl=64 time=0.227 ms
  7. ^C64 bytes from hadoopNode1 (192.168.100.200): icmp_seq=5 ttl=64 time=0.195 ms
  8. 64 bytes from hadoopNode1 (192.168.100.200): icmp_seq=6 ttl=64 time=0.268 ms
  9. ^C
  10. --- hadoopNode1 ping statistics ---
  11. 6 packets transmitted, 6 received, 0% packet loss, time 5000ms
  12. rtt min/avg/max/mdev = 0.190/0.228/0.268/0.035 ms
  13. [root@hadoopNode2 ~]# ping hadoop2
  14. PING hadoopNode2 (192.168.100.201) 56(84) bytes of data.
  15. 64 bytes from hadoopNode2 (192.168.100.201): icmp_seq=1 ttl=64 time=0.011 ms
  16. 64 bytes from hadoopNode2 (192.168.100.201): icmp_seq=2 ttl=64 time=0.022 ms
  17. ^C
  18. --- hadoopNode2 ping statistics ---
  19. 2 packets transmitted, 2 received, 0% packet loss, time 999ms
  20. rtt min/avg/max/mdev = 0.011/0.016/0.022/0.006 ms
  21. [root@hadoopNode2 ~]# ping hadoop3
  22. PING hadoopNode3 (192.168.100.202) 56(84) bytes of data.
  23. 64 bytes from hadoopNode3 (192.168.100.202): icmp_seq=1 ttl=64 time=0.246 ms
  24. 64 bytes from hadoopNode3 (192.168.100.202): icmp_seq=2 ttl=64 time=0.218 ms
  25. 64 bytes from hadoopNode3 (192.168.100.202): icmp_seq=3 ttl=64 time=0.218 ms
  26. ^C64 bytes from hadoopNode3 (192.168.100.202): icmp_seq=4 ttl=64 time=0.227 ms
  27. ^C
  28. --- hadoopNode3 ping statistics ---
  29. 4 packets transmitted, 4 received, 0% packet loss, time 3001ms
  30. rtt min/avg/max/mdev = 0.218/0.227/0.246/0.015 ms

Master—-slave主从架构

  1. 在master节点上进设置免密登陆

    1).生成Maste节点公钥和私钥

  1. ssh-keygen -t rsa

2)分发

  1. ssh-copy-id hadoop1
  2. ssh-copy-id hadoop2
  3. ssh-copy-id hadoop3

3)验证 在Master节点上登陆各节点,看是否需要密码

  1. ssh hadoop2
  2. ssh hadoop3

6.配置核心文件 core-siter.xml

  1. [ambow@hadoopNode1 hadoop]$ vi /home/ambow/app/hadoop-2.6.0-cdh5.14.2/etc/hadoop/core-site.xml
  1. <configuration>
  2. <!-- 配置默认FS hadoop3.X 默认端口为9820 hadoop2.X 默认端口为8020 hadoop1.X 默认端口为9000 一般伪分布设置为localhost:8020 -->
  3. <property>
  4. <name>fs.defaultFS</name>
  5. <value>hdfs://hadoop1:8020</value>
  6. </property>
  7. <!-- 指定hadoop运行时产生文件存储的目录 会自动创建 不建议默认 -->
  8. <property>
  9. <name>hadoop.tmp.dir</name>
  10. <value>/home/ambow/hdfs/data</value>
  11. </property>
  12. </configuration>
  1. hdfs-site.xml
  1. <configuration>
  2. <property>
  3. <!-- 配置每个block的副本个数 默认3个 当是单节点时配置为1 不能配置态多,态多反而降低效果 -->
  4. <name>dfs.replication</name>
  5. <!-- 伪分布式只能配1个副本 -->
  6. <value>3</value>
  7. </property>
  8. <property>
  9. <!-- 设置第辅助主节点 2NN -->
  10. <name>dfs.namenode.secondary.http-address</name>
  11. <value>hadoop2:50090</value>
  12. </property>
  13. <property>
  14. <!-- 检查点的路径 -->
  15. <name>dfs.namenode.checkpoint.dir</name>
  16. <value>/home/ambow/hdfs/namesecondary</value>
  17. </property>
  18. </configuration>

8.mapred-site.xml.template

  1. <configuration>
  2. <property>
  3. <!-- 指定MapReduce使用Yarn资源管理框架 -->
  4. <name>mapreduce.framework.name</name>
  5. <value>yarn</value>
  6. </property>
  7. </configuration>
  1. yarn-site.xml
  1. <configuration>
  2. <!-- Site specific YARN configuration properties -->
  3. <property>
  4. <!-- 指定yaran主要管理一个机节点 -->
  5. <name>yarn.resourcemanager.hostname</name>
  6. <value>hadoop1</value>
  7. </property>
  8. <property>
  9. <!-- 使用mapreduce_shuffle服务 -->
  10. <name>yarn.nodemanager.aux-services</name>
  11. <value>mapreduce_shuffle</value>
  12. </property>
  13. </configuration>

10.修改slaves文件 来指定当前集群中那些节点是DataNode节点 把节点的主机名添加到slaves文件中

  1. [ambow@hadoopNode1 hadoop]$ vi $HADOOP_HOME/etc/hadoop/slaves
  1. hadoop1
  2. hadoop2
  3. hadoop3

11.分发文件到其他节点

注:分发之前要停止所有服务

关闭hdfs

  1. 网络复制:语法: scp -r 源文件目录 用户名@主机名:目标路径
  2. -r 递归复制
  1. [ambow@hadoopNode1 hadoop]$ scp -r $HADOOP_HOME/etc/hadoop ambow@hadoop2:$HADOOP_HOME/etc/
  2. [ambow@hadoopNode1 hadoop]$ scp -r $HADOOP_HOME/etc/hadoop ambow@hadoop3:$HADOOP_HOME/etc/

12.测试

start-all.sh

stop-all.sh

  1. [ambow@hadoopNode1 hadoop]$ start-all.sh

测试

查看进程jps

IP地址:8088

IP地址:50070

HDFS SHELL操作:

递归查看hdfs根目录
hadoop fs -ls -R /

查看hdfs目录

hadoop fs -ls /

查看指定的文件内容

hadoop fs -cat /user/ambow/mydata.txt

hadoop fs -text /user/ambow/mydata.txt

创建文件夹,-p用于时是否递归创建

hadoop fs -mkdir -p /user/ambow/my

从本地文件系统中复制单个或多个源路径到目标文件系统

hadoop fs -put data.txt /user/ambow/my

从标准输入中读取输入写入目标文件系统

hadoop fs -put - /user/ambow/

复制文件到本地文件

hadoop fs -get /user/ambow/mydata.txt data1.txt

删除指定的文件,只删除非空目录和文件

hadoop fs -rm /user/ambow/data.txt

递归删除,可以删除有文件的文件夹

hadoop fs -rm -r /user/ambow/my

删除空目录

hadoop fs -rmdir /user/ambow/data

将源目录中所有的文件连接到本地目标文件中

hadoop fs -getmerge /user/ambow/my data1.txt

将文件从源路径移动到目标路径,这个命令允许有多个源路径,此时目标路径必须是一个目录

hadoop fs -mv /user/ambow/data.txt /user/ambow/my

将文件从源路径复制到目标路径,这个命令允许有多个源路径,此时目标路径必须是一个目录

hadoop fs -cp /user/ambow/data.txt /user/ambow/my

显示目录中所有文件的大小

hadoop fs -du /user/ambow

显示文件大小

hadoop fs -du -s /user/ambow/mydata.txt

创建一个0字节的空文件

hadoop fs -touchz /user/ambow/file.txt

返回指定路径的统计信息

hadoop fs -stat /user/ambow

清空回收站

hadoop fs -expunge

统计文件夹数量、文件数量、文件总大小信息

hadoop fs -count /user/ambow

查看文件尾部内容,一般用于查看日志

hadoop fs -tail /user/ambow/files.COPYING

修改文件权限

hadoop fs -chmod 700 /user/ambow/data.txt

  1. [ambow@hadoopNode1 soft]$ hadoop fs -help
  2. Usage: hadoop fs [generic options]
  3. [-appendToFile <localsrc> ... <dst>]
  4. [-cat [-ignoreCrc] <src> ...] #显示文件内容
  5. [-checksum <src> ...] #校验和
  6. [-chgrp [-R] GROUP PATH...] # 与chown一样
  7. [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...] # 改变权限
  8. [-chown [-R] [OWNER][:[GROUP]] PATH...] #改变文件组
  9. [-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>] #从本地复制HDFS 与put命令相同
  10. [-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] #复制到本地 与get命令相同
  11. [-count [-q] [-h] <path> ...] #统计路径下的目录、文件和字节数
  12. [-cp [-f] [-p | -p[topax]] <src> ... <dst>] hdfs之间复制
  13. [-createSnapshot <snapshotDir> [<snapshotName>]] #在目录上创建快照
  14. [-deleteSnapshot <snapshotDir> <snapshotName>]
  15. [-df [-h] [<path> ...]] #文件使用情况
  16. [-du [-s] [-h] <path> ...] #查看文件或目录空间
  17. [-expunge]
  18. [-find <path> ... <expression> ...] #查找
  19. [-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] #下载
  20. [-getfacl [-R] <path>]
  21. [-getfattr [-R] {-n name | -d} [-e en] <path>]
  22. [-getmerge [-nl] <src> <localdst>]
  23. [-help [cmd ...]]
  24. [-ls [-d] [-h] [-R] [<path> ...]] #查看
  25. [-mkdir [-p] <path> ...] #创建
  26. [-moveFromLocal <localsrc> ... <dst>] #从本地移到到HDFS
  27. [-moveToLocal <src> <localdst>] HDFS移到本地
  28. [-mv <src> ... <dst>] #HDFS之间移动
  29. [-put [-f] [-p] [-l] <localsrc> ... <dst>] #上传
  30. [-renameSnapshot <snapshotDir> <oldName> <newName>]
  31. [-rm [-f] [-r|-R] [-skipTrash] <src> ...] #删除
  32. [-rmdir [--ignore-fail-on-non-empty] <dir> ...] 3删除空目录
  33. [-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
  34. [-setfattr {-n name [-v value] | -x name} <path>]
  35. [-setrep [-R] [-w] <rep> <path> ...]
  36. [-stat [format] <path> ...]
  37. [-tail [-f] <file>]
  38. [-test -[defsz] <path>]
  39. [-text [-ignoreCrc] <src> ...] #查看文件内容
  40. [-touchz <path> ...] #创建空文件
  41. [-truncate [-w] <length> <path> ...]
  42. [-usage [cmd ...]]

问题:启动DataNOde节点 有些节点无法正常启动

1.删除各节点hadoop.tmp.dir所指定的路径

2.重新格式化nameNoden

安全模式:检查的最小副本数 默认为1

HDFS API操作:

1.准备环境

  1. Eclipse+Maven
  2. Eclipse+Hadooprjar

hadoop的jar包:

1.解压hadoop-2.7.3.tar.gz到非中文目录

2.删除souce 和 test的jar包

创建目录 mkdirs

创建文件 create

查看目录或文件 liststats

上传文件:copyFromLocalFile

下载文件 :copyToLocalFile

复制文件 HDSF—>HDFS : ???

删除文件:delete

追加数据:append

读取一个文件 open()

改名:rename

查看块信息: fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());

  1. /* 创建目录
  2. */
  3. public void mkdir() throws IOException, InterruptedException, URISyntaxException {
  4. // 得到一个配置文件
  5. Configuration conifg = new Configuration();
  6. // 得到一个文件系统
  7. FileSystem fs = FileSystem.get(new URI("hdfs://hadoopNode1:8020"), conifg, "ambow");
  8. boolean isMkdir = fs.mkdirs(new Path("/hadfsapi/ambow2")); // 级联创建一个/user/ambow目录
  9. System.out.println(isMkdir);
  10. }
  11. /* 查看块信息
  12. */
  13. @Test
  14. public void blockeInfo() throws IllegalArgumentException, IOException {
  15. FileStatus fileStatus = fs.getFileStatus(new Path("/user/ambow/a.txt"));
  16. BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
  17. for (BlockLocation b : fileBlockLocations) {
  18. System.out.println(b.getHosts());
  19. for (String host : b.getHosts())
  20. System.out.println(host);
  21. }
  22. }

Maven安装 :

1.下载maven包

2.解压

3.配置Maven的环境变量

添加一个MAVEN_HOME环境变量值为:Maven的解压路径

  1. MAVEN_HOME= D:\apache-maven-3.5.2

在PATH环境变量中添加:%MAVEN_HOME%\bin

4、.配置本地Maven 库

%MAVEN_HOME%\repository 为本地Maven库 4.5G大小,要拷贝

配置%MAVEN_HOME%\conf\settings.xml中指定本地Maven库的路径

  1. <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
  4. <!-- 配置指向自己本地Maven库路径 -->
  5. <localRepository>D:\apache-maven-3.5.2\repository</localRepository>
  6. <profiles>
  7. <profile>
  8. <id>jdk-1.8</id>
  9. <!-- 另外一种激活方式 -->
  10. <activation>
  11. <activeByDefault>true</activeByDefault>
  12. <jdk>1.8</jdk>
  13. </activation>
  14. <properties>
  15. <maven.compiler.source>1.8</maven.compiler.source>
  16. <maven.compiler.target>1.8</maven.compiler.target>
  17. <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
  18. </properties>
  19. </profile>
  20. <!--与activeProfies对应-->
  21. <profile>
  22. <id>nexusAliyun</id>
  23. <repositories>
  24. <repository>
  25. <id>nexusAliyun</id>
  26. <name>Repository of central</name>
  27. <url>http://maven.aliyun.com/nexus/content/groups/public</url>
  28. <layout>default</layout>
  29. <snapshots>
  30. <enabled>true</enabled>
  31. </snapshots>
  32. <releases>
  33. <enabled>true</enabled>
  34. </releases>
  35. </repository>
  36. </repositories>
  37. </profile>
  38. </profiles>
  39. <activeProfiles>
  40. <activeProfile>nexusAliyun</activeProfile>
  41. </activeProfiles>
  42. </settings>

Eclispe集成Maven插件

1.windows—>preferences—->Maven—->installations—->add

hadoop基础 - 图7

  1. windows—>preferences—->Maven—->User Settings—->指定用户的Maven配置文件

D:\apache-maven-3.5.2\conf\settings.xml

3.创建Maven项目

问题:

hdfs-siter.xml
HDFS 架构: Mster/Savle

1)Client:就是客户端。

(1)文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行存储。

(2)与NameNode交互,获取文件的位置信息。

(3)与DataNode交互,读取或者写入数据。

(4)Client提供一些命令来管理HDFS,比如启动或者关闭HDFS。

(5)Client可以通过一些命令来访问HDFS。

2)NameNode:就是master,它是一个主管、管理者。

(1)管理HDFS的名称空间。

(2)管理数据块(Block)映射信息

(3)配置副本策略

(4)处理客户端读写请求。

3)DataNode:就是Slave。NameNode下达命令,DataNode执行实际的操作。

(1)存储实际的数据块。

(2)执行数据块的读/写操作。

4)Secondary NameNode:并非NameNode的热备。当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务。

(1)辅助NameNode,分担其工作量。

(2)定期合并Fsimage和Edits-生成-à新的Fsimage,并推送给NameNode。

(3)在紧急情况下,可辅助恢复NameNode。

注:fsimage保存了最新的元数据检查点,在HDFS启动时加载fsimage的信息,包含了整个HDFS文件系统的所有目录和文件的信息。
对于文件来说包括了数据块描述信息、修改时间、访问时间等;对于目录来说包括修改时间、访问权限控制信息(目录所属用户,所在组)等。
editlog主要是在NameNode已经启动情况下对HDFS进行的各种更新操作进行记录,HDFS客户端执行所有的写操作都会被记录到editlog中。

HDFS读流程:

hadoop基础 - 图8

读流程如下:
  我们先说一个语义:下载回这个文件。换句话说就是取回这个文件的所有的块,那么当有能力取回文件的所有块的时候,那么它的子集操作就是取回其中某些块或者某个块也能实现。所以我们先来看取回文件的所有块的流程是怎么实现的:
 1、客户端和NameNode建立连接,获取文件block的位置信息(fileBlockLocations)
 2、客户端根据自己想要获取的数据位置挑选需要连接的DataNode(如果全文下载,从0开始;如果是从某一位置开始,客户端需要给出)
需要用inputstream.seek(long)//从什么位置开始读取,和哪个DataNode开始连接获取block;
 3、距离的概念:只有文件系统在读流程中附加距离优先的概念,计算层才能够被动实现计算向数据移动,距离有以下三种:
  (1)本地,最近的距离;
  (2)同机架,次之的距离;
  (3)other(数据中心),最远的距离;
 4、客户端下载完成block后会验证DataNode中的MD5,保证块数据的完整性。

HDFS写流程:

hadoop基础 - 图9

写流程如下:
  1、客户端访问NameNode,NameNode检查路径和权限,如果路径中有与要上传的文件重名的文件就不能上传了,不能覆盖,如果没有才创建,创建名为file.copying的临时文件;
  2、NameNode触发副本放置策略,如果客户端在集群内的某一台机器,那么副本第一块放置在该服务器上,然后再另外挑两台服务器;如果在集群外,namenode会根据策略先找一个机架选出一个datanode,然后再从另外的机架选出另外两个datanode,然后namenode会将选出的三个datanode按距离组建一个顺序,然后将顺序返回给客户端;
  3、客户端会根据返回的三个节点和第一个节点建立一个socket连接(只会和第一个节点建立),第一个节点又会和第二个节点建立socket连接,由第二个节点又会和第三个节点建立一个socket连接,这种连接的方式叫Pipeline;
  4、客户端会将block切分成package(默认是64kB),以流式在pipeline中传输
好处:
   (1)速度快:时间线重叠(其实流式也是一种变异的并行);
   (2)客户端简单:副本的概念是透明的;
  5、由DataNode完成接收block块后,block的metadata(MD5校验用)通过一个心跳将信息汇报给NameNode;
  6、如果再pipeline传输中,任意节点失败,上游节点直接连接失败节点的下游节点继续传输,最终在第5步汇报后,NameNode会发现副本数不足,一定会出发DataNode复制更多副本,客户端Client副本透明;
  7、client一直重复以上操作,逐一将block块上传,同时DataNode汇报block的位置信息,时间线重叠;
  8、最终,如果NameNode收到了DataNode汇报的所有块的信息,将文件的.copying去掉,文件可用。

HDFS删除流程:

hadoop基础 - 图10

1.client 执行 hadoop fs -rm /abc.txt
  2.会执行DistributedFileSystem类的delete方法,这个方法会接收用户传来的文件路径 /abc.txt=》找namenode,执行delete方法(源码注释:Delete the given file or directory from the file system.)=》这里注意,namenode的删除操作,只是把这个路径信息从内存里元数据信息里删除,但是真正的数据的block块并没有立即删除。但是这并不影响,因为如果元数据里没有此文件信息,拿也拿不到。
  3.如果想真正删干净对应的文件数据,得通过DataNode来删除。DataNode会定期的给NameNode发送心跳包,如果是删除,NameNode在接收到DataNode心跳包后,会把删除的指令包括删除哪些文件数据传达给DataNode,DataNode接到指令后,执行删除指令,把block块从本机上删除。

复本放置策略:

hadoop基础 - 图11

MapReduce:

编写MapRdeuce计算框架:

  1. 在Eclipse下新建一个项目工程
  2. 导入包
  3. 创建自己的WcMapper类,继承org.apache.hadoop.mapreduce.Mapper,并重写map()方法 ``` public class MyMapper extends Mapper

{ protected void map(KEYIN key1, VALUEIN value1, Context context) {

  1. }

}

  1. ```
  2. KEYIN: 输入的key
  3. VALUEIN:输入的value
  4. KEYOUT:输出的key
  5. VALUEOUT:输出的value
  6. Context:Mapper的上下文
  7. ```java
  8. // 创建Mapper类 Mapper<输入的Long,输入字符串, 输出字符串, 输出的Long>
  9. public static class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  10. /*
  11. * KeyIn:LongWritable 行的偏移量 ValueIn:Text 这一行的值 TextInputformat
  12. *
  13. */
  14. @Override
  15. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  16. // 得到每一行的值,反序化为字符串
  17. String lines = value.toString();
  18. // 对每一行的字符串按空格来拆分
  19. String[] words = lines.split(" ");
  20. for (String word : words) {
  21. // 对每个单词写入Hadoop中 写入的数据必须是Hadoop的序列化
  22. context.write(new Text(word), new LongWritable(1));
  23. // hello:1 word:1 aaaa:1 空格 :1 空格 :1 空格 :1
  24. }
  25. }
  26. }
  1. 创建一个WcReduce类,继承org.apache.hadoop.mapreduce.Reducer,并重写reduce()方法

    1. public class MyReducer extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    2. {
    3. protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
    4. {
    5. }
    6. }
  2. KEYIN: 输入的key
    VALUEIN:输入的value
    KEYOUT:输出的key
    VALUEOUT:输出的value
    Context:Reducer的上下文

    1. public static class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    2. // reduce(单词key, 指定的单词mapper统计的List, Context context)
    3. @Override
    4. protected void reduce(Text key, Iterable<LongWritable> values, Context context)
    5. throws IOException, InterruptedException {
    6. int total = 0;
    7. for (LongWritable once : values) {
    8. total += once.get();
    9. }
    10. context.write(key, new LongWritable(total));
    11. }
    12. }

  3. 创建一个驱动类WcDrive来配置job任务,并提交job任务

    1. public class WcDrive {
    2. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    3. //
    4. Configuration conf = new Configuration();
    5. // conf.set("HADOOP_USER_NAME","ambow");
    6. // Job对像
    7. Job job = Job.getInstance(conf);
    8. // 注册Jar驱动类
    9. job.setJarByClass(WcDrive.class);
    10. // 注册Mapper驱动类
    11. job.setMapperClass(WcMapper.class);
    12. //注册Reducer驱动类
    13. job.setReducerClass(WcReduce.class);
    14. // 设置MapOutPut输出的的类型
    15. job.setMapOutputKeyClass(Text.class);
    16. job.setMapOutputValueClass(LongWritable.class);
    17. // 设置最终输出的类型
    18. job.setOutputKeyClass(Text.class);
    19. job.setOutputValueClass(LongWritable.class);
    20. // 设置输入输出路径
    21. // org.apache.hadoop.mapred.FileInputFormat 老版本
    22. // org.apache.hadoop.mapreduce.lib.input.FileInputFormat 新版本
    23. FileInputFormat.setInputPaths(job, new Path("/user/ambow/mydata.txt"));
    24. FileOutputFormat.setOutputPath(job, new Path("/hadfsapi/out/wc8"));
    25. // FileInputFormat.setInputPaths(job, new Path(args[0]));
    26. // FileOutputFormat.setOutputPath(job, new Path(args[1]));
    27. // 设置reduce任务数为0 分区多少个???
    28. // job.setNumReduceTasks(0);
    29. // 提交作业
    30. boolean result = job.waitForCompletion(true);
    31. System.exit(result ? 0 : 1);
    32. }
  4. } ```

  1. 6.本地模式测试:
  2. mapred-site.xml yran模式设置为local模式
  3. 7.集群测试:
  4. 把项目打包成jar包,上传至master节点
  5. ```shell<br />[ambow[@hadoopNode1 ](/hadoopNode1 ) data]$ hadoop jar wc.jar com.ldg.teacher.WcDrive /user/ambow/mydata.txt /usr/ambow/out/wc9

问题:

1.写入权限问题:

2.windows环境变量问题

3.log4j问题

MapReduce计算框架的缺点:

1.不适合实时计算

2.不适合流式计算

3.不适合DAG(有向无环图)计算

MR计算思想:

  1. 把一个文件切成很多片,第一个片交由一个mapper来计算,计算的结果,交由一个或多个Reducer来计算,最后输到到文件系统

MapReduce运行的时候,会通过Mapper运行的任务读取HDFS中的数据文件,然后调用自己的方法,处理数据,最后输出。Reducer任务会接收Mapper任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS的文件中

Mapper阶段:是由一个或多个mapper任务并行执行

Reducer阶段:是由一个或多个Reduce任务并行执行

Hadoop序列化:

  1. 因为hadoop集群,很多的通信都是在网上传输:为了解决提高通信效率,所以重写java的重量级序列化,方便hadoop节点之间的通信
1.常用的数据类型对应的hadoop数据序列化:
Java类型 HadoopWritable类型
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
string Text
map MapWritable
array ArrayWritable
2自定义Hdoop序列化:

1.实现WritableComparable接口

2.实现write()方法:将对像转换为字节流并写入到输出流out中

  1. 3.实现**ReadFields()**方法:用于从输入流in中读取字节流并反序列化对象
属性类型 序列化write操作 反序列化readFields操作
属性是hadoop类型 name.write(out) name.readFields(in)
属性是java类型 out.write(name) in.readFields(name)

4.实现comparaTo()方法

5.属性可以使用 Hadoop的类型,也可 使用java的类型

6.实现各属性的set方法

7.实现构造方法

8.实现toString(),hasCode(),equals()方法

Split分片:

注:1.分片个数,决定 Mapper任务个数

  1. 2.分片是**逻辑分片**,只记住起始位置、长度以及所在的节点列表等
  1. 分片只对每一个文件单独切片 ,有多个小于block块大小的文件时,就产生多少个分片 ——?可以把小文件合并成一个大文件来处理

1566358361435

分片计算公式:

$$ Math.max(minSize,Math.min(maxSize,blocksize))) $$

1566358510423

分片源码:

  1. public List<InputSplit> getSplits(JobContext job) throws IOException {
  2. StopWatch sw = new StopWatch().start();
  3. long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  4. long maxSize = getMaxSplitSize(job);
  5. // generate splits
  6. List<InputSplit> splits = new ArrayList<InputSplit>();
  7. List<FileStatus> files = listStatus(job);
  8. for (FileStatus file: files) {
  9. Path path = file.getPath();
  10. long length = file.getLen();
  11. if (length != 0) {
  12. BlockLocation[] blkLocations;
  13. if (file instanceof LocatedFileStatus) {
  14. blkLocations = ((LocatedFileStatus) file).getBlockLocations();
  15. } else {
  16. FileSystem fs = path.getFileSystem(job.getConfiguration());
  17. blkLocations = fs.getFileBlockLocations(file, 0, length);
  18. }
  19. if (isSplitable(job, path)) {
  20. long blockSize = file.getBlockSize();
  21. long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  22. long bytesRemaining = length;
  23. while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  24. int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  25. splits.add(makeSplit(path, length-bytesRemaining, splitSize,
  26. blkLocations[blkIndex].getHosts(),
  27. blkLocations[blkIndex].getCachedHosts()));
  28. bytesRemaining -= splitSize;
  29. }
  30. if (bytesRemaining != 0) {
  31. int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  32. splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
  33. blkLocations[blkIndex].getHosts(),
  34. blkLocations[blkIndex].getCachedHosts()));
  35. }
  36. } else { // not splitable
  37. splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
  38. blkLocations[0].getCachedHosts()));
  39. }
  40. } else {
  41. //Create empty hosts array for zero length files
  42. splits.add(makeSplit(path, 0, length, new String[0]));
  43. }
  44. }
  45. // Save the number of input files for metrics/loadgen
  46. job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  47. sw.stop();
  48. if (LOG.isDebugEnabled()) {
  49. LOG.debug("Total # of splits generated by getSplits: " + splits.size()
  50. + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  51. }
  52. return splits;
  53. }
  54. protected long computeSplitSize(long blockSize, long minSize,
  55. long maxSize) {
  56. return Math.max(minSize, Math.min(maxSize, blockSize));
  57. }

分区:

把mapper计算出来结果,对key按一定的规则进行分区

如 :wordCount类: 按字母分一类( a—z A—-Z),非字母分一个类 要分2个区

  1. 上海,北京,重庆

1.编写自定义分区类,继承org.apache.hadoop.mapreduce.Partitioner类,重写getPartition()方法

  1. 通过返回一个int值来进行划分分区
  1. //自定义分区类 map()函数的结果拿来分区 ( K,V)与map输出的(K,V)一致
  2. public static class MyPartitioner extends Partitioner<Text, IntWritable> {
  3. //转发给4个不同的reducer
  4. @Override
  5. public int getPartition(Text key, IntWritable value, int numPartitons) {
  6. if (key.toString().equals("xiaomi"))
  7. return 0;
  8. if (key.toString().equals("huawei"))
  9. return 1;
  10. if (key.toString().equals("iphone7"))
  11. return 2;
  12. return 3;
  13. }
  14. }

2.job中设置分区

  1. //设置Partitioner
  2. job.setPartitionerClass(MyPartitioner.class);
  3. //设置4个reducer,每个分区一个 设置为0表是没有分区也就是没有Reduce
  4. job.setNumReduceTasks(4);
  1. /**
  2. * 自定义Partitoner在MapReduce中的应用
  3. * 品牌 销售量 日期
  4. *xiaomi 22 2019.8.22
  5. *huawei 34 2019.8.22
  6. *iphone7 1 2019.8.22
  7. *huawei 100 2019.8.20
  8. *huawei 299 2019.8.21
  9. *
  10. *分别统计各个品牌的销售量
  11. *
  12. */
  13. public class PartitionerApp {
  14. private static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  15. @Override
  16. protected void map(LongWritable key, Text value, Context context)
  17. throws IOException, InterruptedException {
  18. String[] s = value.toString().split("\t");
  19. context.write(new Text(s[0]), new IntWritable(Integer.parseInt(s[1])));
  20. //map输出结果:
  21. /*xiaomi 22
  22. *huawei 34
  23. *iphone7 1
  24. *huawei 100
  25. *huawei 299
  26. *
  27. *10000000条
  28. *
  29. */
  30. }
  31. }
  32. //分区结果
  33. /*xiaomi 22
  34. *huawei 100
  35. *huawei 299
  36. *
  37. *10000000条
  38. *
  39. */
  40. /*xiaomi 22
  41. *xiaomi 221
  42. *
  43. */
  44. /*iphone7 1
  45. *iphone7 3
  46. *iphone7 3
  47. */
  48. /*
  49. * jingli 43
  50. * voio 33
  51. *
  52. */
  53. //开启4个MyReduce类,分别来对每个区进行处理
  54. private static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  55. @Override
  56. protected void reduce(Text key, Iterable<IntWritable> value, Context context)
  57. throws IOException, InterruptedException {
  58. int sum = 0;
  59. for (IntWritable val : value) {
  60. sum += val.get();
  61. }
  62. context.write(key, new IntWritable(sum));
  63. }
  64. }
  65. //自定义分区类 map()函数的结果拿来分区 ( K,V)与map输出的(K,V)一致
  66. public static class MyPartitioner extends Partitioner<Text, IntWritable> {
  67. //转发给4个不同的reducer
  68. @Override
  69. public int getPartition(Text key, IntWritable value, int numPartitons) {
  70. if (key.toString().equals("xiaomi"))
  71. return 0;
  72. if (key.toString().equals("huawei"))
  73. return 1;
  74. if (key.toString().equals("iphone7"))
  75. return 2;
  76. return 3;
  77. }
  78. }
  79. // driver
  80. public static void main(String[] args) throws Exception {
  81. String INPUT_PATH = "hdfs://hadoopNode1:8020/user/ambow/sal_data.txt";
  82. String OUTPUT_PATH = "hdfs://hadoopNode1:8020/user/ambow/out/partitioner";
  83. Configuration conf = new Configuration();
  84. final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
  85. if (fileSystem.exists(new Path(OUTPUT_PATH))) {
  86. fileSystem.delete(new Path(OUTPUT_PATH), true);
  87. }
  88. Job job = Job.getInstance(conf, "PartitionerApp");
  89. // run jar class
  90. job.setJarByClass(PartitionerApp.class);
  91. // 设置map
  92. job.setMapperClass(MyMapper.class);
  93. job.setMapOutputKeyClass(Text.class);
  94. job.setMapOutputValueClass(IntWritable.class);
  95. // 设置reduce
  96. job.setReducerClass(MyReducer.class);
  97. job.setOutputKeyClass(Text.class);
  98. job.setOutputValueClass(IntWritable.class);
  99. //设置Partitioner
  100. job.setPartitionerClass(MyPartitioner.class);
  101. //设置4个reducer,每个分区一个
  102. job.setNumReduceTasks(4);
  103. // input formart
  104. job.setInputFormatClass(TextInputFormat.class); //默认类型
  105. Path inputPath = new Path(INPUT_PATH);
  106. FileInputFormat.addInputPath(job, inputPath);
  107. // output format
  108. job.setOutputFormatClass(TextOutputFormat.class); // 默认
  109. Path outputPath = new Path(OUTPUT_PATH);
  110. FileOutputFormat.setOutputPath(job, outputPath);
  111. // 提交job
  112. System.exit(job.waitForCompletion(true) ? 0 : 1);
  113. }
  114. }

Combiner————-

Map端的Reduce(局部的规约处理 替Reduce与先做一些事) 数据的合并
注:combiner有些业务是不适合处理的,如求平均值
分两个阶段:一个是map溢写阶段,一个是Reduce合并阶段

  1. 自定义Combiner类,继承Reduce类,重写的reduce ()方法

    1. public static class MyCombiner
    2. extends Reducer<Text,IntWritable,Text,IntWritable> {
    3. private IntWritable result = new IntWritable();
    4. public void reduce(Text key, Iterable<IntWritable> values,
    5. Context context
    6. ) throws IOException, InterruptedException {
    7. int sum = 0;
    8. for (IntWritable val : values) {
    9. sum += val.get();
    10. }
    11. result.set(sum);
    12. context.write(key, result);
    13. }
    14. }
  2. 要求Combiner的reduce方法输入输出类型必须一致
    Reducer

  3. 在job设置
    1. job.setCombinerClass(MyCombiner.class);

注:当reduce()中的输入输出K,V数据类型相同时,可以把Reduce类当成是Combiner类

例:

  1. /**
  2. * WordCount中使用Combiner
  3. *
  4. * 单词统计
  5. *
  6. */
  7. public class WordCountCombinerApp {
  8. public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
  9. private final static IntWritable one = new IntWritable(1);
  10. private Text word = new Text();
  11. public void map(Object key, Text value, Mapper.Context context) throws IOException, InterruptedException {
  12. StringTokenizer itr = new StringTokenizer(value.toString());
  13. while (itr.hasMoreTokens()) {
  14. word.set(itr.nextToken());
  15. context.write(word, one);
  16. }
  17. }
  18. }
  19. public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  20. private IntWritable result = new IntWritable();
  21. public void reduce(Text key, Iterable<IntWritable> values, Context context)
  22. throws IOException, InterruptedException {
  23. int sum = 0;
  24. for (IntWritable val : values) {
  25. sum += val.get();
  26. }
  27. result.set(sum);
  28. context.write(key, result);
  29. }
  30. }
  31. public static class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  32. private IntWritable result = new IntWritable();
  33. public void reduce(Text key, Iterable<IntWritable> values, Context context)
  34. throws IOException, InterruptedException {
  35. int sum = 0;
  36. for (IntWritable val : values) {
  37. sum += val.get();
  38. }
  39. result.set(sum);
  40. context.write(key, result);
  41. }
  42. }
  43. public static void main(String[] args) throws Exception {
  44. Configuration conf = new Configuration();
  45. Job job = Job.getInstance(conf, "word count");
  46. job.setJarByClass(WordCountCombinerApp.class);
  47. job.setMapperClass(TokenizerMapper.class);
  48. // 通过job设置Combiner处理类,其实逻辑就可以直接使用Reducer
  49. job.setCombinerClass(MyCombiner.class);
  50. job.setReducerClass(IntSumReducer.class);
  51. job.setOutputKeyClass(Text.class);
  52. job.setOutputValueClass(IntWritable.class);
  53. FileInputFormat.addInputPath(job, new Path("/user/ambow/mydata.txt"));
  54. FileOutputFormat.setOutputPath(job, new Path("/user/ambow/out/wc10"));
  55. System.exit(job.waitForCompletion(true) ? 0 : 1);
  56. }
  57. }

Mapper与Reducer的键值关系:

Mapper:

(K1,V1)——>List( K2,V2)

Reducer:

(K2,List(V2))——>List(K3,V3)

hadoop基础 - 图14

MapReduce的详细执行过程:

hadoop基础 - 图15

基本流程:
1,大数据经split划分成大小相等的数据块(数据块的大小一般等于HDFS一个块的大小)以及用户作业程序。
2,系统中有一个负责调度的Master节点和许多的Map工作节点,Reduce工作节点
3,用户作业程序提交给Master节点,Master节点寻找合适的Map节点,并将数据传给Map节点,并且Master也寻找合适的Reduce节点并将数据传给Reduce节点
4,Master节点启动Map节点执行程序,Map节点尽可能的读取本地或本机架上的数据块进行计算。(数据本地化是Mapreduce的核心特征)
5,每个Map节点处理读取的数据块,并做一些数据整理,并且将中间结果放在本地而非HDFS中,同时通知Master节点Map工作完成,并告知中间结果的存储位置。
6,Master节点等所有Map工作完成后,开始启动Reduce节点,Reduce节点通过Master节点掌握的中间结果的存储位置来远程读取中间结果。
7,Reduce节点将中间结果处理后将结果输出到一个文件中。
从用户作业程序角度来看:
一个作业执行过程中有一个Jobtracker和多个Tasktracker,分别对应于HDFS中的namenode和datanode。Jobclient在用户端把已配置参数打包成jar文件存储在HDFS,并把存储路径提交给Jobtracker,然后Jobtracker创建每一个Task,并且分发到Tasktracker服务中去执行。

hadoop基础 - 图16

快速排序:

hadoop基础 - 图17

作业:编写MR程序

1.对温度进行排序输出

2.找出最低温度那一天

mapper——-> 只输出一个key,value( 34,66,78,) reduce: max( values )

MRUnit测试框架:

  1. 引入MRUnit测试框架: pom.xml:
  1. <!-- https://mvnrepository.com/artifact/org.apache.mrunit/mrunit MRUnit测试 -->
  2. <dependency>
  3. <groupId>org.apache.mrunit</groupId>
  4. <artifactId>mrunit</artifactId>
  5. <version>0.9.0-incubating</version>
  6. <classifier>hadoop2</classifier>
  7. <scope>test</scope>
  8. </dependency>

2.编写测试类

  1. public class TestReducer {
  2. // 用于测试Reduce
  3. @Test
  4. public void TestMethod() {
  5. ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver = new ReduceDriver<Text, IntWritable, Text, IntWritable>();
  6. //设置要测试对像
  7. reduceDriver.withReducer(new MaxReduce());
  8. // 设置测试的key=1950
  9. reduceDriver.withInputKey(new Text("1950"));
  10. // 设置测试的values=[ 10,13,0]
  11. reduceDriver.withInputValues(Arrays.asList(new IntWritable(10), new IntWritable(13), new IntWritable(0)));
  12. //设置预期输出结果
  13. reduceDriver.withOutput(new Text("1950"), new IntWritable(134));
  14. //执行测试
  15. reduceDriver.runTest();
  16. }
  17. }
  18. class MaxReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
  19. @Override
  20. protected void reduce(Text key, Iterable<IntWritable> values,
  21. Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  22. int maxValue = Integer.MIN_VALUE;
  23. for (IntWritable value : values) {
  24. maxValue = Math.max(maxValue, value.get());
  25. }
  26. context.write(key, new IntWritable(maxValue));
  27. }
  28. }

MR排序:

1.默认MapReduce是根据键来进行排序 使用WritableComparable类的comparator()方法

2.job.setSortComparatorClass(cls); 设置排序类

部分排序:每个Reduce输出的文件各自排好序的,但不是全局有序

全局排序:

1.只生成一个分区,也就是只一个Reduce,但对大文件没使用并行架构

2.按数学区间段来进行分区,每个分区排序后组成一个全局排好序的文件

分区 1 2 3 4
温度 温度<-10 -10<温度<0 0<温度<10 10>温度
记录占比 11% 13% 17% 59%

全局排序问题:

数据倾斜问题 记录没有均匀划分

使用Hadoop 提供的采样器分区解决:

  1. // 设置分区类使用Hadoop自带的TotalOrderPartitioner类
  2. job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.class);
  3. // 随机采集器类:采样率:0.1 最大采样数 :10000 最大分区数:10
  4. RandomSampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
  5. // 提供的取样器为给定的job作业编写分区文件
  6. InputSampler.writePartitionFile(job, sampler);
  7. job.setNumReduceTasks(10);

二次排序:

以claas相同时再以socre排序

  1. * id class socre
  2. * 1 1 20
  3. * 2 1 30
  4. * 3 1 50
  5. * 4 2 44
  6. * 5 2 55
  1. /**
  2. * 在分组比较的时候,只比较原来的key,而不是组合key。
  3. */
  4. public static class GroupingComparator implements RawComparator<IntPair> {
  5. @Override
  6. public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  7. return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8);
  8. }
  9. @Override
  10. public int compare(IntPair o1, IntPair o2) {
  11. int first1 = o1.getFirst();
  12. int first2 = o2.getFirst();
  13. return first1 - first2;
  14. }
  15. }

MR去重:

以一行数据相同为重复

一个行某几列数据同为重复

  1. 用户id 商品id 收藏日期
  1. 10181 1000481 2010-04-04 16:54:31
  2. 20001 1001597 2010-04-07 15:07:52
  3. 20001 1001560 2010-04-07 15:08:27
  4. 20042 1001368 2010-04-08 08:20:30
  5. 20067 1002061 2010-04-08 16:45:33
  6. 20056 1003289 2010-04-12 10:50:55
  7. 20056 1003290 2010-04-12 11:57:35
  8. 20056 1003292 2010-04-12 12:05:29
  9. 20054 1002420 2010-04-14 15:24:12
  10. 20055 1001679 2010-04-14 19:46:04
  11. 20054 1010675 2010-04-14 15:23:53
  12. 20054 1002429 2010-04-14 17:52:45
  13. 20076 1002427 2010-04-14 19:35:39
  14. 20054 1003326 2010-04-20 12:54:44
  15. 20056 1002420 2010-04-15 11:24:49
  16. 20064 1002422 2010-04-15 11:35:54
  17. 20056 1003066 2010-04-15 11:43:01
  18. 20056 1003055 2010-04-15 11:43:06
  19. 20056 1010183 2010-04-15 11:45:24
  20. 20056 1002422 2010-04-15 11:45:49
  21. 20056 1003100 2010-04-15 11:45:54
  22. 20056 1003094 2010-04-15 11:45:57
  23. 20056 1003064 2010-04-15 11:46:04
  24. 20056 1010178 2010-04-15 16:15:20
  25. 20076 1003101 2010-04-15 16:37:27
  26. 20076 1003103 2010-04-15 16:37:05
  27. 20076 1003100 2010-04-15 16:37:18
  28. 20076 1003066 2010-04-15 16:37:31
  29. 20054 1003103 2010-04-15 16:40:14
  30. 20054 1003100 2010-04-15 16:40:16

实现原理:把表示重复字段使用Mapper阶段的Key表示,得上Reduce的归约,把相同的key作为一个输出

MR—JOIN:

两个文件进行关联

  1. [ambow@hadoopNode1 data]$ cat table1.txt
  2. 1 zhangsan
  3. 2 lisi
  4. 3 wangwu
  5. 4 Xia
  6. [ambow@hadoopNode1 data]$ cat table2.txt
  7. math 1 33
  8. java 3 45
  9. test 2 55
  10. C 2 55
  11. python 2 55
  12. word 3 55
  13. oracle 1 22
  14. [ambow@hadoopNode1 data]$ hadoop fs -cat /user/ambow/out/mapjoin/part-r-00000
  15. 1 zhangsan oracle 22
  16. 1 zhangsan math 33
  17. 2 lisi python 55
  18. 2 lisi C 55
  19. 2 lisi test 55
  20. 3 wangwu word 55
  21. 3 wangwu java 45
  22. [ambow@hadoopNode1 data]$

Reduce端的JOIN

实现原理:map端打上标记(是那个表的数据),以关联的字段作为key,Rduce端进行关联处理

Mapper端的JOIN

当两个表中,基中一个表的数据比较少的情况下可以Mapper端的JOIN

实现原理:把小的一个表,直接到内存中(存入到集合),map端只读取大表,每读取一行直接就行关联

YARN资源管理器:

RM:集群全局资源管理器

NM:节点资源管理器

AM:对应一个JOB的ApplicationMaster

1.client向RM提交应用程序,其中包括启动该应用的ApplicationMaster的必须信息,例如ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。

2.ResourceManager启动一个container用于运行ApplicationMaster。 自己加的: Container开启一个ApplicationMaster进程

3.启动中的ApplicationMaster向ResourceManager注册自己,启动成功后与RM保持心跳。

4.ApplicationMaster向ResourceManager发送请求,申请相应数目的container。 以保证多个MapTask运行
5.ResourceManager返回ApplicationMaster的申请的containers信息。申请成功的container,由ApplicationMaster进行初始化。container的启动信息初始化后,AM与对应的NodeManager通信,要求NM启动container。AM与NM保持心跳,从而对NM上运行的任务进行监控和管理。
6.container运行期间,ApplicationMaster对container进行监控。container通过RPC协议向对应的AM汇报自己的进度和状态等信息。
7.应用运行期间,client直接与AM通信获取应用的状态、进度更新等信息。
8.应用运行结束后,ApplicationMaster向ResourceManager注销自己,并允许属于它的container被收回。

Yarn的调度:

FIFO调度

容量调度

  1. 公平调度:

YARN:Job提交流程

Hadoop HA:

HA:高可用,解决容灾问题

Mater/Slave主从架构的,都有单点故障问题

hadoopNode1 hadoopNode2 hadoopNode3 hadoopNode4 hadoopNode5
NameNode NN1 NN2
DataNode DN DN DN
ResourceManger RM RM
NodeManger NM NM NM NM NM
ZooKeeper ZK ZK ZK ZK ZK
ZKFailoverController ZKC ZKC
JournalNode JN JN JN JN JN
  1. 节点准备 3节点,5节点,10节点
    配置: IP地址,主机名,hosts

2.NN2节点到各节点(包含NN1)的免费登陆

ping hadoopNode1

ZooKeeper集群协调服务

zookeeper=文件系统+通知机制,达到一个集群协调管理

安装单节点ZooKeeper:
  1. 下载解压ZooKeeper
    1. [ambow@hadoopNode1 app]$ tar -xvzf zookeeper-3.4.6.tar.gz -C ~/app/

2.配置ZooKeeper环境变量
  1. [ambow@hadoopNode1 app]$ vi ~/.bash_profile
  1. ZOOKEEPER_HOME=/home/ambow/app/zookeeper-3.4.6
  2. JAVA_HOME=/home/ambow/app/jdk1.8.0_121
  3. HADOOP_HOME=/home/ambow/app/hadoop-2.7.3
  4. PATH=$PATH:$HOME/.local/bin:$HOME/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZOOKEEPER_HOME/bin
  5. export PATH
  6. export JAVA_HOME
  7. export HADOOP_HOME
  8. export ZOOKEEPER_HOME
  1. [ambow@hadoopNode1 app]$ source ~/.bash_profile

3.修改ZooKeeper配置文件
  1. [ambow@hadoopNode1 app]$ cp $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg
  2. [ambow@hadoopNode1 app]$ vi $ZOOKEEPER/conf/zoo.cfg
  1. tickTime=2000 #通信心跳数,Zookeeper服务器心跳时间,单位毫秒
  2. initLimit=10 #Leader和Follower初始通信时限
  3. syncLimit=5 #Leader和Follower同步通信时限
  4. dataDir=/home/ambow/zkdata/data #自己指定zookeeper的data路径 如果没有时手动创建
  5. dataLogDir=/home/ambow/zkdata/log #自己指定zookeeper的log路径 如果没有时手动创建 log需要手动创建
  6. clientPort=2181 #zookeeper对外的端口

4.单节点测试ZooKeeper
功能
zkServer.sh start 启动zookeeper服务
zkServer.sh stop 停止zookeeper服务
zkServer.sh status 查看zookeeper服务状态
zkCli.sh 客户端连接
  1. [ambow@hadoopNode1 ~]$ zkServer.sh start #启动
  2. [ambow@hadoopNode1 ~]$ zkCli.sh

ZooKeeper集群安装:

1.前提,删除单点dataDir=/home/ambow/zkdata/data 中的数据

2.配置: ZooKeeper配置文件zoo.cfg

  1. tickTime=2000 #通信心跳数,Zookeeper服务器心跳时间,单位毫秒
  2. initLimit=10 #Leader和Follower初始通信时限
  3. syncLimit=5 #Leader和Follower同步通信时限
  4. dataDir=/home/ambow/zkdata/data #自己指定zookeeper的data路径 如果没有时手动创建
  5. dataLogDir=/home/ambow/zkdata/log #自己指定zookeeper的log路径 如果没有时手动创建
  6. clientPort=2181 #zookeeper对外的端口
  7. #指定server.1配zookeeper的各个节点 server.X=节点名:2888:3888
  8. server.1=hadoopNode1:2888:3888
  9. server.2=hadoopNode2:2888:3888
  10. server.3=hadoopNode3:2888:3888
  11. server.4=hadoopNode4:2888:3888
  12. server.5=hadoopNode5:2888:3888

3.分发软件,及环境配置到各节点

  1. [ambow@hadoopNode1 zkdata]$ scp -r ~/app/zookeeper-3.4.6 ambow@hadoopNode5:~/app/
  2. [ambow@hadoopNode1 zkdata]$ scp -r ~/app/zookeeper-3.4.6 ambow@hadoopNode2:~/app/
  3. [ambow@hadoopNode1 zkdata]$ scp -r ~/app/zookeeper-3.4.6 ambow@hadoopNode3:~/app/
  4. [ambow@hadoopNode1 zkdata]$ scp -r ~/app/zookeeper-3.4.6 ambow@hadoopNode4:~/app/
  5. [ambow@hadoopNode1 zkdata]$ scp -r ~/zkdata ambow@hadoopNode2:~/
  6. [ambow@hadoopNode1 zkdata]$ scp -r ~/zkdata ambow@hadoopNode3:~/
  7. [ambow@hadoopNode1 zkdata]$ scp -r ~/zkdata ambow@hadoopNode4:~/
  8. [ambow@hadoopNode1 zkdata]$ scp -r ~/zkdata ambow@hadoopNode5:~/
  9. [ambow@hadoopNode1 ~]$ scp ~/.bash_profile ambow@hadoopNode2:~
  10. [ambow@hadoopNode1 ~]$ scp ~/.bash_profile ambow@hadoopNode3:~
  11. [ambow@hadoopNode1 ~]$ scp ~/.bash_profile ambow@hadoopNode4:~
  12. [ambow@hadoopNode1 ~]$ scp ~/.bash_profile ambow@hadoopNode5:~
  13. [ambow@hadoopNode1 ~]$ source .bash_profile
  14. [ambow@hadoopNode2 ~]$ source .bash_profile
  15. [ambow@hadoopNode3 ~]$ source .bash_profile
  16. [ambow@hadoopNode4 ~]$ source .bash_profile
  17. [ambow@hadoopNode5 ~]$ source .bash_profile

注:每个节点的执行

4.增加myid

myid唯一服务标识

在指定的zk的data目录(dataDir=/home/ambow/zkdata/data)下创建一个myid的文件,每一个节点按照顺序分别添加内容:如:第一个节点添加1。第二人节点为2

server.1=master ———> echo 1 >> /home/ambow/zookeeper/myid

server.2=slave1: ———> echo 2 >> /home/ambow/zookeeper/myid

server.3=slave2: ———> echo 3 >> /home/ambow/zookeeper/myid

注:要与ZooKeeper配置文件zoo.cfg中配置的server.ID号一致

  1. [ambow@hadoopNode1 data]$echo 1 >> /home/ambow/zookeeper/myid
  2. [ambow@hadoopNode2 data]$echo 2 >> /home/ambow/zookeeper/myid
  3. [ambow@hadoopNode3 data]$echo 3 >> /home/ambow/zookeeper/myid
  4. [ambow@hadoopNode4 data]$echo 4 >> /home/ambow/zookeeper/myid
  5. [ambow@hadoopNode5 data]$echo 5 >> /home/ambow/zookeeper/myid
  6. [ambow@hadoopNode1 data]$ ssh hadoopNode5
  7. [ambow@hadoopNode5 ~]$ cd zkdata/data/
  8. [ambow@hadoopNode5 data]$ echo 5 >> myid
  1. 测试
    各节点 启动 zkServer.sh start —启动服务
    再用 zkSerer.sh status 查看leader,follower
    注:启动集群zokeeper时,要把单节点的zookeeper产生的文件删除掉(/home/ambow/zookeeper这目录下,myid文件保留)

    kill掉leader 看是否选举

HDFS HA: 2 NN

1.配置core-site.xml

  1. <configuration>
  2. <!-- 集群中命名服务列表,名称自定义 -->
  3. <property>
  4. <name>fs.defaultFS</name>
  5. <value>hdfs://clusterldg</value>
  6. </property>
  7. <!-- NameNode、DataNode、JournalNode等存放数据的公共目录。 会自动创建 不建议默认 -->
  8. <property>
  9. <name>hadoop.tmp.dir</name>
  10. <value>/home/wl/hdfs/data</value>
  11. </property>
  12. <!-- ZooKeeper集群的地址和端口。注意,数量一定是奇数,且不少于三个节点 -->
  13. <property>
  14. <name>ha.zookeeper.quorum</name>
  15. <value>master:2181,slave1:2181,slave2:2181</value>
  16. </property>
  17. </configuration>

2.hdfs-site.xml

  1. <configuration>
  2. <!-- 指定DataNode存储block的副本数量。默认值是3个 -->
  3. <property>
  4. <name>dfs.replication</name>
  5. <value>3</value>
  6. </property>
  7. <!-- 给hdfs集群起名字,这个名字必须和core-site中的统一,且下面也会用到该名字 -->
  8. <property>
  9. <name>dfs.nameservices</name>
  10. <value>clusterldg</value>
  11. </property>
  12. <!-- 指定NameService是cluster1时的namenode有哪些,这里的值也是逻辑名称,名字随便起,相互不重复即可 -->
  13. <property>
  14. <name>dfs.ha.namenodes.clusterldg</name>
  15. <value>nn1,nn2</value>
  16. </property>
  17. <!-- 指定RPC地址 -->
  18. <property>
  19. <name>dfs.namenode.rpc-address.clusterldg.nn1</name>
  20. <value>master:8020</value>
  21. </property>
  22. <property>
  23. <name>dfs.namenode.rpc-address.clusterldg.nn2</name>
  24. <value>slave1:8020</value>
  25. </property>
  26. <!-- 指定http地址 -->
  27. <property>
  28. <name>dfs.namenode.http-address.clusterldg.nn1</name>
  29. <value>master:50070</value>
  30. </property>
  31. <property>
  32. <name>dfs.namenode.http-address.clusterldg.nn2</name>
  33. <value>slave1:50070</value>
  34. </property>
  35. <!-- 指定cluster1是否启动自动故障恢复,即当NameNode出故障时,是否自动切换到另一台NameNode -->
  36. <property>
  37. <name>dfs.ha.automatic-failover.enabled.clusterldg</name>
  38. <value>true</value>
  39. </property>
  40. <!-- 指定cluster1的两个NameNode共享edits文件目录时,使用的JournalNode集群信息 -->
  41. <property>
  42. <name>dfs.namenode.shared.edits.dir</name>
  43. <value>qjournal://master:8485;slave1:8485;slave2:8485/clusterldg</value>
  44. </property>
  45. <!-- 指定cluster1出故障时,哪个实现类负责执行故障切换 -->
  46. <property>
  47. <name>dfs.client.failover.proxy.provider.clusterldg</name>
  48. <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  49. </property>
  50. <!-- 指定JournalNode集群在对NameNode的目录进行共享时,自己存储数据的磁盘路径。tmp路径是自己创建,journal是启动journalnode自动生成 -->
  51. <property>
  52. <name>dfs.journalnode.edits.dir</name>
  53. <value>/home/wl/zkdata/journal</value>
  54. </property>
  55. <!-- 一旦需要NameNode切换,使用ssh方式进行操作 -->
  56. <property>
  57. <name>dfs.ha.fencing.methods</name>
  58. <value>sshfence</value>
  59. </property>
  60. <!-- 使用ssh进行故障切换,所以需要配置无密码登录,使用ssh通信时用的密钥存储的位置 -->
  61. <property>
  62. <name>dfs.ha.fencing.ssh.private-key-files</name>
  63. <value>/home/wl/.ssh/id_rsa</value>
  64. </property>
  65. <property>
  66. <!-- 设置权限验证 -->
  67. <name>dfs.permissions</name>
  68. <value>false</value>
  69. </property>
  70. </configuration>

3.配置slaves文件

  1. hadoopNode1
  2. hadoopNode2
  3. hadoopNode3
  4. hadoopNode4
  5. hadoopNode5

4.配置文件分发至各节点

core-site.xml

hdfs-site.xml

slaves

5.zookeeper集群启动

各个节点执行: zkServer.sh start

注:如何启动失败,则把之前要把各个点的配置有dataDir=/home/ambow/zkdata/data目录删除数据,保留myid 文件

验证:leader,fllow

  1. 只在第一master个节点格式化格式化ZooKeeper

hdfs zkfc -formatZK

  1. [ambow@hadoopNode1 data]$ hdfs zkfc -formatZK

7.启动 JournalNode集群

各节个节点 执行:hadoop-daemon.sh start journalnode

  1. [ambow@hadoopNode1 data]$ hadoop-daemon.sh start journalnode
  2. [ambow@hadoopNode2 data]$ hadoop-daemon.sh start journalnode
  3. [ambow@hadoopNode3 data]$ hadoop-daemon.sh start journalnode
  4. [ambow@hadoopNode4 data]$ hadoop-daemon.sh start journalnode
  5. [ambow@hadoopNode5 data]$ hadoop-daemon.sh start journalnode

验证:各JournalNode的进程

8.格式化集群的第一个NN1节点(第一次启动需要,以后不需要再格式化了)

hdfs namenode -format

hadoop namenode -format

  1. [ambow@hadoopNode1 data]$ hdfs namenode -format

注:格式化时,要把各个节点的hdfs目录删掉

9.在NN1节点上启动namenode

hadoop-daemon.sh start namenode

  1. [ambow@hadoopNode1 data]$ hadoop-daemon.sh start namenode

验证:NameNode进程

10.同步NN1的元数据信息至NN2上 (第一次需要执行)

在NN2上执行 :hdfs namenode -bootstrapStandby

  1. [ambow@hadoopNode2 data]$ hdfs namenode -bootstrapStandby

11.启动NN2的namenode进程

hadoop-daemon.sh start namenode

验证:NameNode进程

12.启动各节点的dataNode

hadoop-daemon.sh start datanode

13.启动 ZooKeeperFailoverCotroller

在NN1,NN2 节点上各执行:

hadoop-daemon.sh start zkfc

14.测试验证 HDFS HA

注意:如果无法完成自动切换需要查看日志文件,查找失败原因。CentOS7最小化版本中需要安装psmisc。

原因:提示未找到fuster程序,导致无法进行fence,所以可以通过如下命令来安装,Psmisc软件包中包含了fuster程序:

$>sudo yum install psmisc

YARN HA:

hadoopNode1 hadoopNode2 hadoopNode3 hadoopNode4 hadoopNode5
NameNode NN1 NN2
DataNode DN DN DN
ResourceManger RM RM
NodeManger NM NM NM NM NM
ZooKeeper ZK ZK ZK ZK ZK
ZKFailoverController ZKC ZKC
JournalNode JN JN JN JN JN
  1. 节点准备 3节点,5节点,10节点

1.配置yran-site.xml

  1. <configuration>
  2. <!-- 开启RM高可用 -->
  3. <property>
  4. <name>yarn.resourcemanager.ha.enabled</name>
  5. <value>true</value>
  6. </property>
  7. <!-- 指定RM的cluster id -->
  8. <property>
  9. <name>yarn.resourcemanager.cluster-id</name>
  10. <value>yarn-cluster</value>
  11. </property>
  12. <!-- 指定RM的名字 -->
  13. <property>
  14. <name>yarn.resourcemanager.ha.rm-ids</name>
  15. <value>rm1,rm2</value>
  16. </property>
  17. <!-- 分别指定RM的地址 -->
  18. <property>
  19. <name>yarn.resourcemanager.hostname.rm1</name>
  20. <value>master</value>
  21. </property>
  22. <property>
  23. <name>yarn.resourcemanager.hostname.rm2</name>
  24. <value>slave1</value>
  25. </property>
  26. <property>
  27. <name>yarn.resourcemanager.webapp.address.rm1</name>
  28. <value>master:8088</value>
  29. </property>
  30. <property>
  31. <name>yarn.resourcemanager.webapp.address.rm2</name>
  32. <value>slave1:8088</value>
  33. </property>
  34. <!-- 指定zk集群地址 -->
  35. <property>
  36. <name>yarn.resourcemanager.zk-address</name>
  37. <value>master:2181,slave1:2181,slave2:2181</value>
  38. </property>
  39. <property>
  40. <name>yarn.nodemanager.aux-services</name>
  41. <value>mapreduce_shuffle</value>
  42. </property>
  43. <!-- 指定当前RM的ID 只添加在MR1 MR2节点上 可以不配置 -->
  44. <property>
  45. <name>yarn.resourcemanager.ha.id</name>
  46. <value>rm1</value>
  47. </property>
  48. <!-- 启用RM重启功能 -->
  49. <property>
  50. <name>yarn.resourcemanager.recovery.enabled</name>
  51. <value>true</value>
  52. </property>
  53. <!-- 启用自动恢复,当任务进行一半,rm坏掉,就要启动自动恢复,默认是false -->
  54. <property>
  55. <name>yarn.resourcemanager.recovery.enabled</name>
  56. <value>true</value>
  57. </property>
  58. <!-- 指定resourcemanager的状态信息存储在zookeeper集群,默认是存放在FileSystem里面。 -->
  59. <property>
  60. <name>yarn.resourcemanager.store.class</name>
  61. <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
  62. </property>
  63. </configuration>
  1. 配置mapered-site.xml
  1. <configuration>
  2. <property>
  3. <name>mapreduce.framework.name</name>
  4. <value>yarn</value>
  5. </property>
  6. </configuration>

3.分发

4.测试

  1. #启动RM1及所有NM
  2. start-yarn.sh
  3. #启动RM2
  4. yarn-daemon.sh start resourcemanager
  1. yarn rmadmin -getServiceState rm1
  2. yarn rmadmin -getServiceState rm2

HBase

HBase是一个分布式的、面向列的开源数据库
NOSQL的关系型数据库
存于HDFS文件系统

hadoop基础 - 图18

HMaster: HBase的主管理者,管理HRegionServer

HRegionServer:每台服务器的从管理者,主要负责响应用户的IO请求,向HDFS中读写数据

HRegion:一个HRegion保存一个表中一段连续的数据,一个表由多个HRegion组成

store: 存储数据, 分为两种:MemStore,StoreFile:,一个Store对应HBase表中的一个列族

MemStore: 最先存放数据地方,当数据满 时刷入StoreFile

StoreFile: 是HBase最小的存储单元,底层最终是以HFile形式存储在HDFS中

HFile:存HBase数据的地方,物理文件

HLong:日志,一个HDFS上的一个日志文件

HBase数据模型:

hadoop基础 - 图19

行关键字(row key):唯一标识表中的一行数据

列族:对表中的字段进行 分组

列关键字(Column Key): 列键 格式: “ 列族名:列名 ” 例 : “成绩:英语”

时间戳: 插入单元格时的是时间,以时间戳来区别该单元的版本号

存储单元(Cell): 行关键字+列关键字+时间戳 就能确认一个存储单元

HBase安装:

准备工作:

不同机器之间的时间同步 要求每个节点子在30秒

  1. [root@hadoopNode5 ~]# yum -y install ntp #安装ntp软件
  2. [root@hadoopNode5 ~]# ntpdate ntp1.aliyun.com //指定与啊里云时间同步服务

1)安装jdk(略)

2)安装hadoop(略)

3)下载hbase1.3.2(略)

4)tar包

  1. [ambow@hadoopNode1 ~]$ tar -xvzf ~/soft/hbase-1.3.2-bin.tar.gz -C ~/app/

5)配置环境变量~/ .base_profile

HBASE_HOME

PATH

6)hbase-env.sh 配置java_home和ZK

  1. [ambow@hadoopNode1 conf]$ vi $HBASE_HOME/conf/hbase-env.sh
  1. export JAVA_HOME=/home/wl/app/jdk1.8.0_121
  2. export HADOOP_HOME=/home/wl/app/hadoop-2.7.3
  3. export HBASE_MANAGES_ZK=false #禁用Hbase使用内置zookeper
  4. export HBASE_BACKUP_MASTERS=${HBASE_HOME}/conf/backup-masters #配置HA的第二个节HMaster节点

新建一个$HBASE_HOME/conf/backup-masters 文件

  1. vi $HBASE_HOME/conf/backup-masters

把备用的 HMaster节点添加:

  1. hadoopNode2

7)hbase-site.xml配置参数

  1. <configuration>
  2. <!-- #指定hbase在HDFS中目录 自动创建 -->
  3. <property>
  4. <name>hbase.rootdir</name>
  5. <value>hdfs://clusterldg/hbase</value>
  6. </property>
  7. <!-- #true时,为集群模式u -->
  8. <property>
  9. <name>hbase.cluster.distributed</name>
  10. <value>true</value>
  11. </property>
  12. <!-- #设置自己的zookeeper用的那个几个节点 -->
  13. <property>
  14. <name>hbase.zookeeper.quorum</name>
  15. <value>hadoopNode1,hadoopNode2,hadoopNode3,hadoopNode4,hadoopNode5</value>
  16. </property>
  17. <!-- #使用内置Zookeeper时要指定 -->
  18. <property>
  19. <name>hbase.zookeeper.property.dataDir</name>
  20. <value>/home/ambow/zkdata/hdata</value>
  21. </property>
  22. </configuration>

8)配置regionserver(配置每一个机器名 子节点名 不要配主节名)

在hbase/conf/下新建regionserver文件,添加如入内容

  1. hadoopNode3
  2. hadoopNode4
  3. hadoopNode5

9)scp -r hbase到其他节点

  1. [ambow@hadoopNode1 conf]$ scp -r ~/app/hbase-1.3.2 ambow@hadoopNode5:~/app/
  2. [ambow@hadoopNode1 conf]$ scp -r ~/app/hbase-1.3.2 ambow@hadoopNode4:~/app/
  3. [ambow@hadoopNode1 conf]$ scp -r ~/app/hbase-1.3.2 ambow@hadoopNode3:~/app/
  4. [ambow@hadoopNode1 conf]$ scp -r ~/app/hbase-1.3.2 ambow@hadoopNode2:~/app/
  5. [ambow@hadoopNode1 conf]$ scp ~/.bash_profile ambow@hadoopNode5:~
  6. [ambow@hadoopNode1 conf]$ scp ~/.bash_profile ambow@hadoopNode4:~
  7. [ambow@hadoopNode1 conf]$ scp ~/.bash_profile ambow@hadoopNode3:~
  8. [ambow@hadoopNode1 conf]$ scp ~/.bash_profile ambow@hadoopNode2:~

各节点重新加载 :source ~/.bash_profile

10)启动hdfs start-dfs.sh

11)启动各个节点的 ZK zkServer.sh start

  1. #zkServer.sh start

12)启动hbase

  1. # start-hbase.sh
  2. #stop-hbash.sh

13验证:

hadoop基础 - 图20

http://master: