🚕SQL操作
总结:
「基本使用」中的例子是先得到一个流(DataStream), 再把流转换成表来查询
「Kafka到Kafka」的例子中则是通过connector直接连接Kafka, 用DDL建表
另外, group by + 聚合操作得到的结果自动是撤回流(retract stream)
基本使用
查询未注册的表
直接使用动态表
/**
 * Runs a SQL query against a table that was never registered under a name.
 *
 * <p>A bounded stream of {@code WaterSensor} elements is converted into a {@code Table},
 * filtered with SQL to the rows whose {@code id} is {@code sensor_1}, converted back
 * into a stream of {@code Row} and printed.
 */
public class Flink05_SQL_BaseUse {
    /**
     * Builds and executes the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel instance so the printed output is not interleaved across subtasks.
        streamEnv.setParallelism(1);

        // Fixed in-memory input; constructor arguments are presumably (id, ts, vc) — confirm
        // against the WaterSensor definition.
        DataStreamSource<WaterSensor> sensorReadings = streamEnv.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment sqlEnv = StreamTableEnvironment.create(streamEnv);

        // The table is not registered explicitly. Concatenating it into the SQL text embeds
        // its string form as the table identifier (per the Flink docs, a Table used this way
        // is registered automatically under a generated name).
        Table sensorTable = sqlEnv.fromDataStream(sensorReadings);
        String filterSql = "select * from " + sensorTable + " where id='sensor_1'";
        Table sensorOneRows = sqlEnv.sqlQuery(filterSql);

        // Convert the query result back to a stream of rows and print each one.
        sqlEnv.toAppendStream(sensorOneRows, Row.class).print();

        streamEnv.execute();
    }
}
查询已注册的表
注册为临时视图后, 表就有了一个表名, 可以在SQL中直接用表名查询
/**
 * Runs a SQL query against a table that has been registered as a temporary view.
 *
 * <p>A bounded stream of {@code WaterSensor} elements is converted into a {@code Table},
 * registered under the name {@code sensor}, filtered with SQL to the rows whose
 * {@code id} is {@code sensor_1}, converted back into a stream of {@code Row} and printed.
 */
public class Flink05_SQL_BaseUse_2 {
    /**
     * Builds and executes the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel instance so the printed output is not interleaved across subtasks.
        streamEnv.setParallelism(1);

        // Fixed in-memory input; constructor arguments are presumably (id, ts, vc) — confirm
        // against the WaterSensor definition.
        DataStreamSource<WaterSensor> sensorReadings = streamEnv.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment sqlEnv = StreamTableEnvironment.create(streamEnv);

        // Step 1: obtain a table from the stream.
        Table sensorTable = sqlEnv.fromDataStream(sensorReadings);

        // Step 2: register that table as a temporary view so SQL can refer to it by name.
        sqlEnv.createTemporaryView("sensor", sensorTable);

        // Step 3: query the view by name; the result is a new table.
        String filterSql = "select * from sensor where id='sensor_1'";
        Table sensorOneRows = sqlEnv.sqlQuery(filterSql);

        // Step 4: convert the result back to a stream of rows and print each one.
        sqlEnv.toAppendStream(sensorOneRows, Row.class).print();

        streamEnv.execute();
    }
}
Kafka到Kafka
使用sql从Kafka读数据, 并写入到Kafka中
如果注册了临时表,就可以使用临时表来查询。
/**
 * Reads sensor records from one Kafka topic with SQL and writes the filtered rows to
 * another Kafka topic, using connector-backed tables declared through DDL.
 */
public class Flink05_SQL_Kafka2Kafka {
    /**
     * Kafka bootstrap servers shared by the source and sink tables.
     *
     * <p>The original listed the first broker as {@code hadoop162:9029}, which does not
     * match the {@code 9092} port of the other two brokers and looks like a transposed-digit
     * typo; it is {@code 9092} here.
     */
    private static final String BOOTSTRAP_SERVERS =
            "hadoop162:9092,hadoop163:9092,hadoop164:9092";

    /**
     * Declares the source and sink tables and submits the INSERT statement.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for parity with the other examples in this file
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. Register the source table: source_sensor (JSON records from topic_source_sensor,
        //    starting from the latest offset).
        tableEnv.executeSql("create table source_sensor (id string, ts bigint, vc int) with("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_source_sensor',"
                + "'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "',"
                + "'properties.group.id' = 'atguigu',"
                + "'scan.startup.mode' = 'latest-offset',"
                + "'format' = 'json'"
                + ")");

        // An intermediate result obtained with tableEnv.sqlQuery(...) can still be used in a
        // later statement: either register it by name with
        // tableEnv.createTemporaryView(name, table), as Flink05_SQL_BaseUse_2 does, or
        // concatenate the Table object into the SQL text, as Flink05_SQL_BaseUse does.

        // 2. Register the sink table: sink_sensor (JSON records to topic_sink_sensor).
        tableEnv.executeSql("create table sink_sensor(id string, ts bigint, vc int) with("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_sink_sensor',"
                + "'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "',"
                + "'format' = 'json'"
                + ")");

        // 3. Query the source table and write the matching rows into the sink table.
        //    No env.execute() call is made here: per the Flink docs, executeSql submits the
        //    job for an INSERT statement itself.
        tableEnv.executeSql("insert into sink_sensor select * from source_sensor where id='sensor_1'");
    }
}