Hadoop环境搭建

Linux系统安装(VMware)

安装选项

  1. 配置:2核4g 50g
  2. 软件选择:GNOME桌面
  3. 分区:/boot 1024mb ext4 ;/swap 4096mb swap

工具包安装(最小化安装)

net-tool工具包集合

包含ifconfig等命令

  1. yum install -y net-tools

vim编辑器

  1. yum install -y vim

rsync

  1. yum install -y rsync

psmisc(搭建hadoopHa需要)

  1. yum -y install psmisc

netcat

  1. yum -y install nc
  2. 查看端口是否占用
  3. netstat -nlp | grep *

卸载自带jdk

  1. rpm -qa | grep -i java | xargs -n1 rpm -e --nodeps
  2. ##rpm -qa:查询所安装的所有rpm软件包
  3. ##grep -i:忽略大小写
  4. ##xargs -n1:表示每次只传递一个参数
  5. ##rpm -e –nodeps:强制卸载软件

关闭防火墙及开机自启

  1. systemctl stop firewalld
  2. systemctl disable firewalld.service

配置静态IP

vi /etc/sysconfig/network-scripts/ifcfg-ens33

  1. BOOTPROTO=static
  2. IPADDR=192.168.10.102
  3. GATEWAY=192.168.10.2
  4. DNS1=192.168.10.2

安装Mysql

复制安装包到安装目录

解压 MySQL 安装包

  1. tar -xf mysql-5.7.28-1.el7.x86_64.rpmbundle.tar

卸载自带数据库
  1. rpm -qa|grep mariadb
  2. sudo rpm -e --nodeps mariadb-libs

安装依赖
  1. yum install -y libaio

在安装目录下执行 rpm 安装

  1. sudo rpm -ivh mysql-community-common-5.7.28-1.el7.x86_64.rpm
  2. sudo rpm -ivh mysql-community-libs-5.7.28-1.el7.x86_64.rpm
  3. sudo rpm -ivh mysql-community-libs-compat-5.7.28-1.el7.x86_64.rpm
  4. sudo rpm -ivh mysql-community-client-5.7.28-1.el7.x86_64.rpm
  5. sudo rpm -ivh mysql-community-server-5.7.28-1.el7.x86_64.rpm
  6. 注意:按照顺序依次执行

删除/etc/my.cnf文件中 datadi 指向的目录下的所有内容,如果有内容的情况下

查看 datadir 的值:

  1. [mysqld]
  2. datadir=/var/lib/mysql

删除/var/lib/mysql 目录下的所有内容:

  1. [atguigu @hadoop102 mysql]# cd /var/lib/mysql
  2. [atguigu @hadoop102 mysql]# sudo rm -rf ./* //注意执行命令的位置

初始化数据库

  1. [atguigu @hadoop102 opt]$ sudo mysqld --initialize --user=mysql

查看临时生成的root用户的密码

  1. [atguigu @hadoop102 opt]$ sudo cat /var/log/mysqld.log

启动MySQL 服务

  1. [atguigu @hadoop102 opt]$ sudo systemctl start mysqld

登录 MySQL 数据库

  1. [atguigu @hadoop102 opt]$ mysql -uroot -p
  2. Enter password: 输入临时生成的密码

必须先修改root用户的密码否则执行其他的操作会报错

  1. mysql> set password = password("新密码");

修改mysql库下的user表中 root用户允许任意ip 连接

  1. mysql> update mysql.user set host='%' where user='root';
  2. mysql> flush privileges;

克隆虚拟机

设置ssh免密

实现从主节点到从节点免密登录

  1. ssh-keygen -t rsa
  2. ssh-copy-id (IP)

用到的脚本

