目的:读csv,table/sql过滤聚合输出
环境:jdk8 工具:idea2019
创建maven项目
引入jar包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
代码演示
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.connect(new FileSystem().path("绝对路径"))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.INT())
.field("name", DataTypes.STRING())
)
.createTemporaryTable("inputTable");
Table table = tableEnv.from("inputTable").select("id,name").filter("id ===1");//table过滤
Table table1 = tableEnv.from("inputTable").groupBy("id").select("id,id.count as count"); //table聚合
Table sql =tableEnv.sqlQuery("select id ,name from inputTable where id=2"); //sql过滤
Table sql1 = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id");// sql聚合
tableEnv.toAppendStream(sql, Row.class).print("sql");
tableEnv.toRetractStream(sql1, Row.class).print("sql1");//撤回流输出
tableEnv.toAppendStream(table, Row.class).print("table");
tableEnv.toRetractStream(table1, Row.class).print("table1");//撤回流输出
env.execute();
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
table> 1,wang
table> 1,ww
table> 1,ee
sql1> (true,1,1)
sql1> (false,1,1)
sql1> (true,1,2)
sql1> (false,1,2)
sql1> (true,1,3)
sql1> (true,2,1)
table1> (true,1,1)
table1> (false,1,1)
table1> (true,1,2)
table1> (false,1,2)
table1> (true,1,3)
table1> (true,2,1)
sql> 2,san