TableEnvironment

TableEnvironment 是flink中集成TableAPI和SQL的核心概念,它可以
1、注册catalog、在catalog中注册表

  • catalog可以理解为元数据、里面有database、然后是table
  • Table类型 需要在catalog注册之后 才能sql直接使用
  • tableEnv.createTemporaryView("NewTable", newTable);

2、DataStream与Table互相转换
3、执行sql
4、注册UDF

日常简单获取tableEnv

  1. //这个默认是什么?1.11及之后默认是blink版本 之前是老版本
  2. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

老版本planner获取tableEnv

  1. // 基于老版本planner的流处理
  2. EnvironmentSettings oldStreamSetting = EnvironmentSettings
  3. .newInstance()
  4. .useOldPlanner()
  5. .inStreamingMode()
  6. .build();
  7. StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSetting);
  8. // 基于老版本planner的批处理
  9. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  10. BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

新版本blink-planner获取tableEnv

  1. //基于Blink的流处理
  2. EnvironmentSettings blinkStreamSetting = EnvironmentSettings.newInstance()
  3. .useBlinkPlanner()
  4. .inStreamingMode()
  5. .build();
  6. StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSetting);
  7. //基于blink的批处理
  8. EnvironmentSettings blinkBatchSetting = EnvironmentSettings.newInstance()
  9. .useBlinkPlanner()
  10. .inBatchMode()
  11. .build();
  12. TableEnvironment blinkBatchTableEnv1 = TableEnvironment.create(blinkBatchSetting);

输入

sql可以使用的表

  • 表结构:CSV、JSON、parquet 定义了底层存储的方式、表的序列化方式。 需要引入额外pom ``java // 方法一:连接外部系统 tableEnv.executeSql("CREATE TEMPORARY TABLE inputTable ... WITH ( 'connector' = ... )"); // kafka CREATE TABLE KafkaTable (userSTRING,urlSTRING,ts` TIMESTAMP(3) METADATA FROM ‘timestamp’ —【METADATA FROM】 使用Kafka的元数据字段timestamp生成一个新字段ts ) WITH ( ‘connector’ = ‘kafka’, ‘topic’ = ‘events’, ‘properties.bootstrap.servers’ = ‘localhost:9092’, ‘properties.group.id’ = ‘testGroup’, ‘scan.startup.mode’ = ‘earliest-offset’, ‘format’ = ‘csv’) // 文件 CREATE TABLE MyTable ( column_name1 INT, column_name2 STRING, … part_name1 INT, part_name2 STRING ) PARTITIONED BY (part_name1, part_name2 ) WITH ( ‘connector’ = ‘filesystem’, — 连接器类型 ‘path’ = ‘…’, — 文件路径 ‘format’ = ‘…’) — 文件格式

// 方法二:从Table对象、dataStream:createTemporaryView 第二个参数可是table 可是dataStream tableEnv.createTemporaryView(“NewTable”, newTable); tableEnv.createTemporaryView(“NewTable”, dataStream,$(“timestamp”).as(“ts”), $(“url”));