文件分发脚本

  1. #!/bin/bash
  2. #1. 判断参数个数
  3. if [ $# -lt 1 ]
  4. then
  5. echo Not Enough Arguement!
  6. exit;
  7. fi
  8. #2. 遍历集群所有机器
  9. for host in 虚拟机IP:
  10. do
  11. echo ==================== $host ====================
  12. #3. 遍历所有目录,挨个发送
  13. for file in $@
  14. do
  15. #4. 判断文件是否存在
  16. if [ -e $file ]
  17. then
  18. #5. 获取父目录
  19. pdir=$(cd -P $(dirname $file); pwd)
  20. #6. 获取当前文件的名称
  21. fname=$(basename $file)
  22. ssh $host "mkdir -p $pdir"
  23. rsync -av $pdir/$fname $host:$pdir
  24. else
  25. echo $file does not exists!
  26. fi
  27. done
  28. done

安装JDK

解压

  1. tar -zxvf jdk-.tar.gz -C /user/local/src
  2. ##解压jdk文件 到/user/local/src

配置环境变量

  1. vim /user/.base_profile
  2. ##仅对当前用户生效
  1. export JAVA_HOME=/user/local/src/jdk***
  2. export PATH=$PATH:$JAVA_HOME/bin

分发jdk

scp

  1. scp -r(递归) $JAVA_HOME user@ip:/***/***

分发脚本

安装hadoop

解压

  1. tar -zxvf hadoop-.tar.gz -C /user/local/src
  2. ##解压hadoop文件 到/user/local/src

配置环境变量

  1. vim /user/.bash_profile
  2. ##仅对当前用户生效
  1. export HADOOP_HOME=/user/local/src/hadoop***
  2. export PATH=$PATH:$HADOOP_HOME/bin
  3. export PATH=$PATH:$HADOOP_HOME/sbin

常用端口号说明

端口名称 Hadoop2.x Hadoop3.x
NameNode内部通信端口 8020 / 9000 8020 / 9000/9820
NameNode HTTP UI 50070 9870
MapReduce查看执行任务端口 8088 8088
历史服务器通信端口 19888 19888

配置集群

核心配置文件

  1. cd $HADOOP_HOME/etc/hadoop
  2. vim core-site.xml
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  3. <configuration>
  4. <!-- 指定NameNode的地址 -->
  5. <property>
  6. <name>fs.defaultFS</name>
  7. <!--<value>hdfs://hadoopIP:8020</value>-->
  8. </property>
  9. <!-- 指定hadoop数据的存储目录 -->
  10. <property>
  11. <name>hadoop.tmp.dir</name>
  12. <value>/opt/module/hadoop-3.1.3/data</value>
  13. </property>
  14. <!-- 配置HDFS网页登录使用的静态用户为atguigu -->
  15. <property>
  16. <name>hadoop.http.staticuser.user</name>
  17. <!--<value>用户名</value>-->
  18. </property>
  19. </configuration>

HDFS配置文件

vim hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- nn web端访问地址-->
    <property>
        <name>dfs.namenode.http-address</name>
        <!--<value>hadoopIP:9870</value>-->
    </property>
    <!-- 2nn web端访问地址-->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <!--<value>hadoopIP:9868</value>-->
    </property>
</configuration>

YARN配置文件

vim yarn-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- 指定MR走shuffle -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>

    <!-- 指定ResourceManager的地址-->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <!--<value>hadoopIP</value>-->
    </property>

    <!-- 环境变量的继承 -->
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>
</configuration>

MapReduce配置文件

vim mapred-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- 指定MapReduce程序运行在Yarn上 -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

配置workers

vim /opt/module/hadoop-3.1.3/etc/hadoop/slaves

在该文件中增加如下内容:

hadoopip
hadoopip
hadoopip

分发Hadoop配置文件

启动集群

scp -r(递归) $JAVA_HOME user@ip:/***/***

格式化NameNode

如果集群是第一次启动,需要在配置的NameNode节点格式化NameNode(注意:格式化NameNode,会产生新的集群id,导致NameNodeDataNode集群id不一致,集群找不到已往数据。如果集群在运行过程中报错,需要重新格式化NameNode的话,一定要先停止namenodedatanode进程,并且要删除所有机器的datalogs目录,然后再进行格式化。)

hdfs namenode -format

启动HDFS

sbin/start-dfs.sh

启动YARN

在配置了ResourceManager的节点启动

sbin/start-yarn.sh

启动历史服务器

mapred --daemon start historyserver

配置HadoopHA

2nn不存在,由standby和nn来干

HDFS-HA

core-site.xml
<configuration>
<!-- 把多个NameNode的地址组装成一个集群mycluster -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://mycluster</value>
    </property>
<!-- 指定hadoop运行时文件的存储目录 -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/root/local/src/XXX/data</value>
    </property>
<!-- 指定zkfc要连接的zkServer地址 -->
    <property>
        <name>ha.zookeeper.quorum</name>
        <value>Master:2181,Slave1:2181,Slave2:2181</value>
    </property>
</configuration>

hdfs-site.xml
<configuration>
    <!-- NameNode数据的存储目录 -->
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file://${hadoop.tmp.dir}/name</value>
    </property>
    <!-- DataNode数据的存储目录 -->
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file://${hadoop.tmp.dir}/data</value>
    </property>
    <!-- JournalNode数据的存储目录 -->
    <property>
        <name>dfs.journalnode.edits.dir</name>
        <value>${hadoop.tmp.dir}/jn</value>
    </property>
    <!-- 完全分布式集群名称 -->
    <property>
        <name>dfs.nameservices</name>
        <value>mycluster</value>
    </property>
    <!-- 集群中NameNode节点都有哪些 -->
    <property>
        <name>dfs.ha.namenodes.mycluster</name>
        <value>nn1,nn2</value>
    </property>
    <!-- NameNode的RPC通信地址 -->
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn1</name>
        <value>Master:8020</value>
    </property>
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn2</name>
        <value>Slave1:8020</value>
    </property>
    <!-- NameNode的http通信地址 -->
    <property>
        <name>dfs.namenode.http-address.mycluster.nn1</name>
        <value>Master:9870</value>
    </property>
    <property>
        <name>dfs.namenode.http-address.mycluster.nn2</name>
        <value>Slave1:9870</value>
    </property>
    <!-- 指定NameNode元数据在JournalNode上的存放位置 -->
    <property>
        <name>dfs.namenode.shared.edits.dir</name>
        <value>qjournal://Master:8485;Slave1:8485;Slave2:8485/mycluster</value>
    </property>
    <!-- 访问代理类:client用于确定哪个NameNode为Active -->
    <property>
        <name>dfs.client.failover.proxy.provider.mycluster</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <!-- 配置隔离机制,即同一时刻只能有一台服务器对外响应 -->
    <property>
        <name>dfs.ha.fencing.methods</name>
        <value>sshfence</value>
    </property>
    <!-- 使用隔离机制时需要ssh密钥登陆 -->
    <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/root/.ssh/id_rsa</value>
    </property>
    <!-- 启动nn故障自动转移 -->
    <property>
        <name>dfs.ha.automatic-failover.enable</name>
        <value>true</value>
    </property>
</configuration>

启动
启动每个节点journalnode
hadoop-daemon.sh start journalnode
格式化nn
hdfs namenode -format
hadoop-daemon.sh start namenode
在另一台namenode同步
hdfs namenode -bootstrapStandby
之后启动另一个namenode
hadoop-daemon.sh start namenode

查看某个节点的状态
hdfs haadmin -getServiceState nn2
将某个节点设为active
hdfs haadmin -transitionToActive nn1
启动zookeeper
初始化zkfc
hdfs zkfc -formatZK
重启集群

YARN-HA

<configuration>
<!-- 指定MR走shuffle -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <!-- 启动yarnHA -->
    <property>
        <name>yarn.resourcemanager.ha.enabled</name>
        <value>true</value>
    </property>
    <!-- 声明recourcemanager的地址 -->
    <property>
        <name>yarn.resourcemanager.cluster-id</name>
        <value>cluster-yarn1</value>
    </property>
    <!-- 配置recourcemanager的逻辑列表 -->
    <property>
        <name>yarn.resourcemanager.ha.rm-ids</name>
        <value>rm1,rm2</value>
    </property>
    <!-- rm1 -->
    <!-- 主机名 -->
    <property>
       <name>yarn.resourcemanager.hostname.rm1</name>
       <value>Master</value>
    </property>
    <!-- rm2 -->
    <!-- 主机名 -->
    <property>
       <name>yarn.resourcemanager.hostname.rm2</name>
       <value>Slave1</value>
    </property>
    <!-- zookeeper地址 -->
    <property>
       <name>yarn.resourcemanager.zk-address</name>
       <value>Master:2181,Slave1:2181,Slave2:2181</value>
    </property>
    <!-- 启用自动恢复 -->
    <property>
       <name>yarn.resourcemanager.recovery.enabled</name>
       <value>true</value>
    </property>
    <!-- 指定resourcemanager的状态信息存储在zk中 -->
    <property>
       <name>yarn.resourcemanager.store.class</name>
       <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
    </property>
</configuration>
启动resourcemanager和nodemanager
start-yarn-sh
在另一台节点启动resourcemanager
yarn-daemon.sh start resourcemanager

错误解决方法

启动dfs时找不到JAVA_HOME

在hadoop目录/ect/hadoop/hadoop_env修改$JAVA_HOME的值为JAVA的安装路径

配置zookeeper

解压

tar -zxvf zookeeper-.tar.gz -C /user/local/src
##解压zookeeper文件 到/user/local/src

修改配置文件

cd $ZOOKEEPER_HOME
mkdir zkData
##创建zookeeper保存数据的文件夹
cd conf/
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg

修改dataDir=$ZOOKEEPER_HOME/zkData

启动服务

本地模式

bin/zkServer.sh start
bin/zkServer.sh status
##显示状态Mode: standalone

集群模式

分发zookeeper

创建myid文件
cd $ZOOKEEPER_HOME/zkData
vim myid

写入服务器编号分发,修改每台服务器的编号

配置zoo.cfg文件

增加以下配置后分发

server.0=hadoop00:2888:3888
server.2=hadoop02:2888:3888
server.3=hadoop03:2888:3888
server.A=B:C:D
A=myid,B=IP,c=Follower与leader交换消息的端口,D=Leader挂掉后重新选举的端口

启动服务器

Hive部署

解压

tar -zxvf hive-.tar.gz -C /user/local/src
##解压zookeeper文件 到/user/local/src

改名

配置环境变量

vim /user/.base_profile
##仅对当前用户生效
export HIVE_HOME=/user/local/src/hive
export PATH=$PATH:$HIVE_HOME/bin

元数据配置到Mysql

拷贝Mysql的JDBC驱动到Hive的lib下

配置hive-site.xml

在conf目录下hive-default.xml.template文件更名为hive-site.xml或新建

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <!-- jdbc 连接的 URL -->
 <property>
 <name>javax.jdo.option.ConnectionURL</name>
 <value>jdbc:mysql://hadoop00:3306/metastore?useSSL=false</value>
</property>
 <!-- jdbc 连接的 Driver-->
 <property>
 <name>javax.jdo.option.ConnectionDriverName</name>
 <value>com.mysql.jdbc.Driver</value>
</property>
<!-- jdbc 连接的 username-->
 <property>
 <name>javax.jdo.option.ConnectionUserName</name>
 <value>root</value>
 </property>
 <!-- jdbc 连接的 password -->
 <property>
 <name>javax.jdo.option.ConnectionPassword</name>
 <value>000000</value>
</property>
 <!-- Hive 元数据存储版本的验证 -->
 <property>
 <name>hive.metastore.schema.verification</name>
 <value>false</value>
</property>
 <!--元数据存储授权-->
 <property>
 <name>hive.metastore.event.db.notification.api.auth</name>
 <value>false</value>
 </property>
 <!-- Hive 默认在 HDFS 的工作目录 -->
 <property>
 <name>hive.metastore.warehouse.dir</name>
 <value>/user/hive/warehouse</value>
 </property>
</configuration>

新建Hive元数据库

mysql> create database metastore;
mysql> quit;

初始化元数据库

schematool -initSchema -dbType mysql -verbose

启动Hive

错误解决方法

[ERROR] Terminal initialization failed; falling back to unsupported

出现此种错误应该是jar版本包冲突了,启动hive的时候,由于hive依赖hadoop,启动hive,会将hadoop的配置以及jar包等等导入到hive中,导致jar包版本冲突,下面贴一下错误:

Logging initialized using configuration in jar:file:/opt/app/hive/lib/hive-common-1.2.1.jar!/hive-log4j.properties
[ERROR] Terminal initialization failed; falling back to unsupported
java.lang.IncompatibleClassChangeError: Found class jline.Terminal, but interface was expected
    at jline.TerminalFactory.create(TerminalFactory.java:101)
    at jline.TerminalFactory.get(TerminalFactory.java:158)
    at jline.console.ConsoleReader.<init>(ConsoleReader.java:229)
    at jline.console.ConsoleReader.<init>(ConsoleReader.java:221)
    at jline.console.ConsoleReader.<init>(ConsoleReader.java:209)
    at org.apache.hadoop.hive.cli.CliDriver.setupConsoleReader(CliDriver.java:787)
    at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:721)
    at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:681)
    at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:621)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found class jline.Terminal, but interface was expected
    at jline.console.ConsoleReader.<init>(ConsoleReader.java:230)
    at jline.console.ConsoleReader.<init>(ConsoleReader.java:221)
    at jline.console.ConsoleReader.<init>(ConsoleReader.java:209)
    at org.apache.hadoop.hive.cli.CliDriver.setupConsoleReader(CliDriver.java:787)
    at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:721)
    at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:681)
    at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:621)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

