🚕SQL Operations
Summary:
The basic-usage examples build the table from a DataStream.
The Kafka example uses a connector (DDL) to talk to the external storage (Kafka) directly.
A GROUP BY + aggregation query automatically produces a retract stream, so its result cannot be converted with toAppendStream (see the sketch right after this summary).
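A minimal sketch of the retract-stream point above (not part of the original notes; the class name Flink05_SQL_Retract and the sum(vc) query are assumptions, WaterSensor is the same POJO used in the examples below): a GROUP BY aggregation may update results it has already emitted, so the result table must be converted with toRetractStream instead of toAppendStream.

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Flink05_SQL_Retract {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> waterSensorStream =
                env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.createTemporaryView("sensor", tableEnv.fromDataStream(waterSensorStream));

        // Each new row can change an already emitted sum, so the result is an update (retract) table
        Table resultTable = tableEnv.sqlQuery("select id, sum(vc) vc_sum from sensor group by id");

        // Tuple2<Boolean, Row>: true = insert a record, false = retract a previously emitted record
        tableEnv.toRetractStream(resultTable, Row.class).print();

        env.execute();
    }
}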
Basic usage
Querying an unregistered table
Use the dynamic Table object directly in the SQL string.
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Flink05_SQL_BaseUse {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> waterSensorStream =
                env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Query an unregistered table with SQL: concatenate the Table object into the SQL string
        Table inputTable = tableEnv.fromDataStream(waterSensorStream);
        Table resultTable = tableEnv.sqlQuery("select * from " + inputTable + " where id='sensor_1'");

        tableEnv.toAppendStream(resultTable, Row.class).print();

        env.execute();
    }
}
Querying a registered table
After registration, the table has a name and can be referenced by that name in SQL.
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Flink05_SQL_BaseUse_2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> waterSensorStream =
                env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Query a registered table with SQL
        // 1. Get a Table from the stream
        Table inputTable = tableEnv.fromDataStream(waterSensorStream);
        // 2. Register the Table as a temporary view named "sensor"
        tableEnv.createTemporaryView("sensor", inputTable);
        // 3. Query the temporary view and get a new Table
        Table resultTable = tableEnv.sqlQuery("select * from sensor where id='sensor_1'");
        // 4. Print the data of resultTable
        tableEnv.toAppendStream(resultTable, Row.class).print();

        env.execute();
    }
}
Kafka to Kafka
Use SQL to read data from Kafka and write it back into Kafka.
Once a temporary table is registered, it can be queried by name.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Flink05_SQL_Kafka2Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. Register the source table: source_sensor
        tableEnv.executeSql("create table source_sensor (id string, ts bigint, vc int) with("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_source_sensor',"
                + "'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092,hadoop164:9092',"
                + "'properties.group.id' = 'atguigu',"
                + "'scan.startup.mode' = 'latest-offset',"
                + "'format' = 'json'"
                + ")");

        // An intermediate Table could be registered as a temporary view here (see the sketch after this example)
        // Table table = tableEnv.sqlQuery("select * from source_sensor");

        // 2. Register the sink table: sink_sensor
        tableEnv.executeSql("create table sink_sensor(id string, ts bigint, vc int) with("
                + "'connector' = 'kafka',"
                + "'topic' = 'topic_sink_sensor',"
                + "'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092,hadoop164:9092',"
                + "'format' = 'json'"
                + ")");

        // 3. Query data from the source table and write it into the sink table
        // Without registering: tableEnv.executeSql("insert into sink_sensor select * from " + table);
        // executeSql on an INSERT statement submits the job itself, so env.execute() is not needed
        tableEnv.executeSql("insert into sink_sensor select * from source_sensor where id='sensor_1'");
    }
}
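On the commented-out point about the intermediate table: the Table returned by sqlQuery can be registered with createTemporaryView and then referenced by name in the INSERT statement. A minimal sketch, meant as a drop-in replacement for step 3 above; the view name mid_sensor is an assumed name, and source_sensor/sink_sensor are the tables registered in the example.

// Register the intermediate query result as a temporary view ("mid_sensor" is an assumed name)
Table midTable = tableEnv.sqlQuery("select * from source_sensor where id='sensor_1'");
tableEnv.createTemporaryView("mid_sensor", midTable);
// Insert from the registered view instead of the raw source table
tableEnv.executeSql("insert into sink_sensor select * from mid_sensor");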
