Resource Planning

Component  LTSR003           LTSR005                              LTSR006            LTSR007            LTSR008
OS         CentOS 7.6        CentOS 7.6                           CentOS 7.6         CentOS 7.6         CentOS 7.6
JDK        jvm               jvm                                  jvm                jvm                jvm
HDFS       DataNode/HTTPFS   DataNode/HTTPFS                      DataNode/HTTPFS    DataNode/HTTPFS    NameNode/DataNode/HTTPFS
YARN       NodeManager       NodeManager                          NodeManager        NodeManager        ResourceManager/NodeManager/mr-jobhistory
Flink      N.A               StandaloneSessionClusterEntrypoint   TaskManagerRunner  TaskManagerRunner  N.A

Installation Media

Version: flink-1.10.0-src.tgz
Download: https://archive.apache.org/dist/flink

Environment Preparation

Install JDK

Reference: "CentOS 7.6 - Installing JDK 1.8.221"

Install Hadoop

Reference: "CentOS 7.6 - Installing Hadoop 2.7.2"

Install Maven

Reference: "CentOS 7.6 - Installing Maven 3.2.5"
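
Before compiling, it may help to verify the toolchain quickly (a minimal sanity check, assuming the JDK, Hadoop, and Maven installed above are on the PATH):

# each command should print its version without errors
java -version
hadoop version
mvn -v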

Compiling Flink

Compile Flink on node LTSR001.

(1). Get the source

cd ~/software/
curl -o flink-1.10.0-src.tgz https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-src.tgz
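
Optionally verify the download. This is a sketch that assumes the matching .sha512 checksum file is published next to the tarball on the Apache archive (standard for Apache releases); compare the two hashes manually:

# fetch the published checksum and compute the local one
curl -o flink-1.10.0-src.tgz.sha512 https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-src.tgz.sha512
sha512sum flink-1.10.0-src.tgz
cat flink-1.10.0-src.tgz.sha512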

(2). Extract

tar -zxvf ~/software/flink-1.10.0-src.tgz -C ~/modules/
cd ~/modules
mv flink-1.10.0 flink-1.10.0-src
vi ~/modules/flink-1.10.0-src/flink-runtime-web/pom.xml

Modify the frontend-maven-plugin section of flink-runtime-web's pom.xml to speed up NPM package downloads (pointing the Node/NPM downloads at a domestic mirror) and avoid build failures caused by network timeouts. The relevant part of the plugin configuration after the change looks like this:


<plugin>
    <groupId>com.github.eirslett</groupId>
    <artifactId>frontend-maven-plugin</artifactId>
    <version>1.6</version>
    <executions>
        <execution>
            <id>install node and npm</id>
            <goals>
                <goal>install-node-and-npm</goal>
            </goals>
            <configuration>
                <nodeDownloadRoot>http://npm.taobao.org/mirrors/node/</nodeDownloadRoot>
                <npmDownloadRoot>http://npm.taobao.org/mirrors/npm/</npmDownloadRoot>
                <nodeVersion>v10.9.0</nodeVersion>
            </configuration>
        </execution>
        <execution>
            <id>npm install</id>
            <goals>
                <goal>npm</goal>
            </goals>
            <configuration>
                <arguments>ci --cache-max=0 --no-save --no-bin-links</arguments>
                <environmentVariables>
                    <HUSKY_SKIP_INSTALL>true</HUSKY_SKIP_INSTALL>
                </environmentVariables>
            </configuration>
        </execution>
    </executions>
    <configuration>
        <workingDirectory>web-dashboard</workingDirectory>
    </configuration>
</plugin>

(3). Compile flink-shaded

Why not just compile Flink directly; why bother with flink-shaded? According to the official documentation, the pre-built Hadoop-bundled Flink only supports Hadoop 2.4.1, 2.6.5, 2.7.5, and 2.8.3, and pre-built flink-shaded artifacts are only provided up to version 10.0 (if you want flink-shaded 11, you have to build it yourself). So to use any other Hadoop version, or a Hadoop distribution such as CDH or Hortonworks, flink-shaded must be compiled separately. The flink-shaded version corresponding to Flink 1.10.0 is 9.0, which can be obtained from https://github.com/apache/flink-shaded/releases.

cd ~/software/
# wget https://github.com/apache/flink-shaded/archive/release-9.0.zip
# unzip release-9.0.zip
unzip flink-shaded-release-9.0.zip

Modify pom.xml.

# vi ~/software/flink-shaded-release-9.0/flink-shaded-hadoop-2-uber/pom.xml
vi ~/software/flink-shaded-9.0/flink-shaded-hadoop-2-uber/pom.xml

Add the dependency:

<dependency>
    <groupId>commons-cli</groupId>
    <artifactId>commons-cli</artifactId>
    <version>1.3.1</version>
</dependency>

Compile (the build takes roughly 5-10 minutes):

cd ~/software/flink-shaded*9.0
mvn clean install -DskipTests -Dhadoop.version=2.7.2
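
After the build finishes, the uber jar should have been installed into the local Maven repository, where the Flink build in the next step will resolve it from. A quick check, assuming the default ~/.m2 repository location:

# should list flink-shaded-hadoop-2-uber-2.7.2-9.0.jar
ls ~/.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.2-9.0/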

(4). Compile Flink

cd ~/modules/flink-1.10.0-src
mvn clean install -DskipTests -Dfast -Pinclude-hadoop -Dhadoop.version=2.7.2

The Flink distribution is generated directly under the ${FLINK_BUILD_HOME}/flink-dist/target directory (the build takes roughly 80 minutes). At this point it can be packaged as a tar archive for the subsequent installation.

cd ~/modules/flink-1.10.0-src/flink-dist/target/flink-1.10.0-bin
tar -cvf flink-1.10.0-bin-hadoop-2.7.2.tar flink-1.10.0
mv flink-1.10.0-bin-hadoop-2.7.2.tar ~/software/

Jar files present after a successful build:

flink-dist_2.11-1.10.0.jar
flink-shaded-hadoop-2-uber-2.7.2-9.0.jar
flink-table_2.11-1.10.0.jar
flink-table-blink_2.11-1.10.0.jar
log4j-1.2.17.jar
slf4j-log4j12-1.7.15.jar
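
As a quick check, these jars can be listed from the lib/ directory of the generated distribution before packaging (path as produced by the build steps above):

ls ~/modules/flink-1.10.0-src/flink-dist/target/flink-1.10.0-bin/flink-1.10.0/lib/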

Installing Flink

Install Flink on node LTSR005, then distribute it to the other nodes (LTSR006, LTSR007).

(1). Extract

tar -xvf ~/software/flink-1.10.0-bin-hadoop-2.7.2.tar -C ~/modules

(2). Configure

rm -rf ~/modules/flink-1.10.0/conf/flink-conf.yaml
vi ~/modules/flink-1.10.0/conf/flink-conf.yaml

Replace flink-conf.yaml entirely with the following configuration (note the space between each key and its value).

jobmanager.rpc.address: LTSR005
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
taskmanager.heap.size: 1024m
taskmanager.numberOfTaskSlots: 10
taskmanager.memory.preallocate: false
parallelism.default: 1
jobmanager.web.port: 8381
rest.port: 8381
env.java.opts: -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
env.java.home: /home/bigdata/modules/jdk1.8.0_221
classloader.resolve-order: parent-first

Configure the master node:

vi ~/modules/flink-1.10.0/conf/masters

Contents:

LTSR005:8381

Configure the worker nodes:

vi ~/modules/flink-1.10.0/conf/slaves

Contents:

LTSR006
LTSR007

(3). Configure environment variables

vi ~/.bashrc

Contents:

export FLINK_HOME=/home/bigdata/modules/flink-1.10.0
export PATH=$FLINK_HOME/bin:$PATH

(4). Distribute

scp -r ~/modules/flink-1.10.0 bigdata@LTSR006:~/modules/
scp -r ~/modules/flink-1.10.0 bigdata@LTSR007:~/modules/
scp -r ~/.bashrc bigdata@LTSR006:~/
scp -r ~/.bashrc bigdata@LTSR007:~/
scp -r ~/.bashrc bigdata@LTSR003:~/
scp -r ~/.bashrc bigdata@LTSR008:~/
source ~/.bashrc  # run this on each node
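
To confirm that the distribution and environment variables took effect on the worker nodes, a quick remote check can be run from LTSR005 (a sketch assuming passwordless SSH for the bigdata user, as already used by the scp commands above):

for h in LTSR006 LTSR007; do
  echo "== $h =="
  ssh bigdata@$h 'source ~/.bashrc; echo $FLINK_HOME; ls $FLINK_HOME/bin/flink'
done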

Starting the Flink Cluster

# start the cluster on the master node
cd ~/modules/flink-1.10.0
./bin/start-cluster.sh
./bin/stop-cluster.sh
# list jobs (--all: show all jobs, including cancelled and stopped ones)
./bin/flink list --all
# cancel a job
./bin/flink cancel ${JobID}
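
After start-cluster.sh, the processes can be verified with jps; per the resource plan above, the expected roles are StandaloneSessionClusterEntrypoint on LTSR005 and TaskManagerRunner on LTSR006/LTSR007:

# on LTSR005 (master)
jps | grep StandaloneSessionClusterEntrypoint
# on LTSR006 / LTSR007 (workers)
jps | grep TaskManagerRunner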

Web UI entry point (Google Chrome): http://LTSR005:8381

Verification

(1). Batch WordCount example

./bin/flink run examples/batch/WordCount.jar

(2). Streaming WordCount example

# open a listening port on node LTSR006
# sudo yum install -y nc
nc -l 9001
# on node LTSR005, run Flink's built-in word-count example
cd ~/modules/flink-1.10.0
./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname LTSR006 --port 9001

The corresponding job shows up on the Running Jobs page.
Type some words into the listening terminal and press Enter, then go to the Task Managers tab; the word-count results appear in the task's Stdout.
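
The same output can also be viewed from the command line on the worker node running the task by tailing the TaskManager's .out file (a sketch; the exact file name depends on the user and host):

# on LTSR006 or LTSR007
tail -f ~/modules/flink-1.10.0/log/flink-*-taskexecutor-*.out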

Integrating Flink with Kafka

(1). Add dependency jars

Prepare the dependency jars and copy them directly into the $FLINK_HOME/lib directory (see the sketch after the table):

Dependency jar                              Download location
flink-sql-connector-kafka_2.11-1.10.0.jar   https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar
kafka-clients-0.11.0.3.jar                  https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.11.0.3/kafka-clients-0.11.0.3.jar
flink-json-1.10.0.jar                       ${M2_HOME}/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar
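
A hedged sketch of placing the jars on all nodes, assuming the three jars have first been collected under ~/software/ on LTSR005 and that passwordless SSH is set up as in the earlier distribution step; restart the cluster so they are picked up:

# copy the jars into the local lib directory
cp ~/software/flink-sql-connector-kafka_2.11-1.10.0.jar \
   ~/software/kafka-clients-0.11.0.3.jar \
   ~/software/flink-json-1.10.0.jar \
   ~/modules/flink-1.10.0/lib/
# distribute to the worker nodes
for h in LTSR006 LTSR007; do
  scp ~/modules/flink-1.10.0/lib/{flink-sql-connector-kafka_2.11-1.10.0.jar,kafka-clients-0.11.0.3.jar,flink-json-1.10.0.jar} bigdata@$h:~/modules/flink-1.10.0/lib/
done
# restart so the new jars are loaded
~/modules/flink-1.10.0/bin/stop-cluster.sh && ~/modules/flink-1.10.0/bin/start-cluster.sh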

(2). Create a topic

cd ~/modules/kafka_2.11-0.11.0.3/
bin/kafka-topics.sh --zookeeper 10.8.0.109:2181,10.8.0.137:2181,10.8.0.117:2181 --create --topic user_transactions --replication-factor 1 --partitions 3
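
To verify that the topic was created, describe it with the same Kafka 0.11 CLI and ZooKeeper quorum:

bin/kafka-topics.sh --zookeeper 10.8.0.109:2181,10.8.0.137:2181,10.8.0.117:2181 --describe --topic user_transactions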

(3). Flink SQL Client

vi ~/modules/flink-1.10.0/conf/sql-client-streaming.yaml

Contents:

tables: [] 

functions: [] 

execution:
  planner: blink
  type: streaming 
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 1
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: default_catalog
  current-database: default_database
  restart-strategy:
    type: fallback

deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0

Launch the Flink SQL Client terminal:

cd ~/modules/flink-1.10.0
bin/sql-client.sh  embedded -d conf/sql-client-streaming.yaml

Read data from Kafka:

use catalog default_catalog;
DROP TABLE IF EXISTS user_transactions_source;
CREATE TABLE user_transactions_source (
    cus_no STRING,                   
    card_no STRING,                  
    card_name STRING,                
    card_type STRING,                
    tranc_no STRING,                 
    txn_dte STRING,                  
    txn_tme STRING,                  
    record_tag STRING,               
    trade_type STRING,               
    dbt_cdt_cde  STRING,             
    amt DOUBLE, 
    ts TIMESTAMP(3)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_transactions',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = '10.8.0.109:2181,10.8.0.137:2181,10.8.0.117:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = '10.8.0.109:9092,10.8.0.137:9092,10.8.0.117:9092',
    'update-mode' = 'append',
    'format.type' = 'json'
);
select * from user_transactions_source;

Send test data:

# produce a test message to Kafka
echo -e '{"cus_no": "011560773924", "card_no":"110009085001000000001", "card_name": "**0001", "card_type": "02", "tranc_no": "160908504921155120200401090850", "txn_dte": "20200401", "txn_tme": "090850", "record_tag": "1", "trade_type": "102", "dbt_cdt_cde": "0", "amt": "99.88", "ts": "2020-04-01T09:08:50Z"}' | ssh vagrant@bigdata-node1 "/home/vagrant/modules/kafka_2.11-0.11.0.3/bin/kafka-console-producer.sh --broker-list 192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092 --topic user_transactions"
# consume messages from Kafka
cd ~/modules/kafka_2.11-0.11.0.3/
bin/kafka-console-consumer.sh --zookeeper 10.8.0.109:2181,10.8.0.137:2181,10.8.0.117:2181 --topic user_transactions --from-beginning