安装并启动 Flink
- 下载 Flink 1.14.4 (使用的是 scala_2.12 版本)并将其解压至目录 flink-1.14.4
wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
tar -xvf flink-1.14.4-bin-scala_2.12.tgz
- 打开配置文件
vi /etc/profile
- 添加环境变量 HADOOP_CLASSPATH(需要 Hadoop 类 )
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
如果未配置会报错
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
- 修改 flink-conf.yaml,默认是 child-first,添加
classloader.resolve-order: parent-first
vim conf/flink-conf.yaml
classloader.resolve-order: parent-first
- 启动 Flink 集群
cd flink-1.14.4
bin/start-cluster.sh
如果未修改会报错
[ERROR] Could not execute SQL statement. Reason:**
**java.io.StreamCorruptedException: unexpected block data
启动 Flink SQL Client
在 Flink 中包含了 FileSystem SQL Connector,不需要添加额外的依赖
从文件系统中读取或者向文件系统中写入行时,需要指定相应的 format,有些 format 需要下载自己的依赖
文件系统连接器允许从本地或分布式文件系统进行读写
- 下载 flink-sql-parquet 依赖包(flink-1.14.4/lib 目录下)
cd lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-parquet_2.12/1.14.4/flink-sql-parquet_2.12-1.14.4.jar
- 启动 Flink SQL Client
./bin/sql-client.sh
- 创建表
CREATE TABLE cluster (
cluster_id STRING,
name STRING,
app_id STRING
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://localhost:9000/root/cluster/',
'format' = 'parquet'
);
path 属性指定的是目录,而不是文件,该目录下的文件也不是可读的
- 读取 Parquet 文件
select * from cluster;
参考文档