- hadoop版本
- Hadoop的特点:
- Hadoop的核心组成:
- Hadoop安装前期准备
- .bash_profile
- Get the aliases and functions
- User specific environment and startup programs
- HDFS SHELL操作:
- HDFS API操作:
- MapReduce:
- Hadoop序列化:
- Split分片:
- 分区:
- Combiner————-
- MRUnit测试框架:
- MR排序:
- 二次排序:
- MR去重:
- MR—JOIN:
- YARN资源管理器:
- Hadoop HA:
- ZooKeeper集群安装:
- HDFS HA: 2 NN
- YARN HA:
- HBase
- HBase安装:
.
…大数据:巨量数据,海量数据
1byte= 8位
1024byte= 1KB
1024KB=1MB
1024MB=1GB
1024G=1TB
1024TB=1PB
1024PB=EB
1024EB=1ZB
大数据有4个特点:
为别为:Volume(大量)、Variety(多样)、Velocity(高速)、Value(价值),一般我们称之为4V。
Hadoop生态圈
hadoop版本
Apache 免费版
Cloudera 商业版
Hortonworks 雅虎的
Hadoop的特点:
1.高可靠性 数据副本
2.高扩展性 在集群间分配任务数据,可以方便扩展数以千计的节点
3.高效性 处理速度快 数据中心
4.高容错性
Hadoop的核心组成:
1)HDFS:一个高可靠、高吞吐量的分布式文件系统。
2)MapReduce:一个分布式的离线并行计算框架。
3)YARN:作业调度与集群资源管理的框架。
4)Hadoop Common:支持其他模块的工具模块(Configuration、RPC、序列化机制、日志操作)。
HDFS基本原理:
l 客户上传的文件首先被切分成等大的数据块,存储到多台机器上,负载均衡存储
l 将数据切分、容错、负载均衡等功能透明化,用户就像操作个容量巨大、具有高容错性的磁盘那样简单
HDFS基本架构:
1)NameNode(nn):存储文件的元数据,如文件名,文件目录结构,文件属性(生成时间、副本数、文件权限),以及每个文件的块列表和块所在的DataNode等。
2)DataNode(dn):在本地文件系统存储文件块数据,以及块数据的校验和。
3)Secondary NameNode(2nn):用来监控HDFS状态的辅助后台程序,每隔一段时间获取HDFS元数据的快照。
Hadoop安装前期准备
- 安装虚拟机 Linux安装
- 配置网络地址(NAT)
[root@hadoopNode1 ~]# vi /etc/sysconfig/network-scripts/ifcfg-ens33
注:确定IP地址,网段
修改hostname配置
[root@hadoopNode1 ~]# vi /etc/hostname
注:主算机名使用英文字母组成, 设置好后就不能随意的修改
修改hosts映射配置
[root@hadoopNode1 ~]# vi /etc/hosts
- 关闭防火墙
关闭:
开机禁用: ```shell [root@hadoopNode1 ~]#systemctl stop firewalld
[root@hadoopNode1 ~]#systemctl disable firewalld
5. 重起操作系统生效
创建用户ambow,并创建密码ambow(略)
```shell
[root@hadoopNode1 ~]# useradd ambow
[root@hadoopNode1 ~]# passwd ambow
- 设置ambow用户具有root权限 sudo
使用root用户,修改 /etc/sudoers 文件,找到下面一行,在root下面添加一行,如下所示:
[ambow@ master soft]# vi /etc/sudoers
## Allow root to run any commands anywhere
root ALL=(ALL) ALL
ambow ALL=(ALL) ALL
修改完毕,现在可以用ambow帐号登录,然后用命令 su - ambow,即可获得root权限进行操作。
- 安装JDK
安装命令tree
sudo yum install -y treetar包: ```shell [ambow@hadoopNode1 ~]$ pwd /home/ambow [ambow@hadoopNode1 ~]$ mkdir soft [ambow@hadoopNode1 ~]$ mkdir app [ambow@hadoopNode1 ~]$ ls app soft [ambow@hadoopNode1 ~]$ tree . . ├── app └── soft ├── hadoop-2.6.0-cdh5.14.2.tar ├── jdk-8u121-linux-x64.tar.gz └── zookeeper-3.4.6.tar.gz
2 directories, 3 files [ambow@hadoopNode1 ~]$ pwd /home/ambow [ambow@hadoopNode1 ~]$ tar -zxvf ./soft/jdk-8u112-linux-x64.tar.gz -C ./app/
配置JDK:
```shell
[ambow@hadoopNode1 jdk1.8.0_112]$ vi ~/.bash_profile
[ambow@hadoopNode1 jdk1.8.0_112]$ cat ~/.bash_profile
# .bash_profile
# Get the aliases and functions
if [ -f ~/.bashrc ]; then
. ~/.bashrc
fi
# User specific environment and startup programs
JAVA_HOME=/home/ambow/app/jdk1.8.0_112
PATH=$PATH:$HOME/.local/bin:$HOME/bin:$JAVA_HOME/bin
export PATH
export JAVA_HOME
[ambow@hadoopNode1 jdk1.8.0_121]$
[ambow@hadoopNode1 jdk1.8.0_121]$ source ~/.bash_profile
source ~/.bash_profile 让配置文件生效
- 重启操作系统
rebootHadoop 三种模式安装
1.本地模式 用于开发和调式
2.伪分布式 模拟一个小规模的集群
一台主机模拟多主机
启动 NameNode DataNode ResouceManger,nodeManager
3.集群模式:(生产环境)
多台主机,分别充当NaameNode,DataNode 。。。。
Hadoop本地模式安装:
解压Hadoop软件
[ambow@hadoopNode1 sbin]$ tar -zxvf ~/soft/hadoop-2.6.0-cdh5.14.2.tar.gz -C ~/app/
配置Hadoop环境变量 ```shell [ambow@hadoopNode1 hadoop-2.7.3]$ vi ~/.bash_profile [ambow@hadoopNode1 hadoop-2.7.3]$ cat ~/.bash_profile
.bash_profile
Get the aliases and functions
if [ -f ~/.bashrc ]; then . ~/.bashrc fi
User specific environment and startup programs
JAVA_HOME=/home/ambow/app/jdk1.8.0_112
HADOOP_HOME=/home/ambow/app/hadoop-2.6.0-cdh5.14.2
PATH=$PATH:$HOME/.local/bin:$HOME/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export PATH export JAVA_HOME export HADOOP_HOME
1. 环境变量生效
```shell
[ambow@hadoopNode1 hadoop-2.7.3]$ source ~/.bash_profile
测试
新建测试的数据文件: touch ~/mydata.txt
测试语法格式:[ambow@hadoopNode1 mydata.out]$ hadoop jar ~/app/hadoop-2.6.0-cdh5.14.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.14.2.jar wordcount ~/mydata.txt ~/mydata.out2
伪分布模式配置:
- JDK安装
- Hadoop安装
配置Hadoop的$HADOOP_HOME/etc/hadoop/core-site.xml
fs.defaultFS
hadoop.tmp.dir[ambow@hadoopNode1 hadoop]$ vi $HADOOP_HOME/etc/hadoop/core-site.xml
```xml
fs.defaultFS hdfs://hadoop1:8020 hadoop.tmp.dir /home/ambow/hdfs/data
3. 配置java_home 修改文件/home/ambow/app/hadoop-2.6.0-cdh5.14.2/etc/hadoop/hadoop-env.sh
export JAVA_HOME =/home/ambow/app/jdk1.8.0_112/
```
4. 配置 hdfs-site.xml<br />dfs.replication 设置块的副本个数: 伪分布模式只能设置为**1** 默认为3
```shell
[ambow@hadoopNode1 hadoop]$ vi $HADOOP_HOME/etc/hadoop/hdfs-site.xml
<configuration>
<property>
<!-- 配置每个block的副本个数 默认3个 当是单节点时配置为1 不能配置态多,态多反而降低效果 -->
<name>dfs.replication</name>
<!-- 伪分布式只能配1个副本 -->
<value>1</value>
</property>
</configuration>
- 格式化
[ambow@hadoopNode1 ~]$ hadoop namenode -format
一般只格式化一次,如果要再格式化,建议要把各dataNode节点的数据删除要,防止DataNode和NameNode的集群ID号不一致而无法启动
6.启动hadoop
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
hadoop-daemon.sh stop namenode
hadoop-daemon.sh stop datanode
7.查看进程
jps
- logs日志文件
cat ~/soft/hadoop/logs
9:WEB访问查看
http://192.168.100.100:50070/
YARN上运行MR 要配置两个配置文件
配置mapred-site.xml.template[ambow@hadoopNode1 hadoop]$ vi $HADOOP_HOME/etc/hadoop/mapred-site.xml.template
<configuration>
<property>
<!-- 指定MapReduce使用Yarn资源管理框架 -->
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
配置yarn-site.xml
yarn.resourcemanger.hostname
yarn.nodemaager.aux-service
<configuration>
<property>
<!-- 指定yaran主要管理一个机节点 主机名 -->
<name>yarn.resourcemanager.hostname</name>
<value>hadoop1</value>
</property>
<property>
<!-- 使用mapreduce_shuffle服务 -->
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
- 启动 yarn
[ambow@hadoopNode1 data]$ yarn-daemon.sh start resourcemanager
[ambow@hadoopNode1 data]$ yarn-daemon.sh start nodemanager
- 测试MR操作
上传Linux系统中的 ~/mydata.txt文件 至 HDFS文件系统/user/ambow目录中去
hadoop fs -mkdir -p /user/ambow
[ambow@hadoopNode1 data]$ hadoop fs -put ~/mydata.txt /user/ambow
对hdfs的文件使用yarn来进行wordcount操作:
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.14.2.jar wordcount /user/ambow/mydata.txt /user/ambow/output/wc/
分布式集群安装:
1.修/etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.100.200 hadoop1
192.168.100.201 hadoop2
192.168.100.202 hadoop3
2.克隆虚拟机
3.分别配置各虚拟机节点的:IP地址,主机名,映射文件
[root@hadoopNode2 ~]# vi /etc/hostname
[root@hadoopNode2 ~]# vi /etc/sysconfig/network-scripts/ifcfg-ens33
[root@hadoopNode2 ~]# vi /etc/hosts
4.验证配置
[root@hadoopNode2 ~]# ping hadoop1
PING hadoopNode1 (192.168.100.200) 56(84) bytes of data.
64 bytes from hadoopNode1 (192.168.100.200): icmp_seq=1 ttl=64 time=0.190 ms
64 bytes from hadoopNode1 (192.168.100.200): icmp_seq=2 ttl=64 time=0.230 ms
64 bytes from hadoopNode1 (192.168.100.200): icmp_seq=3 ttl=64 time=0.263 ms
64 bytes from hadoopNode1 (192.168.100.200): icmp_seq=4 ttl=64 time=0.227 ms
^C64 bytes from hadoopNode1 (192.168.100.200): icmp_seq=5 ttl=64 time=0.195 ms
64 bytes from hadoopNode1 (192.168.100.200): icmp_seq=6 ttl=64 time=0.268 ms
^C
--- hadoopNode1 ping statistics ---
6 packets transmitted, 6 received, 0% packet loss, time 5000ms
rtt min/avg/max/mdev = 0.190/0.228/0.268/0.035 ms
[root@hadoopNode2 ~]# ping hadoop2
PING hadoopNode2 (192.168.100.201) 56(84) bytes of data.
64 bytes from hadoopNode2 (192.168.100.201): icmp_seq=1 ttl=64 time=0.011 ms
64 bytes from hadoopNode2 (192.168.100.201): icmp_seq=2 ttl=64 time=0.022 ms
^C
--- hadoopNode2 ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 999ms
rtt min/avg/max/mdev = 0.011/0.016/0.022/0.006 ms
[root@hadoopNode2 ~]# ping hadoop3
PING hadoopNode3 (192.168.100.202) 56(84) bytes of data.
64 bytes from hadoopNode3 (192.168.100.202): icmp_seq=1 ttl=64 time=0.246 ms
64 bytes from hadoopNode3 (192.168.100.202): icmp_seq=2 ttl=64 time=0.218 ms
64 bytes from hadoopNode3 (192.168.100.202): icmp_seq=3 ttl=64 time=0.218 ms
^C64 bytes from hadoopNode3 (192.168.100.202): icmp_seq=4 ttl=64 time=0.227 ms
^C
--- hadoopNode3 ping statistics ---
4 packets transmitted, 4 received, 0% packet loss, time 3001ms
rtt min/avg/max/mdev = 0.218/0.227/0.246/0.015 ms
Master—-slave主从架构
在master节点上进设置免密登陆
1).生成Maste节点公钥和私钥
ssh-keygen -t rsa
2)分发
ssh-copy-id hadoop1
ssh-copy-id hadoop2
ssh-copy-id hadoop3
3)验证 在Master节点上登陆各节点,看是否需要密码
ssh hadoop2
ssh hadoop3
6.配置核心文件 core-siter.xml
[ambow@hadoopNode1 hadoop]$ vi /home/ambow/app/hadoop-2.6.0-cdh5.14.2/etc/hadoop/core-site.xml
<configuration>
<!-- 配置默认FS hadoop3.X 默认端口为9820 hadoop2.X 默认端口为8020 hadoop1.X 默认端口为9000 一般伪分布设置为localhost:8020 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop1:8020</value>
</property>
<!-- 指定hadoop运行时产生文件存储的目录 会自动创建 不建议默认 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/home/ambow/hdfs/data</value>
</property>
</configuration>
- hdfs-site.xml
<configuration>
<property>
<!-- 配置每个block的副本个数 默认3个 当是单节点时配置为1 不能配置态多,态多反而降低效果 -->
<name>dfs.replication</name>
<!-- 伪分布式只能配1个副本 -->
<value>3</value>
</property>
<property>
<!-- 设置第辅助主节点 2NN -->
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop2:50090</value>
</property>
<property>
<!-- 检查点的路径 -->
<name>dfs.namenode.checkpoint.dir</name>
<value>/home/ambow/hdfs/namesecondary</value>
</property>
</configuration>
8.mapred-site.xml.template
<configuration>
<property>
<!-- 指定MapReduce使用Yarn资源管理框架 -->
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
- yarn-site.xml
<configuration>
<!-- Site specific YARN configuration properties -->
<property>
<!-- 指定yaran主要管理一个机节点 -->
<name>yarn.resourcemanager.hostname</name>
<value>hadoop1</value>
</property>
<property>
<!-- 使用mapreduce_shuffle服务 -->
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
10.修改slaves文件 来指定当前集群中那些节点是DataNode节点 把节点的主机名添加到slaves文件中
[ambow@hadoopNode1 hadoop]$ vi $HADOOP_HOME/etc/hadoop/slaves
hadoop1
hadoop2
hadoop3
11.分发文件到其他节点
注:分发之前要停止所有服务
关闭hdfs
网络复制:语法: scp -r 源文件目录 用户名@主机名:目标路径
-r 递归复制
[ambow@hadoopNode1 hadoop]$ scp -r $HADOOP_HOME/etc/hadoop ambow@hadoop2:$HADOOP_HOME/etc/
[ambow@hadoopNode1 hadoop]$ scp -r $HADOOP_HOME/etc/hadoop ambow@hadoop3:$HADOOP_HOME/etc/
12.测试
start-all.sh
stop-all.sh
[ambow@hadoopNode1 hadoop]$ start-all.sh
测试
查看进程jps
IP地址:8088
IP地址:50070
HDFS SHELL操作:
递归查看hdfs根目录
hadoop fs -ls -R /
查看hdfs目录
hadoop fs -ls /
查看指定的文件内容
hadoop fs -cat /user/ambow/mydata.txt
hadoop fs -text /user/ambow/mydata.txt
创建文件夹,-p用于时是否递归创建
hadoop fs -mkdir -p /user/ambow/my
从本地文件系统中复制单个或多个源路径到目标文件系统
hadoop fs -put data.txt /user/ambow/my
从标准输入中读取输入写入目标文件系统
hadoop fs -put - /user/ambow/
复制文件到本地文件
hadoop fs -get /user/ambow/mydata.txt data1.txt
删除指定的文件,只删除非空目录和文件
hadoop fs -rm /user/ambow/data.txt
递归删除,可以删除有文件的文件夹
hadoop fs -rm -r /user/ambow/my
删除空目录
hadoop fs -rmdir /user/ambow/data
将源目录中所有的文件连接到本地目标文件中
hadoop fs -getmerge /user/ambow/my data1.txt
将文件从源路径移动到目标路径,这个命令允许有多个源路径,此时目标路径必须是一个目录
hadoop fs -mv /user/ambow/data.txt /user/ambow/my
将文件从源路径复制到目标路径,这个命令允许有多个源路径,此时目标路径必须是一个目录
hadoop fs -cp /user/ambow/data.txt /user/ambow/my
显示目录中所有文件的大小
hadoop fs -du /user/ambow
显示文件大小
hadoop fs -du -s /user/ambow/mydata.txt
创建一个0字节的空文件
hadoop fs -touchz /user/ambow/file.txt
返回指定路径的统计信息
hadoop fs -stat /user/ambow
清空回收站
hadoop fs -expunge
统计文件夹数量、文件数量、文件总大小信息
hadoop fs -count /user/ambow
查看文件尾部内容,一般用于查看日志
hadoop fs -tail /user/ambow/files.COPYING
修改文件权限
hadoop fs -chmod 700 /user/ambow/data.txt
[ambow@hadoopNode1 soft]$ hadoop fs -help
Usage: hadoop fs [generic options]
[-appendToFile <localsrc> ... <dst>]
[-cat [-ignoreCrc] <src> ...] #显示文件内容
[-checksum <src> ...] #校验和
[-chgrp [-R] GROUP PATH...] # 与chown一样
[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...] # 改变权限
[-chown [-R] [OWNER][:[GROUP]] PATH...] #改变文件组
[-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>] #从本地复制HDFS 与put命令相同
[-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] #复制到本地 与get命令相同
[-count [-q] [-h] <path> ...] #统计路径下的目录、文件和字节数
[-cp [-f] [-p | -p[topax]] <src> ... <dst>] hdfs之间复制
[-createSnapshot <snapshotDir> [<snapshotName>]] #在目录上创建快照
[-deleteSnapshot <snapshotDir> <snapshotName>]
[-df [-h] [<path> ...]] #文件使用情况
[-du [-s] [-h] <path> ...] #查看文件或目录空间
[-expunge]
[-find <path> ... <expression> ...] #查找
[-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>] #下载
[-getfacl [-R] <path>]
[-getfattr [-R] {-n name | -d} [-e en] <path>]
[-getmerge [-nl] <src> <localdst>]
[-help [cmd ...]]
[-ls [-d] [-h] [-R] [<path> ...]] #查看
[-mkdir [-p] <path> ...] #创建
[-moveFromLocal <localsrc> ... <dst>] #从本地移到到HDFS
[-moveToLocal <src> <localdst>] HDFS移到本地
[-mv <src> ... <dst>] #HDFS之间移动
[-put [-f] [-p] [-l] <localsrc> ... <dst>] #上传
[-renameSnapshot <snapshotDir> <oldName> <newName>]
[-rm [-f] [-r|-R] [-skipTrash] <src> ...] #删除
[-rmdir [--ignore-fail-on-non-empty] <dir> ...] 3删除空目录
[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
[-setfattr {-n name [-v value] | -x name} <path>]
[-setrep [-R] [-w] <rep> <path> ...]
[-stat [format] <path> ...]
[-tail [-f] <file>]
[-test -[defsz] <path>]
[-text [-ignoreCrc] <src> ...] #查看文件内容
[-touchz <path> ...] #创建空文件
[-truncate [-w] <length> <path> ...]
[-usage [cmd ...]]
问题:启动DataNOde节点 有些节点无法正常启动
1.删除各节点hadoop.tmp.dir所指定的路径
2.重新格式化nameNoden
安全模式:检查的最小副本数 默认为1
HDFS API操作:
1.准备环境
Eclipse+Maven
Eclipse+Hadoopr的jar
hadoop的jar包:
1.解压hadoop-2.7.3.tar.gz到非中文目录
2.删除souce 和 test的jar包
创建目录 mkdirs
创建文件 create
查看目录或文件 liststats
上传文件:copyFromLocalFile
下载文件 :copyToLocalFile
复制文件 HDSF—>HDFS : ???
删除文件:delete
追加数据:append
读取一个文件 open()
改名:rename
查看块信息: fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
/* 创建目录
*/
public void mkdir() throws IOException, InterruptedException, URISyntaxException {
// 得到一个配置文件
Configuration conifg = new Configuration();
// 得到一个文件系统
FileSystem fs = FileSystem.get(new URI("hdfs://hadoopNode1:8020"), conifg, "ambow");
boolean isMkdir = fs.mkdirs(new Path("/hadfsapi/ambow2")); // 级联创建一个/user/ambow目录
System.out.println(isMkdir);
}
/* 查看块信息
*/
@Test
public void blockeInfo() throws IllegalArgumentException, IOException {
FileStatus fileStatus = fs.getFileStatus(new Path("/user/ambow/a.txt"));
BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
for (BlockLocation b : fileBlockLocations) {
System.out.println(b.getHosts());
for (String host : b.getHosts())
System.out.println(host);
}
}
Maven安装 :
1.下载maven包
2.解压
3.配置Maven的环境变量
添加一个MAVEN_HOME环境变量值为:Maven的解压路径
MAVEN_HOME= D:\apache-maven-3.5.2
在PATH环境变量中添加:%MAVEN_HOME%\bin
4、.配置本地Maven 库
%MAVEN_HOME%\repository 为本地Maven库 4.5G大小,要拷贝
配置%MAVEN_HOME%\conf\settings.xml中指定本地Maven库的路径
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
<!-- 配置指向自己本地Maven库路径 -->
<localRepository>D:\apache-maven-3.5.2\repository</localRepository>
<profiles>
<profile>
<id>jdk-1.8</id>
<!-- 另外一种激活方式 -->
<activation>
<activeByDefault>true</activeByDefault>
<jdk>1.8</jdk>
</activation>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
</properties>
</profile>
<!--与activeProfies对应-->
<profile>
<id>nexusAliyun</id>
<repositories>
<repository>
<id>nexusAliyun</id>
<name>Repository of central</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
<layout>default</layout>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
</repositories>
</profile>
</profiles>
<activeProfiles>
<activeProfile>nexusAliyun</activeProfile>
</activeProfiles>
</settings>
Eclispe集成Maven插件
1.windows—>preferences—->Maven—->installations—->add
- windows—>preferences—->Maven—->User Settings—->指定用户的Maven配置文件
D:\apache-maven-3.5.2\conf\settings.xml
3.创建Maven项目
问题:
hdfs-siter.xml
HDFS 架构: Mster/Savle
1)Client:就是客户端。
(1)文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行存储。
(2)与NameNode交互,获取文件的位置信息。
(3)与DataNode交互,读取或者写入数据。
(4)Client提供一些命令来管理HDFS,比如启动或者关闭HDFS。
(5)Client可以通过一些命令来访问HDFS。
2)NameNode:就是master,它是一个主管、管理者。
(1)管理HDFS的名称空间。
(2)管理数据块(Block)映射信息
(3)配置副本策略
(4)处理客户端读写请求。
3)DataNode:就是Slave。NameNode下达命令,DataNode执行实际的操作。
(1)存储实际的数据块。
(2)执行数据块的读/写操作。
4)Secondary NameNode:并非NameNode的热备。当NameNode挂掉的时候,它并不能马上替换NameNode并提供服务。
(1)辅助NameNode,分担其工作量。
(2)定期合并Fsimage和Edits-生成-à新的Fsimage,并推送给NameNode。
(3)在紧急情况下,可辅助恢复NameNode。
注:fsimage保存了最新的元数据检查点,在HDFS启动时加载fsimage的信息,包含了整个HDFS文件系统的所有目录和文件的信息。
对于文件来说包括了数据块描述信息、修改时间、访问时间等;对于目录来说包括修改时间、访问权限控制信息(目录所属用户,所在组)等。
editlog主要是在NameNode已经启动情况下对HDFS进行的各种更新操作进行记录,HDFS客户端执行所有的写操作都会被记录到editlog中。
HDFS读流程:
读流程如下:
我们先说一个语义:下载回这个文件。换句话说就是取回这个文件的所有的块,那么当有能力取回文件的所有块的时候,那么它的子集操作就是取回其中某些块或者某个块也能实现。所以我们先来看取回文件的所有块的流程是怎么实现的:
1、客户端和NameNode建立连接,获取文件block的位置信息(fileBlockLocations)
2、客户端根据自己想要获取的数据位置挑选需要连接的DataNode(如果全文下载,从0开始;如果是从某一位置开始,客户端需要给出)
需要用inputstream.seek(long)//从什么位置开始读取,和哪个DataNode开始连接获取block;
3、距离的概念:只有文件系统在读流程中附加距离优先的概念,计算层才能够被动实现计算向数据移动,距离有以下三种:
(1)本地,最近的距离;
(2)同机架,次之的距离;
(3)other(数据中心),最远的距离;
4、客户端下载完成block后会验证DataNode中的MD5,保证块数据的完整性。
HDFS写流程:
写流程如下:
1、客户端访问NameNode,NameNode检查路径和权限,如果路径中有与要上传的文件重名的文件就不能上传了,不能覆盖,如果没有才创建,创建名为file.copying的临时文件;
2、NameNode触发副本放置策略,如果客户端在集群内的某一台机器,那么副本第一块放置在该服务器上,然后再另外挑两台服务器;如果在集群外,namenode会根据策略先找一个机架选出一个datanode,然后再从另外的机架选出另外两个datanode,然后namenode会将选出的三个datanode按距离组建一个顺序,然后将顺序返回给客户端;
3、客户端会根据返回的三个节点和第一个节点建立一个socket连接(只会和第一个节点建立),第一个节点又会和第二个节点建立socket连接,由第二个节点又会和第三个节点建立一个socket连接,这种连接的方式叫Pipeline;
4、客户端会将block切分成package(默认是64kB),以流式在pipeline中传输
好处:
(1)速度快:时间线重叠(其实流式也是一种变异的并行);
(2)客户端简单:副本的概念是透明的;
5、由DataNode完成接收block块后,block的metadata(MD5校验用)通过一个心跳将信息汇报给NameNode;
6、如果再pipeline传输中,任意节点失败,上游节点直接连接失败节点的下游节点继续传输,最终在第5步汇报后,NameNode会发现副本数不足,一定会出发DataNode复制更多副本,客户端Client副本透明;
7、client一直重复以上操作,逐一将block块上传,同时DataNode汇报block的位置信息,时间线重叠;
8、最终,如果NameNode收到了DataNode汇报的所有块的信息,将文件的.copying去掉,文件可用。
HDFS删除流程:
1.client 执行 hadoop fs -rm /abc.txt
2.会执行DistributedFileSystem类的delete方法,这个方法会接收用户传来的文件路径 /abc.txt=》找namenode,执行delete方法(源码注释:Delete the given file or directory from the file system.)=》这里注意,namenode的删除操作,只是把这个路径信息从内存里元数据信息里删除,但是真正的数据的block块并没有立即删除。但是这并不影响,因为如果元数据里没有此文件信息,拿也拿不到。
3.如果想真正删干净对应的文件数据,得通过DataNode来删除。DataNode会定期的给NameNode发送心跳包,如果是删除,NameNode在接收到DataNode心跳包后,会把删除的指令包括删除哪些文件数据传达给DataNode,DataNode接到指令后,执行删除指令,把block块从本机上删除。
复本放置策略:
MapReduce:
编写MapRdeuce计算框架:
- 在Eclipse下新建一个项目工程
- 导入包
- 创建自己的WcMapper类,继承org.apache.hadoop.mapreduce.Mapper,并重写map()方法
```
public class MyMapper extends Mapper
{ protected void map(KEYIN key1, VALUEIN value1, Context context) {
}
}
```
KEYIN: 输入的key
VALUEIN:输入的value
KEYOUT:输出的key
VALUEOUT:输出的value
Context:Mapper的上下文
```java
// 创建Mapper类 Mapper<输入的Long,输入字符串, 输出字符串, 输出的Long>
public static class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
/*
* KeyIn:LongWritable 行的偏移量 ValueIn:Text 这一行的值 TextInputformat
*
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 得到每一行的值,反序化为字符串
String lines = value.toString();
// 对每一行的字符串按空格来拆分
String[] words = lines.split(" ");
for (String word : words) {
// 对每个单词写入Hadoop中 写入的数据必须是Hadoop的序列化
context.write(new Text(word), new LongWritable(1));
// hello:1 word:1 aaaa:1 空格 :1 空格 :1 空格 :1
}
}
}
创建一个WcReduce类,继承org.apache.hadoop.mapreduce.Reducer,并重写reduce()方法
public class MyReducer extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
{
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
{
}
}
KEYIN: 输入的key
VALUEIN:输入的value
KEYOUT:输出的key
VALUEOUT:输出的value
Context:Reducer的上下文public static class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
// reduce(单词key, 指定的单词mapper统计的List, Context context)
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
int total = 0;
for (LongWritable once : values) {
total += once.get();
}
context.write(key, new LongWritable(total));
}
}
创建一个驱动类WcDrive来配置job任务,并提交job任务
public class WcDrive {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//
Configuration conf = new Configuration();
// conf.set("HADOOP_USER_NAME","ambow");
// Job对像
Job job = Job.getInstance(conf);
// 注册Jar驱动类
job.setJarByClass(WcDrive.class);
// 注册Mapper驱动类
job.setMapperClass(WcMapper.class);
//注册Reducer驱动类
job.setReducerClass(WcReduce.class);
// 设置MapOutPut输出的的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 设置最终输出的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 设置输入输出路径
// org.apache.hadoop.mapred.FileInputFormat 老版本
// org.apache.hadoop.mapreduce.lib.input.FileInputFormat 新版本
FileInputFormat.setInputPaths(job, new Path("/user/ambow/mydata.txt"));
FileOutputFormat.setOutputPath(job, new Path("/hadfsapi/out/wc8"));
// FileInputFormat.setInputPaths(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 设置reduce任务数为0 分区多少个???
// job.setNumReduceTasks(0);
// 提交作业
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
} ```
6.本地模式测试:
mapred-site.xml 把 yran模式设置为local模式
7.集群测试:
把项目打包成jar包,上传至master节点
```shell<br />[ambow[@hadoopNode1 ](/hadoopNode1 ) data]$ hadoop jar wc.jar com.ldg.teacher.WcDrive /user/ambow/mydata.txt /usr/ambow/out/wc9
问题:
1.写入权限问题:
2.windows环境变量问题
3.log4j问题
MapReduce计算框架的缺点:
1.不适合实时计算
2.不适合流式计算
3.不适合DAG(有向无环图)计算
MR计算思想:
把一个文件切成很多片,第一个片交由一个mapper来计算,计算的结果,交由一个或多个Reducer来计算,最后输到到文件系统
MapReduce运行的时候,会通过Mapper运行的任务读取HDFS中的数据文件,然后调用自己的方法,处理数据,最后输出。Reducer任务会接收Mapper任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS的文件中
Mapper阶段:是由一个或多个mapper任务并行执行
Reducer阶段:是由一个或多个Reduce任务并行执行
Hadoop序列化:
因为hadoop集群,很多的通信都是在网上传输:为了解决提高通信效率,所以重写java的重量级序列化,方便hadoop节点之间的通信
1.常用的数据类型对应的hadoop数据序列化:
Java类型 | HadoopWritable类型 |
---|---|
boolean | BooleanWritable |
byte | ByteWritable |
int | IntWritable |
float | FloatWritable |
long | LongWritable |
double | DoubleWritable |
string | Text |
map | MapWritable |
array | ArrayWritable |
2自定义Hdoop序列化:
1.实现WritableComparable接口
2.实现write()方法:将对像转换为字节流并写入到输出流out中
3.实现**ReadFields()**方法:用于从输入流in中读取字节流并反序列化对象
属性类型 | 序列化write操作 | 反序列化readFields操作 |
---|---|---|
属性是hadoop类型 | name.write(out) | name.readFields(in) |
属性是java类型 | out.write(name) | in.readFields(name) |
4.实现comparaTo()方法
5.属性可以使用 Hadoop的类型,也可 使用java的类型
6.实现各属性的set方法
7.实现构造方法
8.实现toString(),hasCode(),equals()方法
Split分片:
注:1.分片个数,决定 Mapper任务个数
2.分片是**逻辑分片**,只记住起始位置、长度以及所在的节点列表等
- 分片只对每一个文件单独切片 ,有多个小于block块大小的文件时,就产生多少个分片 ——?可以把小文件合并成一个大文件来处理
分片计算公式:
$$ Math.max(minSize,Math.min(maxSize,blocksize))) $$
分片源码:
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
分区:
把mapper计算出来结果,对key按一定的规则进行分区
如 :wordCount类: 按字母分一类( a—z A—-Z),非字母分一个类 要分2个区
上海,北京,重庆
1.编写自定义分区类,继承org.apache.hadoop.mapreduce.Partitioner类,重写getPartition()方法
通过返回一个int值来进行划分分区
//自定义分区类 map()函数的结果拿来分区 ( K,V)与map输出的(K,V)一致
public static class MyPartitioner extends Partitioner<Text, IntWritable> {
//转发给4个不同的reducer
@Override
public int getPartition(Text key, IntWritable value, int numPartitons) {
if (key.toString().equals("xiaomi"))
return 0;
if (key.toString().equals("huawei"))
return 1;
if (key.toString().equals("iphone7"))
return 2;
return 3;
}
}
2.job中设置分区
//设置Partitioner
job.setPartitionerClass(MyPartitioner.class);
//设置4个reducer,每个分区一个 设置为0表是没有分区也就是没有Reduce
job.setNumReduceTasks(4);
/**
* 自定义Partitoner在MapReduce中的应用
* 品牌 销售量 日期
*xiaomi 22 2019.8.22
*huawei 34 2019.8.22
*iphone7 1 2019.8.22
*huawei 100 2019.8.20
*huawei 299 2019.8.21
*
*分别统计各个品牌的销售量
*
*/
public class PartitionerApp {
private static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] s = value.toString().split("\t");
context.write(new Text(s[0]), new IntWritable(Integer.parseInt(s[1])));
//map输出结果:
/*xiaomi 22
*huawei 34
*iphone7 1
*huawei 100
*huawei 299
*
*10000000条
*
*/
}
}
//分区结果
/*xiaomi 22
*huawei 100
*huawei 299
*
*10000000条
*
*/
/*xiaomi 22
*xiaomi 221
*
*/
/*iphone7 1
*iphone7 3
*iphone7 3
*/
/*
* jingli 43
* voio 33
*
*/
//开启4个MyReduce类,分别来对每个区进行处理
private static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> value, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : value) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
//自定义分区类 map()函数的结果拿来分区 ( K,V)与map输出的(K,V)一致
public static class MyPartitioner extends Partitioner<Text, IntWritable> {
//转发给4个不同的reducer
@Override
public int getPartition(Text key, IntWritable value, int numPartitons) {
if (key.toString().equals("xiaomi"))
return 0;
if (key.toString().equals("huawei"))
return 1;
if (key.toString().equals("iphone7"))
return 2;
return 3;
}
}
// driver
public static void main(String[] args) throws Exception {
String INPUT_PATH = "hdfs://hadoopNode1:8020/user/ambow/sal_data.txt";
String OUTPUT_PATH = "hdfs://hadoopNode1:8020/user/ambow/out/partitioner";
Configuration conf = new Configuration();
final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
if (fileSystem.exists(new Path(OUTPUT_PATH))) {
fileSystem.delete(new Path(OUTPUT_PATH), true);
}
Job job = Job.getInstance(conf, "PartitionerApp");
// run jar class
job.setJarByClass(PartitionerApp.class);
// 设置map
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置reduce
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置Partitioner
job.setPartitionerClass(MyPartitioner.class);
//设置4个reducer,每个分区一个
job.setNumReduceTasks(4);
// input formart
job.setInputFormatClass(TextInputFormat.class); //默认类型
Path inputPath = new Path(INPUT_PATH);
FileInputFormat.addInputPath(job, inputPath);
// output format
job.setOutputFormatClass(TextOutputFormat.class); // 默认
Path outputPath = new Path(OUTPUT_PATH);
FileOutputFormat.setOutputPath(job, outputPath);
// 提交job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Combiner————-
Map端的Reduce(局部的规约处理 替Reduce与先做一些事) 数据的合并
注:combiner有些业务是不适合处理的,如求平均值
分两个阶段:一个是map溢写阶段,一个是Reduce合并阶段
自定义Combiner类,继承Reduce类,重写的reduce ()方法
public static class MyCombiner
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
要求Combiner的reduce方法输入输出类型必须一致
Reducer- 在job设置
job.setCombinerClass(MyCombiner.class);
注:当reduce()中的输入输出K,V数据类型相同时,可以把Reduce类当成是Combiner类
例:
/**
* WordCount中使用Combiner
*
* 单词统计
*
*/
public class WordCountCombinerApp {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Mapper.Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCountCombinerApp.class);
job.setMapperClass(TokenizerMapper.class);
// 通过job设置Combiner处理类,其实逻辑就可以直接使用Reducer
job.setCombinerClass(MyCombiner.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/user/ambow/mydata.txt"));
FileOutputFormat.setOutputPath(job, new Path("/user/ambow/out/wc10"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Mapper与Reducer的键值关系:
Mapper:
(K1,V1)——>List( K2,V2)
Reducer:
(K2,List(V2))——>List(K3,V3)
MapReduce的详细执行过程:
基本流程:
1,大数据经split划分成大小相等的数据块(数据块的大小一般等于HDFS一个块的大小)以及用户作业程序。
2,系统中有一个负责调度的Master节点和许多的Map工作节点,Reduce工作节点
3,用户作业程序提交给Master节点,Master节点寻找合适的Map节点,并将数据传给Map节点,并且Master也寻找合适的Reduce节点并将数据传给Reduce节点
4,Master节点启动Map节点执行程序,Map节点尽可能的读取本地或本机架上的数据块进行计算。(数据本地化是Mapreduce的核心特征)
5,每个Map节点处理读取的数据块,并做一些数据整理,并且将中间结果放在本地而非HDFS中,同时通知Master节点Map工作完成,并告知中间结果的存储位置。
6,Master节点等所有Map工作完成后,开始启动Reduce节点,Reduce节点通过Master节点掌握的中间结果的存储位置来远程读取中间结果。
7,Reduce节点将中间结果处理后将结果输出到一个文件中。
从用户作业程序角度来看:
一个作业执行过程中有一个Jobtracker和多个Tasktracker,分别对应于HDFS中的namenode和datanode。Jobclient在用户端把已配置参数打包成jar文件存储在HDFS,并把存储路径提交给Jobtracker,然后Jobtracker创建每一个Task,并且分发到Tasktracker服务中去执行。
快速排序:
作业:编写MR程序
1.对温度进行排序输出
2.找出最低温度那一天
mapper——-> 只输出一个key,value( 34,66,78,) reduce: max( values )
MRUnit测试框架:
- 引入MRUnit测试框架: pom.xml:
<!-- https://mvnrepository.com/artifact/org.apache.mrunit/mrunit MRUnit测试 -->
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>0.9.0-incubating</version>
<classifier>hadoop2</classifier>
<scope>test</scope>
</dependency>
2.编写测试类
public class TestReducer {
// 用于测试Reduce
@Test
public void TestMethod() {
ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver = new ReduceDriver<Text, IntWritable, Text, IntWritable>();
//设置要测试对像
reduceDriver.withReducer(new MaxReduce());
// 设置测试的key=1950
reduceDriver.withInputKey(new Text("1950"));
// 设置测试的values=[ 10,13,0]
reduceDriver.withInputValues(Arrays.asList(new IntWritable(10), new IntWritable(13), new IntWritable(0)));
//设置预期输出结果
reduceDriver.withOutput(new Text("1950"), new IntWritable(134));
//执行测试
reduceDriver.runTest();
}
}
class MaxReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
MR排序:
1.默认MapReduce是根据键来进行排序 使用WritableComparable类的comparator()方法
2.job.setSortComparatorClass(cls); 设置排序类
部分排序:每个Reduce输出的文件各自排好序的,但不是全局有序
全局排序:
1.只生成一个分区,也就是只一个Reduce,但对大文件没使用并行架构
2.按数学区间段来进行分区,每个分区排序后组成一个全局排好序的文件
分区 | 1 | 2 | 3 | 4 |
---|---|---|---|---|
温度 | 温度<-10 | -10<温度<0 | 0<温度<10 | 10>温度 |
记录占比 | 11% | 13% | 17% | 59% |
全局排序问题:
数据倾斜问题 记录没有均匀划分
使用Hadoop 提供的采样器分区解决:
// 设置分区类使用Hadoop自带的TotalOrderPartitioner类
job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.class);
// 随机采集器类:采样率:0.1 最大采样数 :10000 最大分区数:10
RandomSampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
// 提供的取样器为给定的job作业编写分区文件
InputSampler.writePartitionFile(job, sampler);
job.setNumReduceTasks(10);
二次排序:
以claas相同时再以socre排序
* id class socre
* 1 1 20
* 2 1 30
* 3 1 50
* 4 2 44
* 5 2 55
/**
* 在分组比较的时候,只比较原来的key,而不是组合key。
*/
public static class GroupingComparator implements RawComparator<IntPair> {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8);
}
@Override
public int compare(IntPair o1, IntPair o2) {
int first1 = o1.getFirst();
int first2 = o2.getFirst();
return first1 - first2;
}
}
MR去重:
以一行数据相同为重复
一个行某几列数据同为重复
用户id 商品id 收藏日期
- 10181 1000481 2010-04-04 16:54:31
- 20001 1001597 2010-04-07 15:07:52
- 20001 1001560 2010-04-07 15:08:27
- 20042 1001368 2010-04-08 08:20:30
- 20067 1002061 2010-04-08 16:45:33
- 20056 1003289 2010-04-12 10:50:55
- 20056 1003290 2010-04-12 11:57:35
- 20056 1003292 2010-04-12 12:05:29
- 20054 1002420 2010-04-14 15:24:12
- 20055 1001679 2010-04-14 19:46:04
- 20054 1010675 2010-04-14 15:23:53
- 20054 1002429 2010-04-14 17:52:45
- 20076 1002427 2010-04-14 19:35:39
- 20054 1003326 2010-04-20 12:54:44
- 20056 1002420 2010-04-15 11:24:49
- 20064 1002422 2010-04-15 11:35:54
- 20056 1003066 2010-04-15 11:43:01
- 20056 1003055 2010-04-15 11:43:06
- 20056 1010183 2010-04-15 11:45:24
- 20056 1002422 2010-04-15 11:45:49
- 20056 1003100 2010-04-15 11:45:54
- 20056 1003094 2010-04-15 11:45:57
- 20056 1003064 2010-04-15 11:46:04
- 20056 1010178 2010-04-15 16:15:20
- 20076 1003101 2010-04-15 16:37:27
- 20076 1003103 2010-04-15 16:37:05
- 20076 1003100 2010-04-15 16:37:18
- 20076 1003066 2010-04-15 16:37:31
- 20054 1003103 2010-04-15 16:40:14
- 20054 1003100 2010-04-15 16:40:16
实现原理:把表示重复字段使用Mapper阶段的Key表示,得上Reduce的归约,把相同的key作为一个输出
MR—JOIN:
两个文件进行关联
[ambow@hadoopNode1 data]$ cat table1.txt
1 zhangsan
2 lisi
3 wangwu
4 Xia
[ambow@hadoopNode1 data]$ cat table2.txt
math 1 33
java 3 45
test 2 55
C 2 55
python 2 55
word 3 55
oracle 1 22
[ambow@hadoopNode1 data]$ hadoop fs -cat /user/ambow/out/mapjoin/part-r-00000
1 zhangsan oracle 22
1 zhangsan math 33
2 lisi python 55
2 lisi C 55
2 lisi test 55
3 wangwu word 55
3 wangwu java 45
[ambow@hadoopNode1 data]$
Reduce端的JOIN
实现原理:map端打上标记(是那个表的数据),以关联的字段作为key,Rduce端进行关联处理
Mapper端的JOIN
当两个表中,基中一个表的数据比较少的情况下可以Mapper端的JOIN
实现原理:把小的一个表,直接到内存中(存入到集合),map端只读取大表,每读取一行直接就行关联
YARN资源管理器:
RM:集群全局资源管理器
NM:节点资源管理器
AM:对应一个JOB的ApplicationMaster
1.client向RM提交应用程序,其中包括启动该应用的ApplicationMaster的必须信息,例如ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。
2.ResourceManager启动一个container用于运行ApplicationMaster。 自己加的: Container开启一个ApplicationMaster进程
3.启动中的ApplicationMaster向ResourceManager注册自己,启动成功后与RM保持心跳。
4.ApplicationMaster向ResourceManager发送请求,申请相应数目的container。 以保证多个MapTask运行
5.ResourceManager返回ApplicationMaster的申请的containers信息。申请成功的container,由ApplicationMaster进行初始化。container的启动信息初始化后,AM与对应的NodeManager通信,要求NM启动container。AM与NM保持心跳,从而对NM上运行的任务进行监控和管理。
6.container运行期间,ApplicationMaster对container进行监控。container通过RPC协议向对应的AM汇报自己的进度和状态等信息。
7.应用运行期间,client直接与AM通信获取应用的状态、进度更新等信息。
8.应用运行结束后,ApplicationMaster向ResourceManager注销自己,并允许属于它的container被收回。
Yarn的调度:
FIFO调度
容量调度
公平调度:
YARN:Job提交流程
Hadoop HA:
HA:高可用,解决容灾问题
Mater/Slave主从架构的,都有单点故障问题
hadoopNode1 | hadoopNode2 | hadoopNode3 | hadoopNode4 | hadoopNode5 | |
---|---|---|---|---|---|
NameNode | NN1 | NN2 | |||
DataNode | DN | DN | DN | ||
ResourceManger | RM | RM | |||
NodeManger | NM | NM | NM | NM | NM |
ZooKeeper | ZK | ZK | ZK | ZK | ZK |
ZKFailoverController | ZKC | ZKC | |||
JournalNode | JN | JN | JN | JN | JN |
- 节点准备 3节点,5节点,10节点
配置: IP地址,主机名,hosts
2.NN2节点到各节点(包含NN1)的免费登陆
ping hadoopNode1
ZooKeeper集群协调服务
zookeeper=文件系统+通知机制,达到一个集群协调管理
安装单节点ZooKeeper:
- 下载解压ZooKeeper
[ambow@hadoopNode1 app]$ tar -xvzf zookeeper-3.4.6.tar.gz -C ~/app/
2.配置ZooKeeper环境变量
[ambow@hadoopNode1 app]$ vi ~/.bash_profile
ZOOKEEPER_HOME=/home/ambow/app/zookeeper-3.4.6
JAVA_HOME=/home/ambow/app/jdk1.8.0_121
HADOOP_HOME=/home/ambow/app/hadoop-2.7.3
PATH=$PATH:$HOME/.local/bin:$HOME/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZOOKEEPER_HOME/bin
export PATH
export JAVA_HOME
export HADOOP_HOME
export ZOOKEEPER_HOME
[ambow@hadoopNode1 app]$ source ~/.bash_profile
3.修改ZooKeeper配置文件
[ambow@hadoopNode1 app]$ cp $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg
[ambow@hadoopNode1 app]$ vi $ZOOKEEPER/conf/zoo.cfg
tickTime=2000 #通信心跳数,Zookeeper服务器心跳时间,单位毫秒
initLimit=10 #Leader和Follower初始通信时限
syncLimit=5 #Leader和Follower同步通信时限
dataDir=/home/ambow/zkdata/data #自己指定zookeeper的data路径 如果没有时手动创建
dataLogDir=/home/ambow/zkdata/log #自己指定zookeeper的log路径 如果没有时手动创建 log需要手动创建
clientPort=2181 #zookeeper对外的端口
4.单节点测试ZooKeeper
功能 | ||
---|---|---|
zkServer.sh start | 启动zookeeper服务 | |
zkServer.sh stop | 停止zookeeper服务 | |
zkServer.sh status | 查看zookeeper服务状态 | |
zkCli.sh | 客户端连接 |
[ambow@hadoopNode1 ~]$ zkServer.sh start #启动
[ambow@hadoopNode1 ~]$ zkCli.sh
ZooKeeper集群安装:
1.前提,删除单点dataDir=/home/ambow/zkdata/data 中的数据
2.配置: ZooKeeper配置文件zoo.cfg
tickTime=2000 #通信心跳数,Zookeeper服务器心跳时间,单位毫秒
initLimit=10 #Leader和Follower初始通信时限
syncLimit=5 #Leader和Follower同步通信时限
dataDir=/home/ambow/zkdata/data #自己指定zookeeper的data路径 如果没有时手动创建
dataLogDir=/home/ambow/zkdata/log #自己指定zookeeper的log路径 如果没有时手动创建
clientPort=2181 #zookeeper对外的端口
#指定server.1配zookeeper的各个节点 server.X=节点名:2888:3888
server.1=hadoopNode1:2888:3888
server.2=hadoopNode2:2888:3888
server.3=hadoopNode3:2888:3888
server.4=hadoopNode4:2888:3888
server.5=hadoopNode5:2888:3888
3.分发软件,及环境配置到各节点
[ambow@hadoopNode1 zkdata]$ scp -r ~/app/zookeeper-3.4.6 ambow@hadoopNode5:~/app/
[ambow@hadoopNode1 zkdata]$ scp -r ~/app/zookeeper-3.4.6 ambow@hadoopNode2:~/app/
[ambow@hadoopNode1 zkdata]$ scp -r ~/app/zookeeper-3.4.6 ambow@hadoopNode3:~/app/
[ambow@hadoopNode1 zkdata]$ scp -r ~/app/zookeeper-3.4.6 ambow@hadoopNode4:~/app/
[ambow@hadoopNode1 zkdata]$ scp -r ~/zkdata ambow@hadoopNode2:~/
[ambow@hadoopNode1 zkdata]$ scp -r ~/zkdata ambow@hadoopNode3:~/
[ambow@hadoopNode1 zkdata]$ scp -r ~/zkdata ambow@hadoopNode4:~/
[ambow@hadoopNode1 zkdata]$ scp -r ~/zkdata ambow@hadoopNode5:~/
[ambow@hadoopNode1 ~]$ scp ~/.bash_profile ambow@hadoopNode2:~
[ambow@hadoopNode1 ~]$ scp ~/.bash_profile ambow@hadoopNode3:~
[ambow@hadoopNode1 ~]$ scp ~/.bash_profile ambow@hadoopNode4:~
[ambow@hadoopNode1 ~]$ scp ~/.bash_profile ambow@hadoopNode5:~
[ambow@hadoopNode1 ~]$ source .bash_profile
[ambow@hadoopNode2 ~]$ source .bash_profile
[ambow@hadoopNode3 ~]$ source .bash_profile
[ambow@hadoopNode4 ~]$ source .bash_profile
[ambow@hadoopNode5 ~]$ source .bash_profile
注:每个节点的执行
4.增加myid
myid唯一服务标识
在指定的zk的data目录(dataDir=/home/ambow/zkdata/data)下创建一个myid的文件,每一个节点按照顺序分别添加内容:如:第一个节点添加1。第二人节点为2
server.1=master ———> echo 1 >> /home/ambow/zookeeper/myid
server.2=slave1: ———> echo 2 >> /home/ambow/zookeeper/myid
server.3=slave2: ———> echo 3 >> /home/ambow/zookeeper/myid
注:要与ZooKeeper配置文件zoo.cfg中配置的server.ID号一致
[ambow@hadoopNode1 data]$echo 1 >> /home/ambow/zookeeper/myid
[ambow@hadoopNode2 data]$echo 2 >> /home/ambow/zookeeper/myid
[ambow@hadoopNode3 data]$echo 3 >> /home/ambow/zookeeper/myid
[ambow@hadoopNode4 data]$echo 4 >> /home/ambow/zookeeper/myid
[ambow@hadoopNode5 data]$echo 5 >> /home/ambow/zookeeper/myid
[ambow@hadoopNode1 data]$ ssh hadoopNode5
[ambow@hadoopNode5 ~]$ cd zkdata/data/
[ambow@hadoopNode5 data]$ echo 5 >> myid
测试
各节点 启动 zkServer.sh start —启动服务
再用 zkSerer.sh status 查看leader,follower
注:启动集群zokeeper时,要把单节点的zookeeper产生的文件删除掉(/home/ambow/zookeeper这目录下,myid文件保留)kill掉leader 看是否选举
HDFS HA: 2 NN
1.配置core-site.xml
<configuration>
<!-- 集群中命名服务列表,名称自定义 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://clusterldg</value>
</property>
<!-- NameNode、DataNode、JournalNode等存放数据的公共目录。 会自动创建 不建议默认 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/home/wl/hdfs/data</value>
</property>
<!-- ZooKeeper集群的地址和端口。注意,数量一定是奇数,且不少于三个节点 -->
<property>
<name>ha.zookeeper.quorum</name>
<value>master:2181,slave1:2181,slave2:2181</value>
</property>
</configuration>
2.hdfs-site.xml
<configuration>
<!-- 指定DataNode存储block的副本数量。默认值是3个 -->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<!-- 给hdfs集群起名字,这个名字必须和core-site中的统一,且下面也会用到该名字 -->
<property>
<name>dfs.nameservices</name>
<value>clusterldg</value>
</property>
<!-- 指定NameService是cluster1时的namenode有哪些,这里的值也是逻辑名称,名字随便起,相互不重复即可 -->
<property>
<name>dfs.ha.namenodes.clusterldg</name>
<value>nn1,nn2</value>
</property>
<!-- 指定RPC地址 -->
<property>
<name>dfs.namenode.rpc-address.clusterldg.nn1</name>
<value>master:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.clusterldg.nn2</name>
<value>slave1:8020</value>
</property>
<!-- 指定http地址 -->
<property>
<name>dfs.namenode.http-address.clusterldg.nn1</name>
<value>master:50070</value>
</property>
<property>
<name>dfs.namenode.http-address.clusterldg.nn2</name>
<value>slave1:50070</value>
</property>
<!-- 指定cluster1是否启动自动故障恢复,即当NameNode出故障时,是否自动切换到另一台NameNode -->
<property>
<name>dfs.ha.automatic-failover.enabled.clusterldg</name>
<value>true</value>
</property>
<!-- 指定cluster1的两个NameNode共享edits文件目录时,使用的JournalNode集群信息 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://master:8485;slave1:8485;slave2:8485/clusterldg</value>
</property>
<!-- 指定cluster1出故障时,哪个实现类负责执行故障切换 -->
<property>
<name>dfs.client.failover.proxy.provider.clusterldg</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- 指定JournalNode集群在对NameNode的目录进行共享时,自己存储数据的磁盘路径。tmp路径是自己创建,journal是启动journalnode自动生成 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/home/wl/zkdata/journal</value>
</property>
<!-- 一旦需要NameNode切换,使用ssh方式进行操作 -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<!-- 使用ssh进行故障切换,所以需要配置无密码登录,使用ssh通信时用的密钥存储的位置 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/wl/.ssh/id_rsa</value>
</property>
<property>
<!-- 设置权限验证 -->
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>
3.配置slaves文件
hadoopNode1
hadoopNode2
hadoopNode3
hadoopNode4
hadoopNode5
4.配置文件分发至各节点
core-site.xml
hdfs-site.xml
slaves
5.zookeeper集群启动
各个节点执行: zkServer.sh start
注:如何启动失败,则把之前要把各个点的配置有dataDir=/home/ambow/zkdata/data目录删除数据,保留myid 文件
验证:leader,fllow
- 只在第一master个节点格式化格式化ZooKeeper
hdfs zkfc -formatZK
[ambow@hadoopNode1 data]$ hdfs zkfc -formatZK
7.启动 JournalNode集群
各节个节点 执行:hadoop-daemon.sh start journalnode
[ambow@hadoopNode1 data]$ hadoop-daemon.sh start journalnode
[ambow@hadoopNode2 data]$ hadoop-daemon.sh start journalnode
[ambow@hadoopNode3 data]$ hadoop-daemon.sh start journalnode
[ambow@hadoopNode4 data]$ hadoop-daemon.sh start journalnode
[ambow@hadoopNode5 data]$ hadoop-daemon.sh start journalnode
验证:各JournalNode的进程
8.格式化集群的第一个NN1节点(第一次启动需要,以后不需要再格式化了)
hdfs namenode -format
hadoop namenode -format
[ambow@hadoopNode1 data]$ hdfs namenode -format
注:格式化时,要把各个节点的hdfs目录删掉
9.在NN1节点上启动namenode
hadoop-daemon.sh start namenode
[ambow@hadoopNode1 data]$ hadoop-daemon.sh start namenode
验证:NameNode进程
10.同步NN1的元数据信息至NN2上 (第一次需要执行)
在NN2上执行 :hdfs namenode -bootstrapStandby
[ambow@hadoopNode2 data]$ hdfs namenode -bootstrapStandby
11.启动NN2的namenode进程
hadoop-daemon.sh start namenode
验证:NameNode进程
12.启动各节点的dataNode
hadoop-daemon.sh start datanode
13.启动 ZooKeeperFailoverCotroller
在NN1,NN2 节点上各执行:
hadoop-daemon.sh start zkfc
14.测试验证 HDFS HA
注意:如果无法完成自动切换需要查看日志文件,查找失败原因。CentOS7最小化版本中需要安装psmisc。
原因:提示未找到fuster程序,导致无法进行fence,所以可以通过如下命令来安装,Psmisc软件包中包含了fuster程序:
$>sudo yum install psmisc
YARN HA:
hadoopNode1 | hadoopNode2 | hadoopNode3 | hadoopNode4 | hadoopNode5 | |
---|---|---|---|---|---|
NameNode | NN1 | NN2 | |||
DataNode | DN | DN | DN | ||
ResourceManger | RM | RM | |||
NodeManger | NM | NM | NM | NM | NM |
ZooKeeper | ZK | ZK | ZK | ZK | ZK |
ZKFailoverController | ZKC | ZKC | |||
JournalNode | JN | JN | JN | JN | JN |
- 节点准备 3节点,5节点,10节点
1.配置yran-site.xml
<configuration>
<!-- 开启RM高可用 -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<!-- 指定RM的cluster id -->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yarn-cluster</value>
</property>
<!-- 指定RM的名字 -->
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<!-- 分别指定RM的地址 -->
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>master</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>slave1</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm1</name>
<value>master:8088</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address.rm2</name>
<value>slave1:8088</value>
</property>
<!-- 指定zk集群地址 -->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>master:2181,slave1:2181,slave2:2181</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 指定当前RM的ID 只添加在MR1 MR2节点上 可以不配置 -->
<property>
<name>yarn.resourcemanager.ha.id</name>
<value>rm1</value>
</property>
<!-- 启用RM重启功能 -->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>
<!-- 启用自动恢复,当任务进行一半,rm坏掉,就要启动自动恢复,默认是false -->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>
<!-- 指定resourcemanager的状态信息存储在zookeeper集群,默认是存放在FileSystem里面。 -->
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
</configuration>
- 配置mapered-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
3.分发
4.测试
#启动RM1及所有NM
start-yarn.sh
#启动RM2
yarn-daemon.sh start resourcemanager
yarn rmadmin -getServiceState rm1
yarn rmadmin -getServiceState rm2
HBase
HBase是一个分布式的、面向列的开源数据库,
NOSQL的关系型数据库
存于HDFS文件系统
HMaster: HBase的主管理者,管理HRegionServer
HRegionServer:每台服务器的从管理者,主要负责响应用户的IO请求,向HDFS中读写数据
HRegion:一个HRegion保存一个表中一段连续的数据,一个表由多个HRegion组成
store: 存储数据, 分为两种:MemStore,StoreFile:,一个Store对应HBase表中的一个列族
MemStore: 最先存放数据地方,当数据满 时刷入StoreFile
StoreFile: 是HBase最小的存储单元,底层最终是以HFile形式存储在HDFS中
HFile:存HBase数据的地方,物理文件
HLong:日志,一个HDFS上的一个日志文件
HBase数据模型:
行关键字(row key):唯一标识表中的一行数据
列族:对表中的字段进行 分组
列关键字(Column Key): 列键 格式: “ 列族名:列名 ” 例 : “成绩:英语”
时间戳: 插入单元格时的是时间,以时间戳来区别该单元的版本号
存储单元(Cell): 行关键字+列关键字+时间戳 就能确认一个存储单元
HBase安装:
准备工作:
不同机器之间的时间同步 要求每个节点子在30秒
[root@hadoopNode5 ~]# yum -y install ntp #安装ntp软件
[root@hadoopNode5 ~]# ntpdate ntp1.aliyun.com //指定与啊里云时间同步服务
1)安装jdk(略)
2)安装hadoop(略)
3)下载hbase1.3.2(略)
4)tar包
[ambow@hadoopNode1 ~]$ tar -xvzf ~/soft/hbase-1.3.2-bin.tar.gz -C ~/app/
5)配置环境变量~/ .base_profile
HBASE_HOME
PATH
6)hbase-env.sh 配置java_home和ZK
[ambow@hadoopNode1 conf]$ vi $HBASE_HOME/conf/hbase-env.sh
export JAVA_HOME=/home/wl/app/jdk1.8.0_121
export HADOOP_HOME=/home/wl/app/hadoop-2.7.3
export HBASE_MANAGES_ZK=false #禁用Hbase使用内置zookeper
export HBASE_BACKUP_MASTERS=${HBASE_HOME}/conf/backup-masters #配置HA的第二个节HMaster节点
新建一个$HBASE_HOME/conf/backup-masters 文件
vi $HBASE_HOME/conf/backup-masters
把备用的 HMaster节点添加:
hadoopNode2
7)hbase-site.xml配置参数
<configuration>
<!-- #指定hbase在HDFS中目录 自动创建 -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://clusterldg/hbase</value>
</property>
<!-- #true时,为集群模式u -->
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- #设置自己的zookeeper用的那个几个节点 -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoopNode1,hadoopNode2,hadoopNode3,hadoopNode4,hadoopNode5</value>
</property>
<!-- #使用内置Zookeeper时要指定 -->
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/home/ambow/zkdata/hdata</value>
</property>
</configuration>
8)配置regionserver(配置每一个机器名 子节点名 不要配主节名)
在hbase/conf/下新建regionserver文件,添加如入内容
hadoopNode3
hadoopNode4
hadoopNode5
9)scp -r hbase到其他节点
[ambow@hadoopNode1 conf]$ scp -r ~/app/hbase-1.3.2 ambow@hadoopNode5:~/app/
[ambow@hadoopNode1 conf]$ scp -r ~/app/hbase-1.3.2 ambow@hadoopNode4:~/app/
[ambow@hadoopNode1 conf]$ scp -r ~/app/hbase-1.3.2 ambow@hadoopNode3:~/app/
[ambow@hadoopNode1 conf]$ scp -r ~/app/hbase-1.3.2 ambow@hadoopNode2:~/app/
[ambow@hadoopNode1 conf]$ scp ~/.bash_profile ambow@hadoopNode5:~
[ambow@hadoopNode1 conf]$ scp ~/.bash_profile ambow@hadoopNode4:~
[ambow@hadoopNode1 conf]$ scp ~/.bash_profile ambow@hadoopNode3:~
[ambow@hadoopNode1 conf]$ scp ~/.bash_profile ambow@hadoopNode2:~
各节点重新加载 :source ~/.bash_profile
10)启动hdfs start-dfs.sh
11)启动各个节点的 ZK zkServer.sh start
#zkServer.sh start
12)启动hbase
# start-hbase.sh
#stop-hbash.sh
13验证: