安装并启动 Flink

  • 下载 Flink 1.14.4 (使用的是 scala_2.12 版本)并将其解压至目录 flink-1.14.4
  1. wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
  2. tar -xvf flink-1.14.4-bin-scala_2.12.tgz
  • 打开配置文件
  1. vi /etc/profile
  • 添加环境变量 HADOOP_CLASSPATH(需要 Hadoop 类
  1. # HADOOP_HOME is your hadoop root directory after unpack the binary package.
  2. export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

如果未配置会报错

[ERROR] Could not execute SQL statement. Reason:

java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration

  • 修改 flink-conf.yaml,默认是 child-first,添加 classloader.resolve-order: parent-first
  1. vim conf/flink-conf.yaml
  2. classloader.resolve-order: parent-first
  • 启动 Flink 集群
  1. cd flink-1.14.4
  2. bin/start-cluster.sh

如果未修改会报错

[ERROR] Could not execute SQL statement. Reason:**
**java.io.StreamCorruptedException: unexpected block data

启动 Flink SQL Client

在 Flink 中包含了 FileSystem SQL Connector,不需要添加额外的依赖

从文件系统中读取或者向文件系统中写入行时,需要指定相应的 format,有些 format 需要下载自己的依赖

文件系统连接器允许从本地或分布式文件系统进行读写

  1. cd lib
  2. wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-parquet_2.12/1.14.4/flink-sql-parquet_2.12-1.14.4.jar
  • 启动 Flink SQL Client
  1. ./bin/sql-client.sh
  • 创建表
  1. CREATE TABLE cluster (
  2. cluster_id STRING,
  3. name STRING,
  4. app_id STRING
  5. ) WITH (
  6. 'connector' = 'filesystem',
  7. 'path' = 'hdfs://localhost:9000/root/cluster/',
  8. 'format' = 'parquet'
  9. );
path 属性指定的是目录,而不是文件,该目录下的文件也不是可读的
  • 读取 Parquet 文件
  1. select * from cluster;

参考文档