Resource planning
Component | LTSR003 | LTSR005 | LTSR006 | LTSR007 | LTSR008 |
---|---|---|---|---|---|
OS | centos7.6 | centos7.6 | centos7.6 | centos7.6 | centos7.6 |
JDK | jvm | jvm | jvm | jvm | jvm |
HDFS | DataNode/HTTPFS | DataNode/HTTPFS | DataNode/HTTPFS | DataNode/HTTPFS | NameNode/DataNode/HTTPFS |
YARN | NodeManager | NodeManager | NodeManager | NodeManager | ResourceManager/NodeManager/mr-jobhistory |
Flink | N.A | StandaloneSessionClusterEntrypoint | TaskManagerRunner | TaskManagerRunner | N.A |
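Installation media
Version: flink-1.10.0-src.tgz
Download: https://archive.apache.org/dist/flink
Environment preparation
Install the JDK
Reference: "CentOS 7.6: Installing JDK 1.8.221"
Install Hadoop
Install Maven
Build Flink
(1).Get the source code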
cd ~/software/
curl -o flink-1.10.0-src.tgz https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-src.tgz
(2).Extract
tar -zxvf ~/software/flink-1.10.0-src.tgz -C ~/modules/
cd ~/modules
mv flink-1.10.0 flink-1.10.0-src
vi ~/modules/flink-1.10.0-src/flink-runtime-web/pom.xml
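Adjust the configuration in the flink-runtime-web pom.xml to speed up NPM package downloads; otherwise slow network access can stall the build and make it fail.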
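(3).Build flink-shaded
Why build flink-shaded instead of simply building Flink? According to the official documentation, a direct build of Flink with Hadoop only supports Hadoop 2.4.1, 2.6.5, 2.7.5, and 2.8.3, and the shaded releases are only supported up to 10.0; flink-shaded 11 also has to be built separately. Any other Hadoop version, or a Hadoop distribution such as CDH or Hortonworks, therefore requires building flink-shaded yourself. Flink 1.10.0 corresponds to flink-shaded 9.0, which is available from https://github.com/apache/flink-shaded/releases.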
cd ~/software/
# wget https://github.com/apache/flink-shaded/archive/release-9.0.zip
# unzip release-9.0.zip
unzip flink-shaded-release-9.0.zip
Edit pom.xml.
# vi ~/software/flink-shaded-release-9.0/flink-shaded-hadoop-2-uber/pom.xml
vi ~/software/flink-shaded-9.0/flink-shaded-hadoop-2-uber/pom.xml
Add the following dependency:
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.3.1</version>
</dependency>
Build (takes roughly 5-10 minutes):
cd ~/software/flink-shaded*9.0
mvn clean install -DskipTests -Dhadoop.version=2.7.2
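Before moving on to the Flink build, it may be worth confirming that `mvn install` actually placed the shaded Hadoop JAR in the local Maven repository. The sketch below assumes the default repository location `~/.m2/repository` and the standard Maven directory layout; `shaded_jar_path` is a hypothetical helper name, not part of Flink or Maven.

```shell
# Hypothetical helper: print where Maven would install the flink-shaded Hadoop uber JAR,
# assuming the default local repository (~/.m2/repository).
shaded_jar_path() {
  hadoop_version="$1"   # e.g. 2.7.2
  shaded_version="$2"   # e.g. 9.0
  artifact="flink-shaded-hadoop-2-uber"
  version="${hadoop_version}-${shaded_version}"
  echo "${HOME}/.m2/repository/org/apache/flink/${artifact}/${version}/${artifact}-${version}.jar"
}

# Report whether the JAR built above is present.
jar="$(shaded_jar_path 2.7.2 9.0)"
if [ -f "$jar" ]; then
  echo "found: $jar"
else
  echo "missing: $jar"
fi
```

If a custom `localRepository` is set in Maven's settings.xml, the path printed here will not match and needs to be adjusted.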
(4).Build Flink
cd ~/modules/flink-1.10.0-src
mvn clean install -DskipTests -Dfast -Pinclude-hadoop -Dhadoop.version=2.7.2
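The build output is written directly to ${FLINK_BUILD_HOME}/flink-dist/target, where FLINK_BUILD_HOME is ~/modules/flink-1.10.0-src in this guide (the build takes about 80 minutes). At this point you can pack the result into a tar archive for the installation steps that follow.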
cd ~/modules/flink-1.10.0-src/flink-dist/target/flink-1.10.0-bin
tar -cvf flink-1.10.0-bin-hadoop-2.7.2.tar flink-1.10.0
mv flink-1.10.0-bin-hadoop-2.7.2.tar ~/software/
JARs produced by a successful build:
flink-dist_2.11-1.10.0.jar
flink-shaded-hadoop-2-uber-2.7.2-9.0.jar
flink-table_2.11-1.10.0.jar
flink-table-blink_2.11-1.10.0.jar
log4j-1.2.17.jar
slf4j-log4j12-1.7.15.jar
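A quick way to compare a build against the list above is to check a directory for each expected file. This is a minimal sketch; `missing_jars` is a hypothetical helper, and it assumes the list refers to the `lib` directory of the built distribution, which the source does not state explicitly.

```shell
# Hypothetical helper: print every expected JAR that is absent from the given directory.
missing_jars() {
  dir="$1"
  for jar in \
    flink-dist_2.11-1.10.0.jar \
    flink-shaded-hadoop-2-uber-2.7.2-9.0.jar \
    flink-table_2.11-1.10.0.jar \
    flink-table-blink_2.11-1.10.0.jar \
    log4j-1.2.17.jar \
    slf4j-log4j12-1.7.15.jar
  do
    # Print the name only when the file does not exist.
    [ -f "${dir}/${jar}" ] || echo "$jar"
  done
}
```

For example, `missing_jars ~/modules/flink-1.10.0-src/flink-dist/target/flink-1.10.0-bin/flink-1.10.0/lib` prints nothing when all six JARs are present.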
Install Flink
Install Flink on node LTSR005 first, then distribute it to the other nodes (LTSR006 and LTSR007).
(1).Extract
tar -xvf ~/software/flink-1.10.0-bin-hadoop-2.7.2.tar -C ~/modules
(2).Configure
rm -rf ~/modules/flink-1.10.0/conf/flink-conf.yaml
vi ~/modules/flink-1.10.0/conf/flink-conf.yaml
Replace the contents of flink-conf.yaml with the following configuration (note the space between the colon after each key and its value).
jobmanager.rpc.address: LTSR005
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 10
taskmanager.memory.preallocate: false
parallelism.default: 1
jobmanager.web.port: 8381
rest.port: 8381
env.java.opts: -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
env.java.home: /home/bigdata/modules/jdk1.8.0_221
classloader.resolve-order: parent-first
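Because a missing space after the colon is easy to overlook, a small check can flag malformed entries before the cluster is started. This is a sketch under stated assumptions: `check_flink_conf` is a hypothetical helper, and it only covers flat `key: value` lines like the ones above, not general YAML.

```shell
# Hypothetical helper: flag lines that are not "key: value" (blank lines and # comments are ignored).
check_flink_conf() {
  conf="$1"
  [ -r "$conf" ] || { echo "cannot read $conf" >&2; return 2; }
  # grep -v selects lines that match neither alternative; any hit is a malformed entry.
  if grep -nEv '^([[:space:]]*(#.*)?|[A-Za-z0-9._-]+: +[^[:space:]].*)$' "$conf"; then
    return 1
  fi
  return 0
}
```

Running `check_flink_conf ~/modules/flink-1.10.0/conf/flink-conf.yaml` prints each offending line with its line number and returns a non-zero status if any are found.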
Configure the master node:
vi ~/modules/flink-1.10.0/conf/masters
Contents:
LTSR005:8381
Configure the worker nodes:
vi ~/modules/flink-1.10.0/conf/slaves
Contents:
LTSR006
LTSR007
(3).Environment variables
vi ~/.bashrc
Add the following:
export FLINK_HOME=/home/bigdata/modules/flink-1.10.0
export PATH=$FLINK_HOME/bin:$PATH
(4).Distribute
scp -r ~/modules/flink-1.10.0 bigdata@LTSR006:~/modules/
scp -r ~/modules/flink-1.10.0 bigdata@LTSR007:~/modules/
scp -r ~/.bashrc bigdata@LTSR006:~/
scp -r ~/.bashrc bigdata@LTSR007:~/
scp -r ~/.bashrc bigdata@LTSR003:~/
scp -r ~/.bashrc bigdata@LTSR008:~/
source ~/.bashrc # run on each node
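The repeated scp lines for the Flink directory can also be generated from a host list. The sketch below only prints the commands so they can be reviewed first; `plan_distribution` is a hypothetical helper, and the `bigdata` user and paths are the ones used in this guide.

```shell
# Hypothetical helper: print one scp command per host instead of running it.
plan_distribution() {
  for host in "$@"; do
    echo "scp -r ~/modules/flink-1.10.0 bigdata@${host}:~/modules/"
  done
}

# Worker nodes from the slaves file.
plan_distribution LTSR006 LTSR007
```

Piping the output to `sh` would execute the copies; printing first keeps the step reviewable.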
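Start the Flink cluster
# Start the cluster on the master node
cd ~/modules/flink-1.10.0
./bin/start-cluster.sh
# Stop the cluster (only when it needs to be shut down)
./bin/stop-cluster.sh
# List jobs (--all: show every job, including canceled and stopped ones)
./bin/flink list --all
# Cancel a job
./bin/flink cancel ${JobID}
Web UI (Google Chrome): http://LTSR005:8381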
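With two TaskManagers (LTSR006 and LTSR007) and taskmanager.numberOfTaskSlots set to 10, the cluster should report 20 slots once both workers have registered. The sketch below assumes that the REST endpoint `/overview` on the rest.port returns JSON containing a `slots-total` field; `slots_total` is a hypothetical helper, and the sample response is hand-written rather than captured from a real cluster.

```shell
# Hypothetical helper: extract the slots-total value from the JSON read on stdin.
slots_total() {
  grep -oE '"slots-total": *[0-9]+' | grep -oE '[0-9]+$'
}

# Example with a hand-written sample response (not captured from a real cluster).
echo '{"taskmanagers":2,"slots-total":20,"slots-available":20}' | slots_total
```

Against the running cluster this would be used as `curl -s http://LTSR005:8381/overview | slots_total`.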
Verification
(1).Batch WordCount example
./bin/flink run examples/batch/WordCount.jar
(2).Streaming WordCount example
# Open a listening port on node LTSR006
# sudo yum install -y nc
nc -l 9001
# On node LTSR005, run the "word count" example that ships with Flink
cd ~/modules/flink-1.10.0
./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname LTSR006 --port 9001
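The corresponding job appears on the Running Jobs page.
Type a few words into the listening terminal and press Enter, then go back to the Task Managers tab; the word counts are shown in the task's Stdout.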
Integrate Flink with Kafka
(1).Add the dependency JARs
Prepare the dependency JARs and copy them directly into the $FLINK_HOME/lib directory:
Dependency | Download location |
---|---|
flink-sql-connector-kafka_2.11-1.10.0.jar | https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar |
kafka-clients-0.11.0.3.jar | https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.11.0.3/kafka-clients-0.11.0.3.jar |
flink-json-1.10.0.jar | ${M2_HOME}/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar |
(2).Create the topic
cd ~/modules/kafka_2.11-0.11.0.3/
bin/kafka-topics.sh --zookeeper 10.8.0.109:2181,10.8.0.137:2181,10.8.0.117:2181 --create --topic user_transactions --replication-factor 1 --partitions 3
(3).Flink SQL Client
vi ~/modules/flink-1.10.0/conf/sql-client-streaming.yaml
Contents:
tables: []
functions: []
execution:
  planner: blink
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 1
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: default_catalog
  current-database: default_database
  restart-strategy:
    type: fallback
deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0
Start the Flink SQL Client:
cd ~/modules/flink-1.10.0
bin/sql-client.sh embedded -d conf/sql-client-streaming.yaml
Read data from Kafka:
use catalog default_catalog;
DROP TABLE IF EXISTS user_transactions_source;
CREATE TABLE user_transactions_source (
cus_no STRING,
card_no STRING,
card_name STRING,
card_type STRING,
tranc_no STRING,
txn_dte STRING,
txn_tme STRING,
record_tag STRING,
trade_type STRING,
dbt_cdt_cde STRING,
amt DOUBLE,
ts TIMESTAMP(3)
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_transactions',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = '10.8.0.109:2181,10.8.0.137:2181,10.8.0.117:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = '10.8.0.109:9092,10.8.0.137:9092,10.8.0.117:9092',
'update-mode' = 'append',
'format.type' = 'json'
);
select * from user_transactions_source;
Send test data:
# Produce a message to Kafka
echo -e '{"cus_no": "011560773924", "card_no":"110009085001000000001", "card_name": "**0001", "card_type": "02", "tranc_no": "160908504921155120200401090850", "txn_dte": "20200401", "txn_tme": "090850", "record_tag": "1", "trade_type": "102", "dbt_cdt_cde": "0", "amt": "99.88", "ts": "2020-04-01T09:08:50Z"}' | ssh vagrant@bigdata-node1 "/home/vagrant/modules/kafka_2.11-0.11.0.3/bin/kafka-console-producer.sh --broker-list 192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092 --topic user_transactions"
# Consume messages from Kafka
cd ~/modules/kafka_2.11-0.11.0.3/
bin/kafka-console-consumer.sh --zookeeper 10.8.0.109:2181,10.8.0.137:2181,10.8.0.117:2181 --topic user_transactions --from-beginning