安装并启动 Flink
下载 Flink 1.14.4 (使用的是 scala_2.12 版本)并将其解压至目录 flink-1.14.4
wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
tar -xvf flink-1.14.4-bin-scala_2.12.tgz
打开配置文件
vi /etc/profile
添加环境变量 HADOOP_CLASSPATH(需要 Hadoop 类 )
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
如果未配置会报错 [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
- 修改 flink-conf.yaml,默认是 child-first,添加
classloader.resolve-order: parent-first
```bash vim conf/flink-conf.yaml
classloader.resolve-order: parent-first
- 启动 Flink 集群
```json
cd flink-1.14.4
bin/start-cluster.sh
如果未修改会报错 [ERROR] Could not execute SQL statement. Reason:java.io.StreamCorruptedException: unexpected block data**
启动 Flink SQL Client
在 Flink 中包含了 FileSystem SQL Connector,不需要添加额外的依赖 从文件系统中读取或者向文件系统中写入行时,需要指定相应的 format,有些 format 需要下载自己的依赖 文件系统连接器允许从本地或分布式文件系统进行读写
- 下载 flink-sql-parquet 依赖包(flink-1.14.4/lib 目录下) ```scala cd lib
- 启动 Flink SQL Client
```scala
./bin/sql-client.sh
- 创建表
CREATE TABLE cluster (
cluster_id STRING,
name STRING,
app_id STRING
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://localhost:9000/root/cluster/',
'format' = 'parquet'
);
path 属性指定的是目录,而不是文件,该目录下的文件也不是可读的
- 读取 Parquet 文件
select * from cluster;
参考文档