// 方法三:从Table对象:直接以字符串拼接添加到SQL中,在解析时会自动注册一个同名的虚拟表到环境中,这样就省略了创建虚拟视图的步骤 Table clickTable = tableEnvironment.sqlQuery(“select url, user from “ + eventTable);

  1. <a name="WVTU6"></a>
  2. #### Table对象
  3. ```java
  4. // 方法一:从DataStream:提取Event中的属性 作为表的字段
  5. Table eventTable = tableEnv.fromDataStream(eventStream);
  6. Table eventTable2 = tableEnv.fromDataStream(
  7. eventStream,
  8. $("timestamp").as("ts"),
  9. $("url"));
  10. // 方法二:从sql可以使用的表
  11. Table eventTable = tableEnv.from("NewTable");

转换

使用sql

  1. Table table1 = tableEnv.sqlQuery("SELECT ... FROM inputTable... ");

使用table-api

  1. Table maryClickTable = eventTable
  2. .where($("user").isEqual("Alice"))
  3. .select($("url"), $("user"));

Table >>> dataStream

  1. // 仅插入流:select简单查询之后得到的table
  2. tableEnv.toDataStream(aliceVisitTable).print();
  3. >>>+I[./home, Alice] +I表示【插入】
  4. // 更新日志流:group by 之后得到的table
  5. tableEnv.toChangelogStream(groupTable).print();
  6. >>>+I[Bob, 1] +I表示【插入】
  7. -U[Alice, 1] -U表示【更新前】
  8. +U[Alice, 2] +U表示【更新后】

输出

  1. // 方法一:executeInsert + 连接外部系统
  2. tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");
  3. TableResult tableResult = table1.executeInsert("outputTable");
  4. // 连接到控制台 打印输出
  5. CREATE TABLE outputTable (
  6. user STRING,
  7. cnt BIGINT
  8. WITH ('connector' = 'print');
  9. // 更新流输出到kafka
  10. CREATE TABLE pageviews_per_region (
  11. user_region STRING,
  12. pv BIGINT,
  13. uv BIGINT,
  14. PRIMARY KEY (user_region) NOT ENFORCED
  15. ) WITH (
  16. 'connector' = 'upsert-kafka',
  17. --更新流 以主键形式进行唯一标识 删除就是valuenull
  18. --kafka并不支持数据修改
  19. 'topic' = 'pageviews_per_region',
  20. 'properties.bootstrap.servers' = '...',
  21. 'key.format' = 'avro',
  22. 'value.format' = 'avro');
  23. INSERT INTO pageviews_per_region
  24. SELECT
  25. user_region,
  26. COUNT(*),
  27. COUNT(DISTINCT user_id)
  28. FROM pageviews
  29. GROUP BY user_region;
  30. // 更新流:输出到mysql 需定义主键
  31. CREATE TABLE output (
  32. id BIGINT,
  33. name STRING,
  34. age INT,
  35. status BOOLEAN,
  36. PRIMARY KEY (id) NOT ENFORCED
  37. ) WITH (
  38. 'connector' = 'jdbc',
  39. 'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  40. 'table-name' = 'users');--mysql真正表名是users output只是flink-catalog里的名字
  41. // 更新流:输出到es
  42. CREATE TABLE MyTable (
  43. user_id STRING,
  44. user_name STRING
  45. uv BIGINT,
  46. pv BIGINT,
  47. PRIMARY KEY (user_id) NOT ENFORCED
  48. ) WITH (
  49. 'connector' = 'elasticsearch-7',
  50. 'hosts' = 'http://localhost:9200',
  51. 'index' = 'users');
  52. // 更新流:输出到hbase 只支持hbase1.4 2.2
  53. CREATE TABLE MyTable (
  54. rowkey INT,
  55. family1 ROW<q1 INT>,
  56. family2 ROW<q2 STRING, q3 BIGINT>,
  57. family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
  58. PRIMARY KEY (rowkey) NOT ENFORCED
  59. ) WITH (
  60. 'connector' = 'hbase-1.4',
  61. 'table-name' = 'mytable',
  62. 'zookeeper.quorum' = 'localhost:2181');
  63. INSERT INTO MyTable
  64. SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
  65. // 方法二:executeSql + 连接外部系统
  66. tableEnv.executeSql("insert into outputTable select ** from table1 where user = 'Alice'");

动态表、持续查询

image.png

  • 与数据库的静态数据不同,flink是动态数据的查询。叫做动态表
  • 查询一个动态表叫做持续查询,并生成另一张动态表

    动态表的三种编码方式

  • 名词解释:通过发送编码消息的方式告诉外部系统要执行的操作

  • 更新流跟撤回流的主要区别在于,更新(update)操作由于有 key 的存在, 只需要用单条消息编码就可以,因此效率更高。

    append-only追加流

    只做insert操作

  • 本流的工作模式:只有增

    retract撤回流

    有添加add、撤回retract操作。

  • 本流的工作模式:增删改:增add、删retract、改retract+add

    update更新流

    有更新upsert、删除delete操作

  • 本流的工作模式:增删改:增upsert、删delete、改upsert

  • 需要动态表有主键、唯一key