安装并启动 Flink

  • 下载 Flink 1.14.4 (使用的是 scala_2.12 版本)并将其解压至目录 flink-1.14.4

    1. wget https://dlcdn.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
    2. tar -xvf flink-1.14.4-bin-scala_2.12.tgz
  • 打开配置文件

    1. vi /etc/profile
  • 添加环境变量 HADOOP_CLASSPATH(需要 Hadoop 类 )

    1. # HADOOP_HOME is your hadoop root directory after unpack the binary package.
    2. export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

    如果未配置会报错 [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration

  • 修改 flink-conf.yaml,默认是 child-first,添加 classloader.resolve-order: parent-first ```bash vim conf/flink-conf.yaml

classloader.resolve-order: parent-first

  1. - 启动 Flink 集群
  2. ```json
  3. cd flink-1.14.4
  4. bin/start-cluster.sh

如果未修改会报错 [ERROR] Could not execute SQL statement. Reason:java.io.StreamCorruptedException: unexpected block data**

启动 Flink SQL Client

在 Flink 中包含了 FileSystem SQL Connector,不需要添加额外的依赖 从文件系统中读取或者向文件系统中写入行时,需要指定相应的 format,有些 format 需要下载自己的依赖 文件系统连接器允许从本地或分布式文件系统进行读写

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-parquet_2.12/1.14.4/flink-sql-parquet_2.12-1.14.4.jar

  1. - 启动 Flink SQL Client
  2. ```scala
  3. ./bin/sql-client.sh
  • 创建表
    1. CREATE TABLE cluster (
    2. cluster_id STRING,
    3. name STRING,
    4. app_id STRING
    5. ) WITH (
    6. 'connector' = 'filesystem',
    7. 'path' = 'hdfs://localhost:9000/root/cluster/',
    8. 'format' = 'parquet'
    9. );

    path 属性指定的是目录,而不是文件,该目录下的文件也不是可读的

  • 读取 Parquet 文件
    1. select * from cluster;

参考文档