解决办法:将hive下的新版本jline的JAR包拷贝到hadoop下然后删除原来的

cp /hive/lib/jline-2.12.jar  /hadoop/share/hadoop/yarn/lib/
rm -rf jline-0.9.94.jar

Sqoop组件部署

解压

tar -zxvf Sqoop-.tar.gz -C /user/local/src
##解压Sqoop文件 到/user/local/src

修改配置文件

进去sqoop的conf目录下

重命名sqoop-env-template.sh为sqoop-env.sh

mv sqoop-env-template.sh sqoop-env.sh
#Set path to where bin/hadoop is available
export HADOOP_COMMON_HOME=/root/local/src/hadoop-2.6.0

#Set path to where hadoop-*-core.jar is available
export HADOOP_MAPRED_HOME=/root/local/src/hadoop-2.6.0

#set the path to where bin/hbase is available
#export HBASE_HOME=

#Set the path to where bin/hive is available
export HIVE_HOME=/root/local/src/hive

#Set the path for where zookeper config dir is
export ZOOKEPER_HOME=/root/local/src/zookeeper-3.4.5
export ZOOCFGDIR=/root/local/src/zookeeper-3.4.5

拷贝JDBC驱动

cp mysql-connector-java-5.1.27-bin.jar /root/local/src/sqoop.1.4.6/lib/

