01_尚硅谷大数据技术之大数据概论.pdf
02_尚硅谷大数据技术之Hadoop(入门)V3.3.pdf
03_尚硅谷大数据技术之Hadoop(HDFS)V3.3.pdf
04_尚硅谷大数据技术之Hadoop(MapReduce)V3.3.pdf
05_尚硅谷大数据技术之Hadoop(Yarn)V3.3.pdf
06_尚硅谷大数据技术之Hadoop(生产调优手册)V3.3.pdf
07_尚硅谷大数据技术之Hadoop(源码解析)V3.3.pdf
随堂绘图.pdf
Hadoop运行环境搭建
- 配置CentOS-7.5-x86-1804虚拟机,配置为静态IP地址为:192.168.10.100、主机名称hadoop100
- 测试能否正常上网
ping www.baidu.com - 安装epel-release,
yum install -y epel-release - 关闭防火墙
systemctl stop firewalld,systemctl disable firewalld.service 创建atguigu用户,并修改atguigu用户的密码useradd atguigu,passwd atguigu`- 卸载虚拟机自带的JDK,
rpm -qa | grep -i java | xargs -n1 rpm -e --nodeps,rpm-qa:查询所安装的所有rpm软件包,grep-i:忽略大小写,xargs -n1:表示每次只传递一个参数,rpm -e –nodeps:强制卸载软件 - 重启虚拟机
- 克隆三台虚拟机为hadoop102、hadoop103、hadoop104
修改克隆机的IP
vim /etc/sysconfig/network-scripts/ifcfg-ens33添加BOOTPROTO=staticIPADDR=192.168.10.102GATEWAY=192.168.10.2DNS1=192.168.10.2source /etc/profile
在hadoop102上安装JDK和Hadoop
将压缩包导入并解压
- 配置环境变量
vim /etc/profile.d/my_env.sh```java
JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212 export PATH=$PATH:$JAVA_HOME/bin
HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3 export PATH=$PATH:$HADOOP_HOME/bin export PATH=$PATH:$HADOOP_HOME/sbin
3. 测试JDk是否安装成功`java -version`3. 测试hadoop是否安装成功`hadoop version`3. 重启`reboot`<a name="qoTw0"></a># Hadoop 运行模式1)Hadoop 官方网站:http://hadoop.apache.org/ <br />2)Hadoop 运行模式包括:**本地模式**、**伪分布式模式**以及**完全分布式模式**。- **本地模式**:单机运行,只是用来演示一下官方案例。生产环境不用。- **伪分布式模式:**也是单机运行,但是具备 Hadoop 集群的所有功能,一台服务器模拟一个分布式的环境。个别缺钱的公司用来测试,生产环境不用。- **完全分布式模式:**多台服务器组成分布式环境。生产环境使用。<a name="Rd1Ak"></a>## 本地运行模式1. 创建在 hadoop-3.1.3 文件下面创建一个 wcinput 文件夹```java1)创建在 hadoop-3.1.3 文件下面创建一个 wcinput 文件夹[atguigu@hadoop102 hadoop-3.1.3]$ mkdir wcinput2)在 wcinput 文件下创建一个 word.txt 文件[atguigu@hadoop102 hadoop-3.1.3]$ cd wcinput3)编辑 word.txt 文件[atguigu@hadoop102 wcinput]$ vim word.txt➢ 在文件中输入如下内容hadoop yarnhadoop mapreduceatguiguatguigu➢ 保存退出::wq4)回到 Hadoop 目录/opt/module/hadoop-3.1.35)执行程序[atguigu@hadoop102 hadoop-3.1.3]$ hadoop jarshare/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jarwordcount wcinput wcoutput6)查看结果[atguigu@hadoop102 hadoop-3.1.3]$ cat wcoutput/part-r-00000看到如下结果:atguigu 2hadoop 2mapreduce 1yarn 1
完全分布式运行模式
拷贝JDK和hadoop在其他两台服务器上
scp -r $pdir/$fname $user@$host:$pdir/$fname 命令 递归 要拷贝的文件路径/名称 目的地用户@主机:目的地路径/名称 在hadoop2上 scp -r /opt/module/* atguigu@hadoop103:/opt/module/ scp -r /opt/module/* atguigu@hadoop104:/opt/module/rsync远程同步工具
rsync 主要用于备份和镜像。具有速度快、避免复制相同内容和支持符号链接的优点。 rsync 和 scp 区别:用 rsync 做文件的复制要比 scp 的速度快,rsync 只对差异文件做更新。scp 是把所有文件都复制过去。
- 基本语法:
rsync -av $pdir/$fname $user@$host:$pdir/$fname- -a:归档拷贝
- -v:显示复制过程
- 例如:
rsync -av hadoop-3.1.3/atguigu@hadoop103:/opt/module/hadoop-3.1.3/编写xsync集群分发脚本
(1)需求:循环复制文件到所有节点的相同目录下
(2)需求分析:
(a)rsync 命令原始拷贝:rsync -av /opt/module atguigu@hadoop103:/opt/
(b)期望脚本: xsync 要同步的文件名称
(c)期望脚本在任何路径都能使用(脚本放在声明了全局环境变量的路径) ```shell 1.添加环境变量 vim /etc/profile.d/my_env.sh export PATH:$PATH:/home/lhd/bin
- 编写脚本
cd /home/lhd/
mkdir bin
cd bin
vim xsync
#
!/bin/bash
1. 判断参数个数
if [ $# -lt 1 ] then echo Not Enough Arguement! exit; fi2. 遍历集群所有机器
for host in hadoop102 hadoop103 hadoop104 do echo ==================== $host ====================3. 遍历所有目录,挨个发送
for file in $@ do4. 判断文件是否存在
if [ -e $file ] then5. 获取父目录
pdir=$(cd -P $(dirname $file); pwd)6. 获取当前文件的名称
fname=$(basename $file) ssh $host “mkdir -p $pdir” rsync -av $pdir/$fname $host:$pdir else echo $file does not exists! fi done done#
- 编写脚本xsync具有执行权限 chmod +x xsync
- 测试脚本
xsync /home/lhd/bin
5.将脚本复制到bin目录下,以便全局调用
sudo cp xsync /bin/
6.同步环境变量配置
sudo /bin/xsync
7.让环境变量生效
[atguigu@hadoop103 bin]$ source /etc/profile
[atguigu@hadoop104 opt]$ source /etc/profile
```
SSH无密登陆配置
1)配置 ssh
(1)基本语法 : ssh 另一台电脑的 IP 地址
(2)ssh 连接时出现 Host key verification failed 的解决方法
[atguigu@hadoop102 ~]$ ssh hadoop103
➢ 如果出现如下内容
Are you sure you want to continue connecting (yes/no)?
➢ 输入 yes,并回车
(3)退回到 hadoop102[atguigu@hadoop103 ~]$ exit
2)无密钥配置 ```shell cd ~ cd .ssh //如果没有这个文件夹则运行一下 ssh Hadoop102 ssh-keygen -t rsa //生成公钥和私钥 ssh-copy-id hadoop102 ssh-copy-id hadoop103 ssh-copy-id hadoop104
注意: 还需要在 hadoop103 上采用 atguigu 账号配置一下无密登录到 hadoop102、hadoop103、 hadoop104 服务器上。 还需要在 hadoop104 上采用 atguigu 账号配置一下无密登录到 hadoop102、hadoop103、 hadoop104 服务器上。 还需要在 hadoop102 上采用 root 账号,配置一下无密登录到 hadoop102、hadoop103、 hadoop104
<a name="mEORp"></a>
## 集群配置
<a name="zHVdu"></a>
### 集群部署规划
- NameNode 和 SecondaryNameNode 不要安装在同一台服务器
- ResourceManager 也很消耗内存,不要和 NameNode、SecondaryNameNode 配置在同一台机器上。

<a name="bU9YG"></a>
### 配置文件说明
Hadoop 配置文件分两类:默认配置文件和自定义配置文件,只有用户想修改某一默认配置值时,才需要修改自定义配置文件,更改相应属性值。<br />Hadoop 配置文件分两类:默认配置文件和自定义配置文件,只有用户想修改某一默认 <br />配置值时,才需要修改自定义配置文件,更改相应属性值。 <br />(1)默认配置文件: <br /><br />(2)自定义配置文件: <br />**core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml **四个配置文件存放在 <br />$HADOOP_HOME/etc/hadoop 这个路径上,用户可以根据项目需求重新进行修改配置。
<a name="e6e6z"></a>
### 配置集群
```shell
1. 核心配置文件 配置core-site.xml
[atguigu@hadoop102 ~]$ cd $HADOOP_HOME/etc/hadoop
[atguigu@hadoop102 hadoop]$ vim core-site.xml
文件内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- 指定 NameNode 的地址 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop102:8020</value>
</property>
<!-- 指定 hadoop 数据的存储目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-3.1.3/data</value>
</property>
<!-- 配置 HDFS 网页登录使用的静态用户为 atguigu -->
<property>
<name>hadoop.http.staticuser.user</name>
<value>atguigu</value>
</property>
</configuration>
2. HDFS 配置文件
配置 hdfs-site.xml
[atguigu@hadoop102 hadoop]$ vim hdfs-site.xml
文件内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- nn web 端访问地址-->
<property>
<name>dfs.namenode.http-address</name>
<value>hadoop102:9870</value>
</property>
<!-- 2nn web 端访问地址-->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>hadoop104:9868</value>
</property>
</configuration>
3.YARN 配置文件 配置 yarn-site.xml
[atguigu@hadoop102 hadoop]$ vim yarn-site.xml
文件内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- 指定 MR 走 shuffle -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 指定 ResourceManager 的地址-->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hadoop103</value>
</property>
<!-- 环境变量的继承 -->
<property>
<name>yarn.nodemanager.env-whitelist</name>
<value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
</configuration>
4.MapReduce 配置文件 配置 mapred-site.xml
[atguigu@hadoop102 hadoop]$ vim mapred-site.xml
文件内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- 指定 MapReduce 程序运行在 Yarn 上 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
4)在集群上分发配置好的 Hadoop 配置文件
[atguigu@hadoop102 hadoop]$ xsync /opt/module/hadoop-
3.1.3/etc/hadoop/
5)去 103 和 104 上查看文件分发情况
[atguigu@hadoop103 ~]$ cat /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml
[atguigu@hadoop104 ~]$ cat /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml
群起集群
1)配置 workers
[atguigu@hadoop102 hadoop]$ vim /opt/module/hadoop-3.1.3/etc/hadoop/workers
在该文件中增加如下内容:
hadoop102
hadoop103
hadoop104
注意:该文件中添加的内容结尾不允许有空格,文件中不允许有空行。
同步所有节点配置文件
[atguigu@hadoop102 hadoop]$ xsync /opt/module/hadoop-3.1.3/etc
2)启动集群
(1)如果集群是第一次启动,需要在 hadoop102 节点格式化 NameNode(注意:格式化 NameNode,会产生新的集群 id,导致 NameNode 和 DataNode 的集群 id 不一致,集群找不到已往数据。如果集群在运行过程中报错,需要重新格式化 NameNode 的话,一定要先停 止 namenode 和 datanode 进程,并且要删除所有机器的 data 和 logs 目录,然后再进行格式化。)
[atguigu@hadoop102 hadoop-3.1.3]$ hdfs namenode -format
(2)启动 HDFS
[atguigu@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
(3)在配置了 ResourceManager 的节点(hadoop103)启动 YARN
[atguigu@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh
(4)Web 端查看 HDFS 的 NameNode
(a)浏览器中输入:http://hadoop102:9870
(b)查看 HDFS 上存储的数据信息
(5)Web 端查看 YARN 的 ResourceManager
(a)浏览器中输入:http://hadoop103:8088
(b)查看 YARN 上运行的 Job 信息
3)集群基本测试
(1)上传文件到集群
➢ 上传小文件
[atguigu@hadoop102 ~]$ hadoop fs -mkdir /input
[atguigu@hadoop102 ~]$ hadoop fs -put $HADOOP_HOME/wcinput/word.txt /input
➢ 上传大文件
[atguigu@hadoop102 ~]$ hadoop fs -put /opt/software/jdk-8u212-linux-x64.tar.gz / (2)上传文件后查看文件存放在什么位置
➢ 查看 HDFS 文件存储路径
[atguigu@hadoop102 subdir0]$ pwd
/opt/module/hadoop-3.1.3/data/dfs/data/current/BP-1436128598-192.168.10.102-1610603650062/current/finalized/subdir0/subdir0
➢ 查看 HDFS 在磁盘存储文件内容
[atguigu@hadoop102 subdir0]$ cat blk_1073741825
hadoop yarn
hadoop mapreduce
atguigu
atguigu
(3)拼接
-rw-rw-r--. 1 atguigu atguigu 134217728 5 月 23 16:01 blk_1073741836
-rw-rw-r--. 1 atguigu atguigu 1048583 5 月 23 16:01 blk_1073741836_1012.meta
-rw-rw-r--. 1 atguigu atguigu 63439959 5 月 23 16:01 blk_1073741837
-rw-rw-r--. 1 atguigu atguigu 495635 5 月 23 16:01 blk_1073741837_1013.meta
[atguigu@hadoop102 subdir0]$ cat blk_1073741836>>tmp.tar.gz
[atguigu@hadoop102 subdir0]$ cat blk_1073741837>>tmp.tar.gz
[atguigu@hadoop102 subdir0]$ tar -zxvf tmp.tar.gz
(4)下载
[atguigu@hadoop104 software]$ hadoop fs -get /jdk-8u212-linuxx64.tar.gz ./
(5)执行 wordcount 程序
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output
集群崩溃处理方法
1.删除每台服务器的data目录和logs目录
cd /opt/module/hadoop-3.1.3
rm -rf data
rm -rf logs
2.重新进行初始化
[atguigu@hadoop102 hadoop-3.1.3]$ hdfs namenode -format
(2)启动 HDFS
[atguigu@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
(3)在配置了 ResourceManager 的节点(hadoop103)启动 YARN
[atguigu@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh
配置历史服务器
为了查看程序的历史运行情况,需要配置一下历史服务器。具体配置步骤如下:
1)配置 mapred-site.xml
[atguigu@hadoop102 hadoop]$ vim mapred-site.xml
在该文件里面增加如下配置。
<!-- 历史服务器端地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop102:10020</value>
</property>
<!-- 历史服务器 web 端地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop102:19888</value>
</property>
2)分发配置
[atguigu@hadoop102 hadoop]$ xsync $HADOOP_HOME/etc/hadoop/mapred-site.xml
3)在 hadoop102 启动历史服务器
[atguigu@hadoop102 hadoop]$ mapred --daemon start historyserver
4)查看历史服务器是否启动
[atguigu@hadoop102 hadoop]$ jps
5)查看 JobHistory http://hadoop102:19888/jobhistory
配置日志的聚集
1)配置 yarn-site.xml
[atguigu@hadoop102 hadoop]$ vim yarn-site.xml
在该文件里面增加如下配置。
<!-- 开启日志聚集功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 设置日志聚集服务器地址 -->
<property>
<name>yarn.log.server.url</name>
<value>http://hadoop102:19888/jobhistory/logs</value>
</property>
<!-- 设置日志保留时间为 7 天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
2)分发配置
[atguigu@hadoop102 hadoop]$ xsync $HADOOP_HOME/etc/hadoop/yarn-site.xml
3)关闭 NodeManager 、ResourceManager 和 HistoryServer
[atguigu@hadoop103 hadoop-3.1.3]$ sbin/stop-yarn.sh
[atguigu@hadoop103 hadoop-3.1.3]$ mapred --daemon stop
historyserver
4)启动 NodeManager 、ResourceManage 和 HistoryServer
[atguigu@hadoop103 ~]$ start-yarn.sh
[atguigu@hadoop102 ~]$ mapred --daemon start historyserver
5)删除 HDFS 上已经存在的输出文件
[atguigu@hadoop102 ~]$ hadoop fs -rm -r /output
6)执行 WordCount 程序
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop jar
share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar
wordcount /input /output
7)查看日志
(1)历史服务器地址
http://hadoop102:19888/jobhistory
集群启动/停止方式
1)各个模块分开启动/停止(配置 ssh 是前提)常用
(1)整体启动/停止 HDFS
start-dfs.sh/stop-dfs.sh
(2)整体启动/停止 YARN
start-yarn.sh/stop-yarn.sh
2)各个服务组件逐一启动/停止
(1)分别启动/停止 HDFS 组件
hdfs --daemon start/stop namenode/datanode/secondarynamenode
(2)启动/停止 YARN
yarn --daemon start/stop resourcemanager/nodemanager
=========编写 Hadoop 集群常用脚本=================
1)Hadoop 集群启停脚本(包含 HDFS,Yarn,Historyserver):myhadoop.sh
[atguigu@hadoop102 ~]$ cd /home/atguigu/bin
[atguigu@hadoop102 bin]$ vim myhadoop.sh
➢ 输入如下内容
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input..."
exit ;
fi
case $1 in
"start")
echo " =================== 启动 hadoop 集群 ==================="
echo " --------------- 启动 hdfs ---------------"
ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
echo " --------------- 启动 yarn ---------------"
ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
echo " --------------- 启动 historyserver ---------------"
ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start
historyserver"
;;
"stop")
echo " =================== 关闭 hadoop 集群 ==================="
echo " --------------- 关闭 historyserver ---------------"
ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop
historyserver"
echo " --------------- 关闭 yarn ---------------"
ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
echo " --------------- 关闭 hdfs ---------------"
ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
*)
echo "Input Args Error..."
;;
esac
➢ 保存后退出,然后赋予脚本执行权限
[atguigu@hadoop102 bin]$ chmod +x myhadoop.sh
2)查看三台服务器 Java 进程脚本:jpsall
[atguigu@hadoop102 ~]$ cd /home/atguigu/bin
[atguigu@hadoop102 bin]$ vim jpsall
➢ 输入如下内容
#!/bin/bash
for host in hadoop102 hadoop103 hadoop104
do
echo =============== $host ===============
ssh $host jps
done
➢ 保存后退出,然后赋予脚本执行权限
[atguigu@hadoop102 bin]$ chmod +x jpsall
3)分发/home/atguigu/bin 目录,保证自定义脚本在三台机器上都可以使用
[atguigu@hadoop102 ~]$ xsync /home/atguigu/bin/
HDFS
HDFS的Shell操作
基本语法
hadoop fs 具体命令或者hdfs dfs 具体命令
| 指令 | 功能 |
|---|---|
| hadoop fs -moveFromLocal sourceFile destFile | 将sorceFile文件剪切到HDFS的destFile下 |
| hadoop fs -copyFromLocal source dest | 将source复制到dest |
| hadoop fs -put source dest | 将source复制到dest |
| hadoop fs -appendToFile source dest | 将source文件追加到dest文件中 |
| hadoop fs -copyToLocal source dest | 从HDFS拷贝到本地 |
| hadoop fs -get source dest | 从HDFS拷贝到本地 |
| hadoop fs -ls /sanguo | 显示目录信息 |
| hadoop fs -cat /sanguo/shuguo.txt | 显示文件内容 |
| hadoop fs -tail /jinguo/shuguo.txt | 显示一个文件末尾1kb的数据 |
| hadoop fs -rm /sanguo/shuguo.txt | 删除文件或文件夹 |
| hadoop fs -rm -r /sanguo | 递归删除目录及目录里面的内容 |
| hadoop fs -du -s -h /jinguo | 统计文件夹的大小信息 -s显示文件夹信息 删除-s会显示具体文件信息 |
| -setrep | 设置副本数量 |
HDFS API操作
参数优先级:hdfs-default.xml-->hdfs-site.xml-->资源目录下的配置文件-->代码里面的配置
package com.atguigu.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.checkerframework.checker.units.qual.A;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
/**
* 客户端代码常用套路
* 1. 获取一个客户端对象
* 2. 执行相关的操作命令
* 3. 关闭资源
*/
public class HdfsClient {
private FileSystem fs;
@Before
public void init() throws Exception{
//连接集群的nn地址
URI uri = new URI("hdfs://hadoop102:8020");
//创建一个配置文件
Configuration configuration = new Configuration();
//用户
String user="lhd";
//获取客户端对象
fs = FileSystem.get(uri, configuration,user);
}
@After
public void close() throws IOException {
fs.close();
}
@Test //创建文件夹
public void testmkdir() throws Exception {
//创建一个文件夹
fs.mkdirs(new Path("/xiyou/huangguoshan1"));
//关闭资源
fs.close();
}
@Test //上传
public void testPut() throws Exception{
//参数解读:参数1:表示删除原数据。参数二:是否允许覆盖,参数三:原数据路径,参数四:目的地路径
fs.copyFromLocalFile(false,false,new Path("./data/sunwukong.txt"),new Path("/xiyou/huaguoshang"));
}
@Test //文件下载
public void testGet() throws IOException {
//参数解读,参数1:源文件是否删除,参数2:源文件路径HDFS,参数3:目标路径Win,参数4:是否开启校验
fs.copyToLocalFile(false,new Path("/xiyou/huaguoshang"),new Path("./data/huaguoshang"),true);
}
@Test //删除
public void testPm() throws IOException {
//参数解读:参数1:要删除的路径,参数2:是否递归删除
fs.delete(new Path("/xiyou"),true);
}
@Test //文件移动
public void testmv() throws IOException {
//参数解读:参数1:源文件路径,参数2:目标文件路径
//对文件名称修改
fs.rename(new Path("/input/work.txt"),new Path("/input/ss.txt"));
//文件移动和更名
fs.rename(new Path("/input/ss.txt"),new Path("/cls.txt"));
//目录更名
fs.rename(new Path("/input"),new Path("/input1"));
}
@Test //获取文件详细信息
public void fileDetail() throws IOException {
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);//返回一个迭代器
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
System.out.println("==============="+fileStatus.getPath()+"================");
System.out.println(fileStatus.getPermission());
System.out.println(fileStatus.getOwner());
System.out.println(fileStatus.getGroup());
System.out.println(fileStatus.getLen());
System.out.println(fileStatus.getModificationTime());
System.out.println(fileStatus.getReplication());
System.out.println(fileStatus.getBlockSize());
System.out.println(fileStatus.getPath().getName());
//获取块信息
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
System.out.println(Arrays.toString(blockLocations));
}
}
@Test //判断是文件夹还是文件
public void testFile() throws IOException {
FileStatus[] listStatus = fs.listStatus(new Path("/"));
for (FileStatus status : listStatus) {
if (status.isFile()) {
System.out.println("这是一个文件"+status.getPath().getName());
}else {
System.out.println("这是一个目录"+status.getPath().getName());
}
}
}
}
MapReduce
MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
MappReduce优缺点
优点
- MapReduce易于编程:它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。
- 良好的扩展性:当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
- 高容错性:MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
适合PB级以上海量数据的离线处理 : 可以实现上千台服务器集群并发工作,提供数据处理能力。
缺点
不擅长实时计算 : MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。
- 不擅长流式计算 : 流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。
- 不擅长DAG(有向无环图)计算 : 多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
常用数据序列化类型
| Java类型 | Hadoop Writable类型 |
|---|---|
| Boolean | BooleanWritable |
| Byte | ByteWritable |
| Int | IntWritable |
| Float | FloatWritable |
| Long | LongWritable |
| Double | DoubleWritable |
| String | Text |
| Map | MapWritable |
| Array | ArrayWritable |
| Null | NullWritable |
MapReduce编程规范
用户编写的程序分成三个部分:Mapper、Reducer、Driver
- Mapper阶段
- 用户自定义的Mapper要继承自己的父类
- Mapper的输入数据是KV对的形式
- Mapper中的业务逻辑写在map()方法中
- Mapper的输出数据是KV对的形式(KV的类型可自定义)
- map()方法(MapTask进程对每一个
调用一次)
- Reduce阶段
- 用户自定义的Reduce要继承自己的父类
- Reduce的输入数据类型对应Mapper的输出数据类型,也是KV
- Reduce的业务逻辑写在reduce()方法中
- ReduceTask进程对每一组相同的
组调用一次reduce()方法
Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象
WordCount案例实操
创建maven工程
添加依赖 ```java <?xml version=”1.0” encoding=”UTF-8”?>4.0.0 com.atguigu HDFSClient 1.0-SNAPSHOT 8 8 org.apache.hadoop hadoop-client 3.1.3 junit junit 4.12 org.slf4j slf4j-log4j12 1.7.30 maven-compiler-plugin 3.6.1 1.8 1.8
**编写日志文件**
```shell
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
编写Mapper类
package com.atguigu.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* <KEYIN, map阶段输入的key的类型:LongWritable
* VALUEIN,map阶段输入value类型:Test
* KEYOUT, map阶段属于出的key类型:Test
* VALUEOUT>,map阶段输出的value类型IntWritable
*
*/
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
private Text outKey = new Text();
private IntWritable outValue=new IntWritable(1);
@Override //传进输入的key和value
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1 获取一行
String line = value.toString();
//2.对字符串进行切割
String[] words = line.split(" ");
//3.循环写出
for (String word : words) {
//封装outKey
outKey.set(word);
//写出
context.write(outKey,outValue);
}
}
}
编写Reduce类
package com.atguigu.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* <KEYIN, reduce阶段输入的key的类型:LongWritable
* VALUEIN,reduce阶段输入value类型:Test
* KEYOUT, reduce阶段属于出的key类型:Test
* VALUEOUT>,reduce阶段输出的value类型IntWritable
*
*/
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum=0;
for (IntWritable value : values) {
sum+=value.get();
}
outV.set(sum);
//写出
context.write(key,outV);
}
}
编写Driver类
package com.atguigu.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2.设置jar包路径
job.setJarByClass(WordCountDriver.class);
//3.关联mapper和reduce
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4.设置map输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5.设置最终输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6.设置输入路径和输出路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//7.提交job
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
}
}
本地测试
- 需要首先配置好HADOOP_HOME变量以及Windows运行依赖
- 在IDEA上运行程序
集群测试
将文件打包为jar包,启动集群,运行jar包hadoop jar wc.jar com.atguigu.mapreduce.wordcount.WordCountDriver /user/atguigu/input /user/atguigu/output
序列化
- 为什么不用Java的序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
- Hadoop序列化特点:
(1)紧凑:高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)互操作:支持多语言的交互
package com.atguigu.mapreduce.writable;
import lombok.Data;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 1.定义类实现writable
* 2.重写序列化和反序列化方法
* 3.重写空构造
* 4.toString方法
*/
//1 继承Writable接口
public class FlowBean implements Writable {
private long upFlow; //上行流量
private long downFlow; //下行流量
private long sumFlow; //总流量
//2 提供无参构造
public FlowBean() {
}
//3 提供三个参数的getter和setter方法
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
//4 实现序列化和反序列化方法,注意顺序一定要保持一致
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
//5 重写ToString
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
private Text outK = new Text();
private FlowBean outV = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1 获取一行数据,转成字符串
String line = value.toString();
//2 切割数据
String[] split = line.split("\t");
//3 抓取我们需要的数据:手机号,上行流量,下行流量
String phone = split[1];
String up = split[split.length - 3];
String down = split[split.length - 2];
//4 封装outK outV
outK.set(phone);
outV.setUpFlow(Long.parseLong(up));
outV.setDownFlow(Long.parseLong(down));
outV.setSumFlow();
//5 写出outK outV
context.write(outK, outV);
}
}
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
private FlowBean outV = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long totalUp = 0;
long totalDown = 0;
//1 遍历values,将其中的上行流量,下行流量分别累加
for (FlowBean flowBean : values) {
totalUp += flowBean.getUpFlow();
totalDown += flowBean.getDownFlow();
}
//2 封装outKV
outV.setUpFlow(totalUp);
outV.setDownFlow(totalDown);
outV.setSumFlow();
//3 写出outK outV
context.write(key,outV);
}
}
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2 关联本Driver类
job.setJarByClass(FlowDriver.class);
//3 关联Mapper和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//4 设置Map端输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//5 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//6 设置程序的输入输出路径
FileInputFormat.setInputPaths(job, new Path("./data/phone_data.txt"));
FileOutputFormat.setOutputPath(job, new Path("./data/output11"));
//7 提交Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
切片机制
- FileInputFormat切片机制,TextInputFormat是默认的FileInputFormat实现类
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于Block大小
- CombineTextInputFormat切片机制,框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下
- CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
// 如果不设置InputFormat,它默认用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); //切换切片机制 CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); //虚拟存储切片最大值设置为4M分区
要求将统计结果按照条件输出到不同的文件(分区)中,默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
自定义Partitioner步骤
- CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
- 自定义类继承Patitioner,重写getPartitioner()方法
- 在Job驱动中,设置自定义Partitioner
- 自定义Partition后,要根据自定义Partiton的逻辑设置相应数量的ReduceTask
分区总结
- 如果ReduceTask的数量 > getPartiton的结果数,则会多产生几个空的输出文件
- 如果ReduceTask的数量 < getPartiton的结果数,则有一部分分区数据无处安放,会抛出异常
- 如果ReduceTask的数量=1,则不管MapTask端有多少个分区文件,最终都会交给这一个ReduceTask,最终也就只会产生一个结果文件
- 分区号必须从零开始,逐一累加 ```java package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner;
/**
- Partitioner
的Key和Value为map处理后输出的类型 */ public class CustomPartitoner extends Partitioner
{ @Override public int getPartition(Text text, FlowBean flowBean, int i) { String phone = text.toString(); String prePhone = phone.substring(0, 3); //定义一个分区号变量partition,根据prePhone设置分区号 int partition; if(“136”.equals(prePhone)){
partition = 0;}else if(“137”.equals(prePhone)){
partition = 1;}else if(“138”.equals(prePhone)){
partition = 2;}else if(“139”.equals(prePhone)){
partition = 3;}else {
partition = 4;}
//最后返回分区号partition return partition; } }
```java
package com.atguigu.mapreduce.writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2 关联本Driver类
job.setJarByClass(FlowDriver.class);
//3 关联Mapper和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//4 设置Map端输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//5 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//指定自定义分区器
job.setPartitionerClass(CustomPartitoner.class);
job.setNumReduceTasks(5);
//6 设置程序的输入输出路径
FileInputFormat.setInputPaths(job, new Path("./data/phone_data.txt"));
FileOutputFormat.setOutputPath(job, new Path("./data/output5"));
//7 提交Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
排序
排序是MapReduce框架中最重要的操作之—。MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
排序分类
- 部分排序:MapReduce根据输入记录的键对数据库排序。保证输出的每个文件内部有序。
- 全排序:最终输出结果只有一个文件,且文件内部有序。实现方式是之设置一个ReduceTask,但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduece所提供的并行架构
- 辅助排序:GroupingComparator分组:在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序
- 二次排序:在自定义排序过程中,如果compareTo中的判断条件为两个既为二次排序
全排序
将序列化后产生的结果作为输入进行排序
- 编写FlowBean ```java package com.atguigu.mapreduce.writablecompable;
import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;
public class FlowBean implements WritableComparable
private long upFlow; //上行流量
private long downFlow; //下行流量
private long sumFlow; //总流量
//提供无参构造
public FlowBean() {
}
//生成三个属性的getter和setter方法
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
//实现序列化和反序列化方法,注意顺序一定要一致
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(this.upFlow);
out.writeLong(this.downFlow);
out.writeLong(this.sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
//重写ToString,最后要输出FlowBean
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
@Override
public int compareTo(FlowBean o) {
//按照总流量比较,倒序排列
if(this.sumFlow > o.sumFlow){
return -1;
}else if(this.sumFlow < o.sumFlow){
return 1;
}else {
if(this.upFlow > o.upFlow){
return -1;
}else if(this.upFlow < o.upFlow){
return 1;
}else {
return 0;
}
}
}
}
2. **编写Mapeer类**
```java
package com.atguigu.mapreduce.writablecompable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private FlowBean outK = new FlowBean();
private Text outV = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1 获取一行数据
String line = value.toString();
//2 按照"\t",切割数据
String[] split = line.split("\t");
//3 封装outK outV
outK.setUpFlow(Long.parseLong(split[1]));
outK.setDownFlow(Long.parseLong(split[2]));
outK.setSumFlow();
outV.set(split[0]);
//4 写出outK outV
context.write(outK,outV);
}
}
- 编写Reduce类 ```java package com.atguigu.mapreduce.writablecompable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException;
public class FlowReducer extends Reducer
//遍历values集合,循环写出,避免总流量相同的情况
for (Text value : values) {
//调换KV位置,反向写出
context.write(value,key);
}
}
}
4. **编写Driver类**
```java
package com.atguigu.mapreduce.writablecompable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1 获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2 关联本Driver类
job.setJarByClass(FlowDriver.class);
//3 关联Mapper和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//4 设置Map端输出数据的KV类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
//5 设置程序最终输出的KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//6 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path("D:\\inputflow2"));
FileOutputFormat.setOutputPath(job, new Path("D:\\comparout"));
//7 提交Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
区间排序
要求:按照每个省份手机号输出的文件中按照总流量内部排序。基于前一个需求,增加自定义分区类,分区按照省份手机号设置。
- 添加自定义分区类 ```java package com.atguigu.mapreduce.partitionercompable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner2 extends Partitioner
@Override
public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
//获取手机号前三位
String phone = text.toString();
String prePhone = phone.substring(0, 3);
//定义一个分区号变量partition,根据prePhone设置分区号
int partition;
if("136".equals(prePhone)){
partition = 0;
}else if("137".equals(prePhone)){
partition = 1;
}else if("138".equals(prePhone)){
partition = 2;
}else if("139".equals(prePhone)){
partition = 3;
}else {
partition = 4;
}
//最后返回分区号partition
return partition;
}
}
2. 在驱动类中添加分区
```java
// 设置自定义分区器
job.setPartitionerClass(ProvincePartitioner2.class);
// 设置对应的ReduceTask的个数
job.setNumReduceTasks(5);
Combiner合并
- Combiner是MP程序中Mapper和Reduce之外的一种组件
- Combiner组件的父类就是Reduce
- Combiner和Reduce的区别在于运行位置
- Combiner是在每一个MapTask所在的节点运行
- Reducec是接收全局所有Mapper的输出结果
- Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量
- Combiner能够应用的前提是不能影响最终的业务逻辑,而却Combiner的输出kv应该和Reduce的输入kv类型要对应起来
自定义Combiner实现步骤
- 自定义一个Combiner继承Reduce,重写Reduce方法
- 在Job驱动类中设置
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } outV.set(sum); context.write(key,outV); } } (b)在Job驱动类中设置: job.setCombinerClass(WordCountCombiner.class);输出控制OutPutFormat
OutputFormat的MapReduce输出的基类,所有实现MapReduece输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。默认输出格式TextOutputFormat
实现步骤:(过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log)
创建一个LogReduceWrite继承ReduceWrite
- 创建两个文件输出流:atguiguOut、otherOut
- 如果输入数据包含atguigu输出到atguigu流
(1)编写LogMapper类
package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//不做任何处理,直接写出一行log数据
context.write(value,NullWritable.get());
}
}
(2)编写LogReducer类
package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class LogReducer extends Reducer<Text, NullWritable,Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// 防止有相同的数据,迭代写出
for (NullWritable value : values) {
context.write(key,NullWritable.get());
}
}
}
(3)自定义一个LogOutputFormat类
package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
//创建一个自定义的RecordWriter返回
LogRecordWriter logRecordWriter = new LogRecordWriter(job);
return logRecordWriter;
}
}
(4)编写LogRecordWriter类
package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
private FSDataOutputStream atguiguOut;
private FSDataOutputStream otherOut;
public LogRecordWriter(TaskAttemptContext job) {
try {
//获取文件系统对象
FileSystem fs = FileSystem.get(job.getConfiguration());
//用文件系统对象创建两个输出流对应不同的目录
atguiguOut = fs.create(new Path("d:/hadoop/atguigu.log"));
otherOut = fs.create(new Path("d:/hadoop/other.log"));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
String log = key.toString();
//根据一行的log数据是否包含atguigu,判断两条输出流输出的内容
if (log.contains("atguigu")) {
atguiguOut.writeBytes(log + "\n");
} else {
otherOut.writeBytes(log + "\n");
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
//关流
IOUtils.closeStream(atguiguOut);
IOUtils.closeStream(otherOut);
}
}
(5)编写LogDriver类
package com.atguigu.mapreduce.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(LogDriver.class);
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//设置自定义的outputformat
job.setOutputFormatClass(LogOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path("D:\\input"));
//虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
//而fileoutputformat要输出一个_SUCCESS文件,所以在这还得指定一个输出目录
FileOutputFormat.setOutputPath(job, new Path("D:\\logoutput"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
Reduce Join

(1)创建商品和订单合并后的TableBean类
package com.atguigu.mapreduce.reducejoin;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class TableBean implements Writable {
private String id; //订单id
private String pid; //产品id
private int amount; //产品数量
private String pname; //产品名称
private String flag; //判断是order表还是pd表的标志字段
public TableBean() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public String toString() {
return id + "\t" + pname + "\t" + amount;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(id);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readUTF();
this.pid = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.flag = in.readUTF();
}
}
(2)编写TableMapper类
package com.atguigu.mapreduce.reducejoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class TableMapper extends Mapper<LongWritable,Text,Text,TableBean> {
private String filename;
private Text outK = new Text();
private TableBean outV = new TableBean();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//获取对应文件名称
InputSplit split = context.getInputSplit();
FileSplit fileSplit = (FileSplit) split;
filename = fileSplit.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取一行
String line = value.toString();
//判断是哪个文件,然后针对文件进行不同的操作
if(filename.contains("order")){ //订单表的处理
String[] split = line.split("\t");
//封装outK
outK.set(split[1]);
//封装outV
outV.setId(split[0]);
outV.setPid(split[1]);
outV.setAmount(Integer.parseInt(split[2]));
outV.setPname("");
outV.setFlag("order");
}else { //商品表的处理
String[] split = line.split("\t");
//封装outK
outK.set(split[0]);
//封装outV
outV.setId("");
outV.setPid(split[0]);
outV.setAmount(0);
outV.setPname(split[1]);
outV.setFlag("pd");
}
//写出KV
context.write(outK,outV);
}
}
(3)编写TableReducer类
package com.atguigu.mapreduce.reducejoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
public class TableReducer extends Reducer<Text,TableBean,TableBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<TableBean> values, Context context) throws IOException, InterruptedException {
ArrayList<TableBean> orderBeans = new ArrayList<>();
TableBean pdBean = new TableBean();
for (TableBean value : values) {
//判断数据来自哪个表
if("order".equals(value.getFlag())){ //订单表
//创建一个临时TableBean对象接收value
TableBean tmpOrderBean = new TableBean();
try {
BeanUtils.copyProperties(tmpOrderBean,value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
//将临时TableBean对象添加到集合orderBeans
orderBeans.add(tmpOrderBean);
}else { //商品表
try {
BeanUtils.copyProperties(pdBean,value);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
//遍历集合orderBeans,替换掉每个orderBean的pid为pname,然后写出
for (TableBean orderBean : orderBeans) {
orderBean.setPname(pdBean.getPname());
//写出修改后的orderBean对象
context.write(orderBean,NullWritable.get());
}
}
}
(4)编写TableDriver类
package com.atguigu.mapreduce.reducejoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class TableDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(TableDriver.class);
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("D:\\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\\output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
4)测试
运行程序查看结果
1004 小米 4
1001 小米 1
1005 华为 5
1002 华为 2
1006 格力 6
1003 格力 3
5)总结
缺点:这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。
MapJoin
1)使用场景Map Join适用于一张表十分小、一张表很大的场景。
2)优点
思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?
在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
3)具体办法:采用DistributedCache
(1)在Mapper的setup阶段,将文件读取到缓存集合中。
(2)在Driver驱动类中加载缓存。
//缓存普通文件到Task运行节点。job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置HDFS路径job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
Map Join案例实操
1)需求
表 订单数据表t_order
id pid amount
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6
表 商品信息表t_product
pid pname
01 小米
02 华为
03 格力
将商品信息表中数据根据商品pid合并到订单数据表中。
表 最终数据形式
id pname amount
1001 小米 1
1004 小米 4
1002 华为 2
1005 华为 5
1003 格力 3
1006 格力 6
2)需求分析
MapJoin适用于关联表中有小表的情形。
3)实现代码
(1)先在MapJoinDriver驱动类中添加缓存文件
package com.atguigu.mapreduce.mapjoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class MapJoinDriver {
public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
// 1 获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置加载jar包路径
job.setJarByClass(MapJoinDriver.class);
// 3 关联mapper
job.setMapperClass(MapJoinMapper.class);
// 4 设置Map输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 5 设置最终输出KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 加载缓存数据
job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt"));
// Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
job.setNumReduceTasks(0);
// 6 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path("D:\\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\\output"));
// 7 提交
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
(2)在MapJoinMapper类中的setup方法中读取缓存文件
package com.atguigu.mapreduce.mapjoin;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private Map<String, String> pdMap = new HashMap<>();
private Text text = new Text();
//任务开始前将pd数据缓存进pdMap
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//通过缓存文件得到小表数据pd.txt
URI[] cacheFiles = context.getCacheFiles();
Path path = new Path(cacheFiles[0]);
//获取文件系统对象,并开流
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataInputStream fis = fs.open(path);
//通过包装流转换为reader,方便按行读取
BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
//逐行读取,按行处理
String line;
while (StringUtils.isNotEmpty(line = reader.readLine())) {
//切割一行
//01 小米
String[] split = line.split("\t");
pdMap.put(split[0], split[1]);
}
//关流
IOUtils.closeStream(reader);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//读取大表数据
//1001 01 1
String[] fields = value.toString().split("\t");
//通过大表每行数据的pid,去pdMap里面取出pname
String pname = pdMap.get(fields[1]);
//将大表每行数据的pid替换为pname
text.set(fields[0] + "\t" + pname + "\t" + fields[2]);
//写出
context.write(text,NullWritable.get());
}
}
数据清洗(ETL)
“ETL,是英文Extract-Transform-Load的缩写,用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
去除日志中字段个数小于等于11的日志
(1)编写WebLogMapper类
package com.atguigu.mapreduce.weblog;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WebLogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取1行数据
String line = value.toString();
// 2 解析日志
boolean result = parseLog(line,context);
// 3 日志不合法退出
if (!result) {
return;
}
// 4 日志合法就直接写出
context.write(value, NullWritable.get());
}
// 2 封装解析日志的方法
private boolean parseLog(String line, Context context) {
// 1 截取
String[] fields = line.split(" ");
// 2 日志长度大于11的为合法
if (fields.length > 11) {
return true;
}else {
return false;
}
}
}
(2)编写WebLogDriver类
package com.atguigu.mapreduce.weblog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WebLogDriver {
public static void main(String[] args) throws Exception {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] { "D:/input/inputlog", "D:/output1" };
// 1 获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 加载jar包
job.setJarByClass(LogDriver.class);
// 3 关联map
job.setMapperClass(WebLogMapper.class);
// 4 设置最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置reducetask个数为0
job.setNumReduceTasks(0);
// 5 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 6 提交
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
