环境版本

Flink: 1.13.3
Scala: 2.11
Java: 1.8.0_301
Hudi: 0.10.0-SNAPSHOT

环境搭建

hudi-demo
├── ckps
├── flink-1.13.3
├── hadoop-2.9.2
└── t2

这是我本地最终搭建好的测试环境目录分布,其中

  • ckps:是 flink checkpoint 的目录
  • flink-1.13.3:是 flink 发行版解压后的根目录
  • hadoop-2.9.2:hadoop 发行版解压后的根目录,这里我们选取 hadoop-2.9.2
  • t2:本地的 hudi 数据目录

这里我们以 $HUDI_DEMO 指代 hudi-demo 目录的绝对路径。

Flink 环境

Flink 环境,从 Flink 的官网 https://flink.apache.org/downloads.html#apache-flink-1133 下载最新的 flink 发行版到本地并解压到 $HUDI_DEMO

这里我们以 $FLINK_HOME 指代 flink 发行版解压后 根目录 的绝对路径。

修改 $FLINK_HOME/workers 文件,将默认的 localhost 添加到 8 行,这里的行数代表本地 localcluster 启动的 TaskManager 数,一共是 8 个 TaskManager(每个 TaskManager 默认 1 个 slot)。调整这一步主要是防止测试 task 启动较多 slot 不够的情况。

workers 文件:

  1. localhost
  2. localhost
  3. localhost
  4. localhost
  5. localhost
  6. localhost
  7. localhost
  8. localhost

修改 flink-conf.yaml 添加 checkpoint 相关的配置:

# 开启 checkpoint 周期为 30 秒
execution.checkpointing.interval: 30000
# 指定本地 checkpoint 目录, ${HUDI_DEMO} 变量换成对应的绝对路径
state.checkpoints.dir: ${HUDI_DEMO}/ckps

Hadoop 环境

从 Hadoop 官网下载发行包:https://archive.apache.org/dist/hadoop/common/hadoop-2.9.2/ 并解压到 $HUDI_DEMO 文件夹,通过命令

# ${HUDI_DEMO} 请替换为对应的绝对路径
HADOOP_CLASSPATH=`${HUDI_DEMO}/hadoop-2.9.2/bin/hadoop classpath`
export HADOOP_CLASSPATH

指定 HADOOP_CLASSPATH,flink 是不打任何 hadoop 包的,但是自身依赖了 hadoop 的接口,这里设置变量 HADOOP_CLASSPATH 为后面的 sqlclient 测试用。

Hudi 环境

Hudi 的 master 代码需要自己打包,从 github https://github.com/apache/hudi 下载源码 ,进入根目录执行命令:

mvn clean install -DskipTests

完成后进入 packaging/hudi-flink-bundle 目录,执行命令:

mvn clean install -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2

这里指定 hive2 的 profile,完成后会得到一个 hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar 包,我们将包拷贝到 $FLINK_HOME/lib 目录下即可

启动环境

$FLINK_HOME/bin 目录下执行脚本

./start-cluster.sh

启动测试集群,启动完成后,可在 URL http://localhost:8081/ 查看 Flink UI。

集群启动完成再启动 sql client:

./sql-client.sh

执行测试

这里我们创建一个 datagen 的 source 和 hudi sink,启动 sql 流任务:

CREATE TABLE sourceT (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
)
with (
  'connector' = 'hudi',
  'path' = '$HUDI_DEMO/t2', -- $HUDI_DEMO 替换成的绝对路径
  'table.type' = 'MERGE_ON_READ',
  'write.bucket_assign.tasks' = '2',
  'write.tasks' = '2',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://ip:9083' -- ip 替换成 HMS 的地址
);

insert into t2 select * from sourceT;

datagen source 每秒钟生成一条消息,通过 MOR 写入 hudi 表,bucket assign 并发为 2,写 task 并发为 2,压缩策略为 每 5 个 commits/checkpoints 执行一次压缩(默认压缩策略)。

通过 Flink UI 可以查看作业运行状态。

问题排查

作业无法启动

作业无法启动,或者作业一启动就把集群 JM 搞挂了,可以在目录 $FLINK_HOME/log 执行命令

tail -f *

查看执行日志,寻找具体的错误堆栈,如果日志刷的较多,可以单独查看文件

$FLINK_HOME/log/flink-xxx-standalonesession-0-chenyuzhaodeMacBook-Pro.local.log