连接测试

bin/sqoop list-tables -connect jdbc:mysql://127.0.0.1:3306 --username root --password 000000

结果图

环境搭建 - 图1

Kafka安装

解压

tar -zxvf kafka-.tar.gz -C /user/local/src
##解压kafka文件 到/user/local/src

配置

server.properties

broker.id=0(每台节点不能重复)
log.dirs=/root/local/src/kafka_2.11-1.0.0/datas(数据存储地址)
zookeeper.connect=Master:2181,Slave1:2181,Slave2:2181/kafka(zk连接地址和数据文件)

启动服务器

kafka-server-start.sh -daemon config/server.properties

创建主题

bin/kafka-topics.sh --zookeeper Master:2181/kafka(测试环境填一个 zKurl) --topic first --create --partitions 1(分区数) --replication-factor 3(副本数)

启动生产者

./kafka-console-producer.sh --broker-list Master:9092 --topic first

启动消费者

kafka-console-consumer.sh --bootstrap-server Master:9092(kafka端口号) --topic first --from-beginning(历史数据)

Flink安装

Standalone模式

解压

Flume安装

解压(如果guava-11.0.2.jar包和hadoop冲突则删除)

监控端口号数据


a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = Master
a1.sources.r1.port = 44444

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

监控文件变化采集到HDFS

