导入需要的依赖

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
  4. <version>${flink.version}</version>
  5. <scope>provided</scope>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  10. <version>${flink.version}</version>
  11. <scope>provided</scope>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.apache.flink</groupId>
  15. <artifactId>flink-csv</artifactId>
  16. <version>${flink.version}</version>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.apache.flink</groupId>
  20. <artifactId>flink-json</artifactId>
  21. <version>${flink.version}</version>
  22. </dependency>

使用总结

Table API总结

  1. Table resultTable = table
  2. .where($("vc").isGreaterOrEqual(20))
  3. .groupBy($("id"))
  4. .aggregate($("vc").sum().as("vc_sum"))
  5. .select($("id"), $("vc_sum"));

SQL 总结

  1. tEnv.executeSql("create table sensor(" +
  2. "id string," +
  3. "ts bigint," +
  4. "vc int, " +
  5. "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
  6. "watermark for t as t - interval '5' second)" +
  7. "with("
  8. + "'connector' = 'filesystem',"
  9. + "'path' = 'input/sensor.txt',"
  10. + "'format' = 'csv'"
  11. + ")");

以下是流操作

TableAPI(动态表) 与 DateStream之间的转换

流到表的转换

  1. public class Flink01_TableApi_BasicUse {
  2. public static void main(String[] args) {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.setParallelism(1);
  5. DataStreamSource<WaterSensor> waterSensorStream =
  6. env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
  7. new WaterSensor("sensor_1", 2000L, 20),
  8. new WaterSensor("sensor_2", 3000L, 30),
  9. new WaterSensor("sensor_1", 4000L, 40),
  10. new WaterSensor("sensor_1", 5000L, 50),
  11. new WaterSensor("sensor_2", 6000L, 60));
  12. // 1. 创建表的执行环境
  13. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  14. // 2. 创建表: 将流转换成动态表. 表的字段名从pojo的属性名自动抽取
  15. Table table = tableEnv.fromDataStream(waterSensorStream);
  16. // 3. 对动态表进行查询
  17. Table resultTable = table
  18. .where($("id").isEqual("sensor_1"))
  19. .select($("id"), $("ts"), $("vc"));
  20. // 4. 把动态表转换成流
  21. // 有了Row.class就可以起别名了
  22. DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
  23. resultStream.print();
  24. try {
  25. env.execute();
  26. } catch (Exception e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }

聚合操作

  1. // 3. 对动态表进行查询
  2. Table resultTable = table
  3. .where($("vc").isGreaterOrEqual(20))
  4. .groupBy($("id"))
  5. .aggregate($("vc").sum().as("vc_sum"))
  6. .select($("id"), $("vc_sum"));
  7. // 4. 把动态表转换成流 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记
  8. DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);

表到流的转换

动态表可以像普通数据库表一样通过 INSERT、UPDATE 和 DELETE 来不断修改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 修改,或者介于两者之间的其他表。
在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化:

Append-only 追加流

调用:**tenv.toAppendStream**_**(**_**t1, Row.class**_**)**_**;**
使用条件:只有insert操作使用,加入你使用了分组,本来流中来了1条sensor_1,1 ,后面再来一条sensor_1,1,此时再分组sum聚合后,数据更新了,则不能使用这种模式

Retract 流,撤回流

调用:**tenv.toRetractStream**_**(**_**t1,Row.class**_**)**_
使用条件:包含insert、delete、update的操作,一般产生这些操作的算子是聚合操作算子
原理:
retract 流包含2种类型的 message:
add message+ retract message
将INSERT 操作编码为 add message、
将 DELETE 操作编码为 retract message、
将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,
效果:因为有了先把原先重复数据删除再添加的操作,会让数据不出现重复的效果

看图:横线条要对齐看
image.png

Upsert 流

调用:**在将动态表转换为 DataStream 时,只支持 append 流和 retract 流**
upsert 流包含两种类型的 message:
upsert messagesdelete messages
转换为 upsert 流的动态表需要(可能是组合的)唯一键。
将 INSERT 和 UPDATE 操作编码为 upsert message,
将 DELETE 操作编码为 delete message ,
将具有唯一键的动态表转换为流。消费流的算子需要知道唯一键的属性,以便正确地应用 message。

