目的:读csv,table/sql过滤聚合输出
环境:jdk8 工具:idea2019
创建maven项目
引入jar包
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.10.1</version></dependency>
代码演示
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);tableEnv.connect(new FileSystem().path("绝对路径")).withFormat(new Csv()).withSchema(new Schema().field("id", DataTypes.INT()).field("name", DataTypes.STRING())).createTemporaryTable("inputTable");Table table = tableEnv.from("inputTable").select("id,name").filter("id ===1");//table过滤Table table1 = tableEnv.from("inputTable").groupBy("id").select("id,id.count as count"); //table聚合Table sql =tableEnv.sqlQuery("select id ,name from inputTable where id=2"); //sql过滤Table sql1 = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id");// sql聚合tableEnv.toAppendStream(sql, Row.class).print("sql");tableEnv.toRetractStream(sql1, Row.class).print("sql1");//撤回流输出tableEnv.toAppendStream(table, Row.class).print("table");tableEnv.toRetractStream(table1, Row.class).print("table1");//撤回流输出env.execute();
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.table> 1,wangtable> 1,wwtable> 1,eesql1> (true,1,1)sql1> (false,1,1)sql1> (true,1,2)sql1> (false,1,2)sql1> (true,1,3)sql1> (true,2,1)table1> (true,1,1)table1> (false,1,1)table1> (true,1,2)table1> (false,1,2)table1> (true,1,3)table1> (true,2,1)sql> 2,san
