6. File Sink
新的Data Sink API (Beta)
之前发布的Flink 版本中[1],已经支持了 source connector 工作在流批两种模式下,因此在 Flink 1.12 中,社区着重实现了统一的 Data Sink API(FLIP-143)。新的抽象引入了 write/commit 协议和一个更加模块化的接口。Sink 的实现者只需要定义 what 和 how:SinkWriter,用于写数据,并输出需要 commit 的内容(例如,committables);Committer 和 GlobalCommitter,封装了如何处理 committables。框架会负责 when 和 where:即在什么时间,以及在哪些机器或进程中 commit。
Flink 1.12的 FileSink 为批处理和流式处理提供了一个统一的接收器,它将分区文件写入Flink文件系统抽象所支持的文件系统。这个文件系统连接器为批处理和流式处理提供了相同的保证,它是现有流式文件接收器的一种改进。
7. FlinkSQL整合Hive
7.1 介绍
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/
https://zhuanlan.zhihu.com/p/338506408
使用Hive构建数据仓库已经成为了比较普遍的一种解决方案。目前,一些比较常见的大数据处理引擎,都无一例外兼容Hive。Flink从1.9开始支持集成Hive,不过1.9版本为beta版,不推荐在生产环境中使用。在Flink1.10版本中,标志着对 Blink的整合宣告完成,对 Hive 的集成也达到了生产级别的要求。值得注意的是,不同版本的Flink对于Hive的集成有所差异,接下来将以最新的Flink1.12版本为例,实现Flink集成Hive。
7.2 集成Hive的基本方式
- 持久化元数据
Flink利用 Hive 的 MetaStore 作为持久化的 Catalog,我们可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。例如,我们可以使用HiveCatalog将其 Kafka的数据源表存储在 Hive Metastore 中,这样该表的元数据信息会被持久化到Hive的MetaStore对应的元数据库中,在后续的 SQL 查询中,我们可以重复使用它们。
- 利用 Flink 来读写 Hive 的表
Flink打通了与Hive的集成,如同使用SparkSQL或者Impala操作Hive中的数据一样,我们可以使用Flink直接读写Hive中的表。
HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive表。不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。
7.3 准备工作
1.添加hadoop_classpath
vim /etc/profile
增加如下配置
export HADOOP_CLASSPATH=hadoop classpath
刷新配置
source /etc/profile
2.下载jar并上传至flink/lib目录
总共三个jar包;
hive-exec-3.1.3.jar,flink-connector-hive_2.12-1.12.0.jar,flink-sql-connector-hive-3.1.2_2.11-1.13.6.jar
3.修改hive配置
vim /export/server/hive/conf/hive-site.xml
<property>
<name>hive.metastore.uris</name>
<value>thrift://node3:9083</value>
</property>
4.启动hive元数据服务
nohup /export/server/apache-hive-3.1.2/bin/hive —service metastore &
7.4 SQL CLI
1.修改flinksql配置
vim /export/server/flink/conf/sql-client-defaults.yaml
增加如下配置
catalogs:
- name: myhive
type: hive
hive-conf-dir: /export/server/apache-hive-3.1.2/conf
default-database: default
2.启动flink集群
/export/server/flink-1.13.5-yarn/bin/start-cluster.sh
/export/server/flink-1.13.5-yarn/bin/stop-cluster.sh
3.启动flink-sql客户端
/export/server/flink-1.13.5-yarn/bin/sql-client.sh embedded
4.执行sql:
show catalogs;
use catalog myhive;
show tables;
select * from person;