与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高
image.png
效果:
数据会重复,最后的数据才是全局的数据

以上属于流操作


TableAPI(动态表) 与 数据源(conncect)之间的转换

已弃用,在下个版本里使用sql DDL重构,更加丰富

image.png
写入source

File source

  1. // 2. 创建表
  2. // 2.1 表的元数据信息
  3. Schema schema = new Schema()
  4. .field("id", DataTypes.STRING())
  5. .field("ts", DataTypes.BIGINT())
  6. .field("vc", DataTypes.INT());
  7. // 2.2 连接文件, 并创建一个临时表, 其实就是一个动态表
  8. tableEnv.connect(new FileSystem().path("input/sensor.txt"))
  9. .withFormat(new Csv().fieldDelimiter(',').lineDelimiter("\n"))
  10. .withSchema(schema)
  11. .createTemporaryTable("sensor");
  12. // 3. 做成表对象, 然后对动态表进行查询
  13. Table sensorTable = tableEnv.from("sensor");
  14. Table resultTable = sensorTable
  15. .groupBy($("id"))
  16. .select($("id"), $("id").count().as("cnt"));
  17. // 4. 把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记
  18. DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
  19. resultStream.print();

Kafka Source

  1. // 2. 创建表
  2. // 2.1 表的元数据信息
  3. Schema schema = new Schema()
  4. .field("id", DataTypes.STRING())
  5. .field("ts", DataTypes.BIGINT())
  6. .field("vc", DataTypes.INT());
  7. // 2.2 连接文件, 并创建一个临时表, 其实就是一个动态表
  8. tableEnv
  9. .connect(new Kafka()
  10. .version("universal")
  11. .topic("sensor")
  12. .startFromLatest()// 指定offset
  13. .property("group.id", "bigdata")
  14. .property("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092"))
  15. .withFormat(new Json())
  16. .withSchema(schema)
  17. .createTemporaryTable("sensor");
  18. // 3. 对动态表进行查询
  19. Table sensorTable = tableEnv.from("sensor");
  20. Table resultTable = sensorTable
  21. .groupBy($("id"))
  22. .select($("id"), $("id").count().as("cnt"));
  23. // 4. 把动态表转换成流. 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记
  24. DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
  25. resultStream.print();

写出sink:

File Sink

  1. // 1. 创建表的执行环境
  2. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  3. Table sensorTable = tableEnv.fromDataStream(waterSensorStream);
  4. Table resultTable = sensorTable
  5. .where($("id").isEqual("sensor_1") )
  6. .select($("id"), $("ts"), $("vc"));
  7. // 创建输出表
  8. Schema schema = new Schema()
  9. .field("id", DataTypes.STRING())
  10. .field("ts", DataTypes.BIGINT())
  11. .field("vc", DataTypes.INT());
  12. tableEnv
  13. .connect(new FileSystem().path("output/sensor_id.txt"))
  14. .withFormat(new Csv().fieldDelimiter('|'))
  15. .withSchema(schema)
  16. .createTemporaryTable("sensor");
  17. // 把数据写入到输出表中
  18. resultTable.executeInsert("sensor");

Kafka Sink

  1. // 1. 创建表的执行环境
  2. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  3. Table sensorTable = tableEnv.fromDataStream(waterSensorStream);
  4. Table resultTable = sensorTable
  5. .where($("id").isEqual("sensor_1") )
  6. .select($("id"), $("ts"), $("vc"));
  7. // 创建输出表
  8. Schema schema = new Schema()
  9. .field("id", DataTypes.STRING())
  10. .field("ts", DataTypes.BIGINT())
  11. .field("vc", DataTypes.INT());
  12. tableEnv
  13. .connect(new Kafka()
  14. .version("universal")
  15. .topic("sink_sensor")
  16. .sinkPartitionerRoundRobin()
  17. .property("bootstrap.servers", "hadoop162:9092,hadoop163:9092,hadoop164:9092"))
  18. .withFormat(new Json())
  19. .withSchema(schema)
  20. .createTemporaryTable("sensor");
  21. // 把数据写入到输出表中
  22. resultTable.executeInsert("sensor");


其他connector用法

参考官方文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connect.html