不支持断点续传

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/root/hive.log

a1.sinks.k1.type = hdfs
#集群地址
a1.sinks.k1.hdfs.path = hdfs://mycluster/flume/%Y%m%d/%H
#文件前缀
a1.sinks.k1.hdfs.filePrefix = logs- 
#是否滚动文件夹
a1.sinks.k1.hdfs.round = true
#一小时更新新建文件夹一次
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
#使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
#更新文件时间 大小
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 134217700
#文件更新与事件数无关
a1.sinks.k1.hdfs.rollCount = 0

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

监控目录采集到HDFS

不支持对实时追加内容的文件进行监听

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/local/src/flume-1.7.0-bin/datalog
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.fileHeader = true
#忽略.tmp结尾的文件
a1.sources.r1.ignorePattern = ([^ ]*\.tmp)

a1.sinks.k1.type = hdfs
#集群地址
a1.sinks.k1.hdfs.path = hdfs://mycluster/flume/%Y%m%d/%H
#文件前缀
a1.sinks.k1.hdfs.filePrefix = upload-
#是否滚动文件夹
a1.sinks.k1.hdfs.round = true
#一小时更新新建文件夹一次
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
#使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
#更新文件时间 大小
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 134217700
#文件更新与事件数无关
a1.sinks.k1.hdfs.rollCount = 0

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

