🚕SQL操作

总结:

在基本使用中的例子用的是流来连接
在kafka的例子中使用的是connector直接连接数据库

另外,group by + 聚合操作的查询结果会自动产生撤回流(retract stream),不能用追加流(append stream)输出

基本使用

查询未注册的表

直接使用动态表

  1. public class Flink05_SQL_BaseUse {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.setParallelism(1);
  5. // Build a small bounded stream of sensor readings to query against.
  6. DataStreamSource<WaterSensor> sensorStream = env.fromElements(
  7. new WaterSensor("sensor_1", 1000L, 10),
  8. new WaterSensor("sensor_1", 2000L, 20),
  9. new WaterSensor("sensor_2", 3000L, 30),
  10. new WaterSensor("sensor_1", 4000L, 40),
  11. new WaterSensor("sensor_1", 5000L, 50),
  12. new WaterSensor("sensor_2", 6000L, 60));
  13. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  14. // Query an UNREGISTERED table: concatenating the Table object into the SQL
  15. // string inlines it under an auto-generated name.
  16. Table sensorTable = tableEnv.fromDataStream(sensorStream);
  17. Table result = tableEnv.sqlQuery("select * from " + sensorTable + " where id='sensor_1'");
  18. // A plain filter never retracts rows, so an append stream is sufficient here.
  19. tableEnv.toAppendStream(result, Row.class).print();
  20. env.execute();
  21. }
  22. }

查询已注册的表

注册为临时视图后,多了一个可以直接在 SQL 语句中引用的表名

  1. public class Flink05_SQL_BaseUse_2 {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.setParallelism(1);
  5. // Same bounded sample data as the unregistered-table example.
  6. DataStreamSource<WaterSensor> sensorStream = env.fromElements(
  7. new WaterSensor("sensor_1", 1000L, 10),
  8. new WaterSensor("sensor_1", 2000L, 20),
  9. new WaterSensor("sensor_2", 3000L, 30),
  10. new WaterSensor("sensor_1", 4000L, 40),
  11. new WaterSensor("sensor_1", 5000L, 50),
  12. new WaterSensor("sensor_2", 6000L, 60));
  13. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  14. // Query a REGISTERED table:
  15. // 1. turn the stream into a Table
  16. Table sensorTable = tableEnv.fromDataStream(sensorStream);
  17. // 2. register the Table as a temporary view named "sensor"
  18. tableEnv.createTemporaryView("sensor", sensorTable);
  19. // 3. the view name can now be referenced directly in SQL
  20. Table result = tableEnv.sqlQuery("select * from sensor where id='sensor_1'");
  21. // 4. print the result; a filter-only query works as an append stream
  22. tableEnv.toAppendStream(result, Row.class).print();
  23. env.execute();
  24. }
  25. }

_

Kafka到Kafka

使用sql从Kafka读数据, 并写入到Kafka
如果注册了临时表,就可以使用临时表来查询。

  1. public class Flink05_SQL_Kafka2Kafka {
  2. public static void main(String[] args) throws Exception {
  3. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4. env.setParallelism(1);
  5. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  6. // 1. Register the source table backed by a Kafka topic.
  7. //    BUGFIX: broker port for hadoop162 was 9029; corrected to 9092
  8. //    to match the other brokers.
  9. tableEnv.executeSql("create table source_sensor (id string, ts bigint, vc int) with("
  10. + "'connector' = 'kafka',"
  11. + "'topic' = 'topic_source_sensor',"
  12. + "'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092,hadoop164:9092',"
  13. + "'properties.group.id' = 'atguigu',"
  14. + "'scan.startup.mode' = 'latest-offset',"
  15. + "'format' = 'json'"
  16. + ")");
  17. // NOTE(review): there seems to be no API to register an intermediate
  18. //   Table produced by sqlQuery() directly as a connector-backed table;
  19. //   hence the insert below references the registered tables by name.
  20. // 2. Register the sink table (same port fix applied).
  21. tableEnv.executeSql("create table sink_sensor(id string, ts bigint, vc int) with("
  22. + "'connector' = 'kafka',"
  23. + "'topic' = 'topic_sink_sensor',"
  24. + "'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092,hadoop164:9092',"
  25. + "'format' = 'json'"
  26. + ")");
  27. // 3. Read from the source table and write matching rows to the sink.
  28. //    executeSql on INSERT submits the job itself, so no env.execute()
  29. //    call is needed here.
  30. tableEnv.executeSql("insert into sink_sensor select * from source_sensor where id='sensor_1'");
  31. }
  32. }
  29. }