Introduction

Hive integration shipped as a preview feature in Flink 1.9. Flink 1.9 lets users persist Flink-specific metadata (e.g. Kafka tables) to the Hive Metastore via SQL DDL, call UDFs defined in Hive, and use Flink to read and write Hive tables. Flink 1.10 builds on this work, delivering production-ready Hive integration that is compatible with most Hive versions.

Resource Planning

Component bigdata-node1 bigdata-node2 bigdata-node3
OS centos7.6 centos7.6 centos7.6
JDK jvm jvm jvm
HDFS NameNode/SecondaryNameNode/DataNode/JobHistoryServer/ApplicationHistoryServer DataNode DataNode
YARN ResourceManager NodeManager NodeManager
Hive HiveServer2/Metastore/CLI/Beeline CLI/Beeline CLI/Beeline
MySQL N.A N.A MySQL Server
Maven mvn N.A N.A
Flink StandaloneSessionClusterEntrypoint TaskManagerRunner TaskManagerRunner

Installation Media

Version: flink-1.10.0-src.tgz
Download: https://archive.apache.org/dist/flink

Environment Preparation

Install JDK

See: "Installing JDK 1.8.221 on CentOS 7.6"

Install Hadoop

See: "Installing Hadoop 2.7.2 on CentOS 7.6"

Install Hive

See: "Installing Hive 2.3.4 on CentOS 7.6"

Install Maven

Following the official Flink recommendation, Maven 3.2.5 is used here; for details see https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/flinkDev/building.html. For the Maven installation itself, refer to "Installing Maven 3.2.5 on CentOS 7.6".
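
Optionally, before building, you can point Maven at a nearby mirror so dependency downloads do not stall the build. This is a hedged sketch, not part of the original setup: the Aliyun mirror URL is an assumption, and limiting mirrorOf to central keeps other repositories reachable.

  # Optional: write a minimal settings.xml that mirrors only Maven Central.
  # The mirror URL is an assumption; replace it with any mirror you trust.
  mkdir -p ~/.m2
  cat > ~/.m2/settings.xml <<'EOF'
<settings>
  <mirrors>
    <mirror>
      <id>central-mirror</id>
      <mirrorOf>central</mirrorOf>
      <url>https://maven.aliyun.com/repository/public</url>
    </mirror>
  </mirrors>
</settings>
EOF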

Build Flink

Build Flink on node bigdata-node1.

(1). Get the source

  cd /share
  curl -o flink-1.10.0-src.tgz https://archive.apache.org/dist/flink/flink-1.10.0/flink-1.10.0-src.tgz

(2). Extract

  cd /share
  tar -zxvf flink-1.10.0-src.tgz -C ~/modules/
  cd ~/modules
  mv flink-1.10.0 flink-1.10.0-src
  vi ~/modules/flink-1.10.0-src/flink-runtime-web/pom.xml

Modify the frontend-maven-plugin configuration in flink-runtime-web/pom.xml so that Node and NPM packages are downloaded from a faster mirror; this avoids build failures caused by network timeouts.


  <plugin>
    <groupId>com.github.eirslett</groupId>
    <artifactId>frontend-maven-plugin</artifactId>
    <version>1.6</version>
    <executions>
      <execution>
        <id>install node and npm</id>
        <goals>
          <goal>install-node-and-npm</goal>
        </goals>
        <configuration>
          <nodeDownloadRoot>http://npm.taobao.org/mirrors/node/</nodeDownloadRoot>
          <npmDownloadRoot>http://npm.taobao.org/mirrors/npm/</npmDownloadRoot>
          <nodeVersion>v10.9.0</nodeVersion>
        </configuration>
      </execution>
      <execution>
        <id>npm install</id>
        <goals>
          <goal>npm</goal>
        </goals>
        <configuration>
          <arguments>ci --cache-max=0 --no-save --no-bin-links</arguments>
          <environmentVariables>
            <HUSKY_SKIP_INSTALL>true</HUSKY_SKIP_INSTALL>
          </environmentVariables>
        </configuration>
      </execution>
    </executions>
    <configuration>
      <workingDirectory>web-dashboard</workingDirectory>
    </configuration>
  </plugin>

(3). Build flink-shaded

Why build flink-shaded separately instead of just building Flink? As the official documentation explains, the pre-built Hadoop-bundled Flink only supports Hadoop 2.4.1, 2.6.5, 2.7.5, and 2.8.3, and pre-built flink-shaded artifacts are only provided up to version 10.0 (if you want flink-shaded 11, you have to build it yourself). So to use any other Hadoop version, or a Hadoop-based vendor distribution such as CDH or Hortonworks, you also need to build flink-shaded yourself. Flink 1.10.0 uses flink-shaded 9.0, which can be obtained from https://github.com/apache/flink-shaded/releases.

  cd /share
  unzip flink-shaded-release-9.0.zip

Modify flink-shaded-release-9.0/flink-shaded-hadoop-2-uber/pom.xml and add the following dependency:

  <dependency>
    <groupId>commons-cli</groupId>
    <artifactId>commons-cli</artifactId>
    <version>1.3.1</version>
  </dependency>

Build (takes roughly 5-10 minutes):

  cd /share/flink-shaded-release-9.0
  mvn clean install -DskipTests -Dhadoop.version=2.7.2
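
As a quick sanity check (a small sketch assuming the default local repository under ~/.m2), confirm that the shaded Hadoop uber JAR was installed before moving on:

  # Assumes the default local repository; adjust if you use a custom <localRepository>.
  ls ~/.m2/repository/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.2-9.0/
  # Expect flink-shaded-hadoop-2-uber-2.7.2-9.0.jar (the same JAR listed later in this post).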

(4). Build Flink

  cd ~/modules/flink-1.10.0-src
  mvn clean install -DskipTests -Dfast -Pinclude-hadoop -Dhadoop.version=2.7.2

The build output is generated directly under ${FLINK_BUILD_HOME}/flink-dist/target (the build takes roughly 80 minutes). You can then create a tar archive for the installation steps that follow.

  cd ~/modules/flink-1.10.0-src/flink-dist/target/flink-1.10.0-bin
  tar -cvf flink-1.10.0-bin-hadoop-2.7.2.tar flink-1.10.0
  mv flink-1.10.0-bin-hadoop-2.7.2.tar /share/

Key JARs present after a successful build:

  flink-dist_2.11-1.10.0.jar
  flink-shaded-hadoop-2-uber-2.7.2-9.0.jar
  flink-table_2.11-1.10.0.jar
  flink-table-blink_2.11-1.10.0.jar
  log4j-1.2.17.jar
  slf4j-log4j12-1.7.15.jar
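
If you want to reproduce this list, it simply comes from listing the lib directory of the freshly built distribution (same path as the tar step above):

  # List the JARs bundled into the built distribution.
  ls ~/modules/flink-1.10.0-src/flink-dist/target/flink-1.10.0-bin/flink-1.10.0/lib/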

Install Flink

Install Flink on node bigdata-node1 first, then distribute it to the other nodes.

(1). Extract

  tar -xvf /share/flink-1.10.0-bin-hadoop-2.7.2.tar -C ~/modules

(2). Configure

  rm -rf ~/modules/flink-1.10.0/conf/flink-conf.yaml
  vi ~/modules/flink-1.10.0/conf/flink-conf.yaml

Replace the contents of flink-conf.yaml with the configuration below (note the space between each key and its value).

  jobmanager.rpc.address: bigdata-node1
  jobmanager.rpc.port: 6123
  jobmanager.heap.size: 1024m
  taskmanager.heap.size: 1024m
  taskmanager.numberOfTaskSlots: 10
  taskmanager.memory.preallocate: false
  parallelism.default: 1
  jobmanager.web.port: 8381
  rest.port: 8381
  env.java.opts: -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
  env.java.home: /home/vagrant/modules/jdk1.8.0_221
  classloader.resolve-order: parent-first

Configure the master node:

  vi ~/modules/flink-1.10.0/conf/masters

Contents:

  bigdata-node1:8381

Configure the worker nodes:

  vi ~/modules/flink-1.10.0/conf/slaves

Contents:

  bigdata-node2
  bigdata-node3

(3). Environment variables

  vi ~/.bashrc

Contents:

  export FLINK_HOME=/home/vagrant/modules/flink-1.10.0
  export PATH=$FLINK_HOME/bin:$PATH

(4). Distribute

  scp -r ~/modules/flink-1.10.0 vagrant@bigdata-node2:~/modules/
  scp -r ~/modules/flink-1.10.0 vagrant@bigdata-node3:~/modules/
  scp -r ~/.bashrc vagrant@bigdata-node2:~/
  scp -r ~/.bashrc vagrant@bigdata-node3:~/
  source ~/.bashrc # run this on each node
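
A quick, optional way to confirm the distribution and environment variables took effect on the workers (a minimal sketch; host names and the vagrant user follow the resource plan above):

  # Confirm FLINK_HOME resolves and the launcher exists on each worker.
  for host in bigdata-node2 bigdata-node3; do
    echo "== $host =="
    ssh vagrant@"$host" 'source ~/.bashrc; echo "FLINK_HOME=$FLINK_HOME"; ls "$FLINK_HOME"/bin/flink'
  done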

Start the Flink Cluster

  # start the cluster on the master node (stop-cluster.sh stops it)
  cd ~/modules/flink-1.10.0
  ./bin/start-cluster.sh
  ./bin/stop-cluster.sh
  # list jobs (--all: show all jobs, including cancelled and stopped ones)
  ./bin/flink list --all
  # cancel a job
  ./bin/flink cancel ${JobID}

Web UI (tested with Chrome): http://bigdata-node1:8381
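
Besides the web UI, the cluster can be checked from the shell; this is a small sketch (the /taskmanagers REST path is assumed to be served on the same rest.port configured above):

  # On bigdata-node1 the JobManager (StandaloneSessionClusterEntrypoint) should be running,
  # and TaskManagerRunner on the workers.
  jps
  # Query the REST API on the configured rest.port for registered TaskManagers.
  curl http://bigdata-node1:8381/taskmanagers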

Verification

(1). Batch WordCount example

  cd ~/modules/flink-1.10.0
  ./bin/flink run examples/batch/WordCount.jar
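
Run without arguments, the example uses built-in data and prints the counts to stdout. As a hedged variation, the bundled batch example also accepts explicit input and output locations (the paths below are placeholders):

  # Paths are illustrative; any readable input file works.
  ./bin/flink run examples/batch/WordCount.jar \
    --input file:///home/vagrant/datas/wordcount-input.txt \
    --output file:///home/vagrant/datas/wordcount-output.txt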

(2). Streaming WordCount example

  # open a listening port on bigdata-node2
  # sudo yum install -y nc
  nc -l 9000
  # on bigdata-node1, run the bundled "word count" streaming example
  cd ~/modules/flink-1.10.0
  ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname bigdata-node2 --port 9000

The corresponding job appears on the Running Jobs page.
Type some words into the listening port and press Enter, then open the Task Managers tab; the task's Stdout shows the word-count results.

Flink Integration with Hive

Add dependency JARs

Prepare the dependency JARs and copy them directly into $FLINK_HOME/lib:

Dependency Download location
flink-connector-hive_2.11-1.10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.10.0/flink-connector-hive_2.11-1.10.0.jar
flink-json-1.10.0.jar ${M2_HOME}/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar
flink-jdbc_2.11-1.10.0.jar ${M2_HOME}/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar
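
As a convenience, the three JARs above can be gathered into $FLINK_HOME/lib like this (a hedged sketch that assumes flink-json and flink-jdbc were installed into the local repository at ~/.m2/repository by the earlier mvn clean install):

  # Fetch/copy the Hive-integration JARs into Flink's lib directory.
  cd ~/modules/flink-1.10.0/lib
  curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.10.0/flink-connector-hive_2.11-1.10.0.jar
  cp ~/.m2/repository/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar .
  cp ~/.m2/repository/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar .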

In addition, several JARs come from ${HIVE_HOME}/lib:

  export HIVE_HOME=/home/vagrant/modules/apache-hive-2.3.4-bin
  cp ${HIVE_HOME}/lib/hive-exec-2.3.4.jar ~/modules/flink-1.10.0/lib/
  cp ${HIVE_HOME}/lib/datanucleus-api-jdo-4.2.4.jar ~/modules/flink-1.10.0/lib/
  cp ${HIVE_HOME}/lib/datanucleus-core-4.1.17.jar ~/modules/flink-1.10.0/lib/
  cp ${HIVE_HOME}/lib/datanucleus-rdbms-4.1.19.jar ~/modules/flink-1.10.0/lib/
  cp ${HIVE_HOME}/lib/mysql-connector-java-5.1.47.jar ~/modules/flink-1.10.0/lib/
  cp ${HIVE_HOME}/lib/hive-common-2.3.4.jar ~/modules/flink-1.10.0/lib/
  cp ${HIVE_HOME}/lib/hive-metastore-2.3.4.jar ~/modules/flink-1.10.0/lib/
  cp ${HIVE_HOME}/lib/hive-shims-common-2.3.4.jar ~/modules/flink-1.10.0/lib/
  cp ${HIVE_HOME}/lib/antlr-runtime-3.5.2.jar ~/modules/flink-1.10.0/lib/
  cp ${HIVE_HOME}/lib/javax.jdo-3.2.0-m3.jar ~/modules/flink-1.10.0/lib/
  cp ${HIVE_HOME}/lib/libfb303-0.9.3.jar ~/modules/flink-1.10.0/lib/
  cp ${HIVE_HOME}/lib/commons-pool-1.5.4.jar ~/modules/flink-1.10.0/lib/
  cp ${HIVE_HOME}/lib/commons-dbcp-1.4.jar ~/modules/flink-1.10.0/lib/
  cp ${HIVE_HOME}/lib/bonecp-0.8.0.RELEASE.jar ~/modules/flink-1.10.0/lib/

Note: distribute the JARs to all cluster nodes.

  scp -r ~/modules/flink-1.10.0/lib vagrant@bigdata-node2:~/modules/flink-1.10.0/
  scp -r ~/modules/flink-1.10.0/lib vagrant@bigdata-node3:~/modules/flink-1.10.0/

Flink SQL Client

  vi ~/modules/flink-1.10.0/conf/sql-client-hive.yaml

Contents:

  tables: []
  functions: []
  catalogs:
    - name: myhive
      type: hive
      hive-conf-dir: /home/vagrant/modules/apache-hive-2.3.4-bin/conf
      hive-version: 2.3.4
      default-database: flinkhivedb
  execution:
    planner: blink
    type: batch
    time-characteristic: event-time
    periodic-watermarks-interval: 200
    result-mode: table
    max-table-result-rows: 1000000
    parallelism: 1
    max-parallelism: 128
    min-idle-state-retention: 0
    max-idle-state-retention: 0
    current-catalog: default_catalog
    current-database: default_database
    restart-strategy:
      type: fallback
  deployment:
    response-timeout: 5000
    gateway-address: ""
    gateway-port: 0

Distribute the configuration file, then restart the Flink cluster.

  scp -r ~/modules/flink-1.10.0/conf/sql-client-hive.yaml vagrant@bigdata-node2:~/modules/flink-1.10.0/conf/
  scp -r ~/modules/flink-1.10.0/conf/sql-client-hive.yaml vagrant@bigdata-node3:~/modules/flink-1.10.0/conf/
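
Before launching the SQL client, make sure the Hive Metastore is running (and HiveServer2 too, for the beeline steps later); a minimal sketch, assuming the standard hive --service launchers are on the PATH:

  # Start the Hive metastore and HiveServer2 on bigdata-node1 if they are not already running.
  nohup hive --service metastore > /tmp/metastore.log 2>&1 &
  nohup hive --service hiveserver2 > /tmp/hiveserver2.log 2>&1 &
  # The metastore listens on port 9083 by default; the myhive catalog connects through it.
  ss -ltn | grep 9083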

Start the Flink SQL CLI:

  cd ~/modules/flink-1.10.0
  bin/sql-client.sh embedded -d conf/sql-client-hive.yaml

Basic operations:

  -- show command-line help
  Flink SQL> help
  -- list the catalogs in the current session: myhive is the one configured above, default_catalog is the built-in default
  Flink SQL> show catalogs;
  default_catalog
  myhive
  -- switch catalog
  Flink SQL> use catalog myhive;
  -- list the databases in the current catalog
  Flink SQL> show databases;
  -- create a database
  Flink SQL> create database testdb;
  -- drop a database
  Flink SQL> drop database testdb;
  -- switch database
  Flink SQL> use testdb;
  -- create a table
  Flink SQL> create table code_city(id int,city string,province string,event_time string);
  -- list the tables in the current database
  Flink SQL> show tables;
  -- describe a table
  Flink SQL> describe code_city;
  -- drop a table
  Flink SQL> drop table code_city;
  -- query a table
  Flink SQL> select * from code_city;
  -- insert data (the table must be created on the Hive side before rows can be inserted)
  Flink SQL> INSERT into code_city values(101,'南京1','江苏1','');
  Flink SQL> INSERT INTO code_city SELECT 102,'南京2','江苏2','';
  Flink SQL> insert overwrite code_city select id+1,city,province,event_time from code_city;

Note: the Flink SQL Client only works in embedded mode; it cannot be used to operate a remote cluster.

Flink SQL Gateway

Flink SQL Gateway is a service that lets other applications interact with a Flink cluster through a REST API. Applications (for example Java/Python/shell programs, or Postman) can use the REST API to submit queries, cancel jobs, retrieve results, and so on. The Flink JDBC Driver allows JDBC clients to connect to the Flink SQL Gateway over this REST API.
Get the source and build it:

  cd ~/modules/
  git clone https://github.com/ververica/flink-sql-gateway.git
  cd ~/modules/flink-sql-gateway
  mvn clean install -DskipTests -Dfast
  vi ~/modules/flink-sql-gateway/conf/sql-gateway-defaults.yaml

Contents:

  server:
    bind-address: 192.168.0.101
    address: 192.168.0.101
    port: 8083
    jvm_args: "-Xmx2018m -Xms1024m"
  catalogs:
    - name: myhive
      type: hive
      hive-conf-dir: /home/vagrant/modules/apache-hive-2.3.4-bin/conf
      hive-version: 2.3.4
      default-database: flinkhivedb

Start the Flink SQL Gateway:

  cd ~/modules/flink-sql-gateway/build-target/bin
  ./sql-gateway.sh -d ~/modules/flink-sql-gateway/conf/sql-gateway-defaults.yaml -p 8083
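
A quick liveness check for the gateway (a hedged sketch; the /v1/info path is taken from the flink-sql-gateway README and should be treated as an assumption):

  # Confirm the gateway is listening on the configured port.
  ss -ltn | grep 8083
  # Optionally hit the REST API; /v1/info is assumed from the project's documentation.
  curl http://192.168.0.101:8083/v1/info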

Flink JDBC Driver

The Flink JDBC Driver is a Java library for accessing and manipulating an Apache Flink cluster by connecting to a Flink SQL Gateway acting as the JDBC server.

  cd ~/modules/
  git clone https://github.com/ververica/flink-jdbc-driver.git
  cd ~/modules/flink-jdbc-driver
  mvn clean install -DskipTests -Dfast

Copy the built JAR into ${HIVE_HOME}/lib:

  cp -rf ~/modules/flink-jdbc-driver/target/flink-jdbc-driver-*.jar ~/modules/apache-hive-2.3.4-bin/lib

Start beeline:

  # start the metastore and hiveserver2 beforehand
  cd ~/modules/apache-hive-2.3.4-bin
  # option 1
  bin/beeline
  beeline> !connect jdbc:flink://192.168.0.101:8083?planner=blink
  # option 2
  bin/beeline -u jdbc:flink://192.168.0.101:8083?planner=blink
  # option 3
  bin/beeline -u jdbc:flink://192.168.0.101:8083?planner=blink -f run.sql

run.sql contents:

  use catalog myhive;
  use flinkhivedb;
  INSERT into flinkhivedb.code_city values(109,'test flink jdbc driver','beijin-1','2020-12-31 12:12:12');

Verification:

  CREATE TABLE T(
    a INT,
    b VARCHAR(10)
  ) WITH (
    'connector.type' = 'filesystem',
    'connector.path' = 'file:///home/vagrant/datas/T.csv',
    'format.type' = 'csv',
    'format.derive-schema' = 'true'
  );
  INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello');
  SELECT * FROM T;
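
To double-check the filesystem sink from the shell (a minimal sketch; the path comes from connector.path above, and the exact layout may differ by sink parallelism):

  # The sink may produce a single file or a directory of part files.
  cat /home/vagrant/datas/T.csv 2>/dev/null || ls -lR /home/vagrant/datas/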

Flink Integration with Kafka

(1). Add dependency JARs

Prepare the dependency JARs and copy them directly into $FLINK_HOME/lib:

Dependency Download location
flink-sql-connector-kafka_2.11-1.10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar
kafka-clients-0.11.0.3.jar https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.11.0.3/kafka-clients-0.11.0.3.jar
flink-json-1.10.0.jar ${M2_HOME}/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar

(2). Create the topic

  cd ~/modules/kafka_2.11-0.11.0.3/
  bin/kafka-topics.sh --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --create --topic user_transactions --replication-factor 1 --partitions 3
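
To confirm the topic and its partition layout were created as expected:

  # Show partition/replica layout for the new topic.
  bin/kafka-topics.sh --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --describe --topic user_transactions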

(3). Flink SQL Client

  vi ~/modules/flink-1.10.0/conf/sql-client-streaming.yaml

Contents:

  tables: []
  functions: []
  execution:
    planner: blink
    type: streaming
    time-characteristic: event-time
    periodic-watermarks-interval: 200
    result-mode: table
    max-table-result-rows: 1000000
    parallelism: 1
    max-parallelism: 128
    min-idle-state-retention: 0
    max-idle-state-retention: 0
    current-catalog: default_catalog
    current-database: default_database
    restart-strategy:
      type: fallback
  deployment:
    response-timeout: 5000
    gateway-address: ""
    gateway-port: 0

Enter the Flink SQL Client terminal:

  cd ~/modules/flink-1.10.0
  bin/sql-client.sh embedded -d conf/sql-client-streaming.yaml

Read data from Kafka:

  use catalog default_catalog;
  DROP TABLE IF EXISTS user_transactions_source;
  CREATE TABLE user_transactions_source (
    cus_no STRING,
    card_no STRING,
    card_name STRING,
    card_type STRING,
    tranc_no STRING,
    txn_dte STRING,
    txn_tme STRING,
    record_tag STRING,
    trade_type STRING,
    dbt_cdt_cde STRING,
    amt DOUBLE,
    ts TIMESTAMP(3)
  ) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_transactions',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = '192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = '192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092',
    'update-mode' = 'append',
    'format.type' = 'json'
  );
  select * from user_transactions_source;

Send test data:

  # produce a message to Kafka
  echo -e '{"cus_no": "011560773924", "card_no":"110009085001000000001", "card_name": "**0001", "card_type": "02", "tranc_no": "160908504921155120200401090850", "txn_dte": "20200401", "txn_tme": "090850", "record_tag": "1", "trade_type": "102", "dbt_cdt_cde": "0", "amt": "99.88", "ts": "2020-04-01T09:08:50Z"}' | ssh vagrant@bigdata-node1 "/home/vagrant/modules/kafka_2.11-0.11.0.3/bin/kafka-console-producer.sh --broker-list 192.168.0.101:9092,192.168.0.102:9092,192.168.0.103:9092 --topic user_transactions"
  # consume messages from Kafka (for comparison)
  cd ~/modules/kafka_2.11-0.11.0.3/
  bin/kafka-console-consumer.sh --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181 --topic user_transactions --from-beginning

Appendix

Data preparation

  tee /home/vagrant/datas/flinkhivedb-stu.txt <<-'EOF'
1|polaris
2|lisi
3|wangwu
4|zhaoliu
EOF

Database and table preparation

  -- enable Hive local mode
  set hive.exec.mode.local.auto=true;
  CREATE DATABASE IF NOT EXISTS flinkhivedb;
  drop table if exists flinkhivedb.stu;
  create table if not exists flinkhivedb.stu(id int, name string) row format delimited fields terminated by '|';
  -- note: the file must be located on the Hive server
  load data local inpath '/home/vagrant/datas/flinkhivedb-stu.txt' into table flinkhivedb.stu;
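
To confirm the table is visible from both sides (a small sketch assuming the hive CLI and the sql-client-hive.yaml session configured earlier):

  # From Hive: the four rows loaded above should come back.
  hive -e "select * from flinkhivedb.stu;"
  # From Flink: the same table is reachable through the myhive catalog.
  cd ~/modules/flink-1.10.0
  bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
  # then, inside the SQL client:
  #   use catalog myhive;
  #   select * from flinkhivedb.stu;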