监控目录下多个追加文件

监听多个实时追加的文件,能断点续传

使用echo ***>>***.txt 追加内容
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/local/src/flume-1.7.0-bin/datalog/tail_dir.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /root/local/src/flume-1.7.0-bin/datalog/files/.*file.*
a1.sources.r1.filegroups.f2 = /root/local/src/flume-1.7.0-bin/datalog/files2/.*log.*


a1.sinks.k1.type = hdfs
#集群地址
a1.sinks.k1.hdfs.path = hdfs://mycluster/flume/%Y%m%d/%H
#文件前缀
a1.sinks.k1.hdfs.filePrefix = upload2-
#是否滚动文件夹
a1.sinks.k1.hdfs.round = true
#一小时更新新建文件夹一次
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = hour
#使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
#更新文件时间 大小
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 134217700
#文件更新与事件数无关
a1.sinks.k1.hdfs.rollCount = 0

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

kafka连接器

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /root/local/src/flume-1.7.0-bin/datalog/biz.log
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.k1.kafka.topic = hunter
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

Spark部署

解压

本地模式

#spark-shell
bin/spark-shell
#提交应用
bin/spark-submit \
--class main类地址 \
--master local[2] \
jar文件地址 \
10

集群模式(Standalone)

修改配置文件

cd ./conf
mv slave.template slave
localhost改成节点地址

mv spark-env.sh.template spark-env.sh
#添加Master节点和java_home
export JAVA_HOME=
SPARK_MASTER_HOST=Master
SPARK_MASTER_POST=7777

分发配置文件

bin/spark-submit \
--class main类地址 \
--master spark://ip:7077 \
jar文件地址 \
10

yarn模式

YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
bin/spark-submit \
--class main类地址 \
--master yarn \
#打印控制台 集群模式看不到结果
--deploy-mode (cluster client) \
jar文件地址 \
10