目的：读取 CSV 文件，分别使用 Table API 和 SQL 对数据进行过滤与聚合，并将结果打印输出。
环境：JDK 8，Flink 1.10.1；开发工具：IntelliJ IDEA 2019.3

创建maven项目

在 pom.xml 中引入以下依赖（jar 包）

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-java</artifactId>
  4. <version>1.10.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-streaming-java_2.12</artifactId>
  9. <version>1.10.1</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.flink</groupId>
  13. <artifactId>flink-table-planner_2.12</artifactId>
  14. <version>1.10.1</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.apache.flink</groupId>
  18. <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  19. <version>1.10.1</version>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.apache.flink</groupId>
  23. <artifactId>flink-csv</artifactId>
  24. <version>1.10.1</version>
  25. </dependency>

代码演示

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // obtain the streaming execution environment
  2. env.setParallelism(1); // set the job parallelism to 1
  3. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // table environment built on top of env
  4. tableEnv.connect(new FileSystem().path("绝对路径")) // file-system source; the string literal is a placeholder for the CSV file's absolute path
  5. .withFormat(new Csv()) // read the file as CSV
  6. .withSchema(new Schema() // declare the table columns
  7. .field("id", DataTypes.INT()) // column id: INT
  8. .field("name", DataTypes.STRING()) // column name: STRING
  9. )
  10. .createTemporaryTable("inputTable"); // register the source as temporary table inputTable
  11. Table table = tableEnv.from("inputTable").select("id,name").filter("id ===1");// Table API filter: keep rows whose id equals 1
  12. Table table1 = tableEnv.from("inputTable").groupBy("id").select("id,id.count as count"); // Table API aggregation: group by id and count the id values in each group
  13. Table sql =tableEnv.sqlQuery("select id ,name from inputTable where id=2"); // SQL filter: keep rows whose id equals 2
  14. Table sql1 = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id");// SQL aggregation: group by id and count the id values in each group
  15. tableEnv.toAppendStream(sql, Row.class).print("sql"); // print the SQL filter result as an append stream, labelled sql
  16. tableEnv.toRetractStream(sql1, Row.class).print("sql1");// retract-stream output for the SQL aggregation, labelled sql1
  17. tableEnv.toAppendStream(table, Row.class).print("table"); // print the Table API filter result as an append stream, labelled table
  18. tableEnv.toRetractStream(table1, Row.class).print("table1");// retract-stream output for the Table API aggregation, labelled table1
  19. env.execute(); // run the job

sql过滤聚合.rar

结果（撤回流输出的每条记录带有一个布尔标志：true 表示新增一条结果，false 表示撤回之前输出的旧结果；聚合值每更新一次，都会先撤回旧值再输出新值）

  1. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  2. SLF4J: Defaulting to no-operation (NOP) logger implementation
  3. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  4. table> 1,wang
  5. table> 1,ww
  6. table> 1,ee
  7. sql1> (true,1,1)
  8. sql1> (false,1,1)
  9. sql1> (true,1,2)
  10. sql1> (false,1,2)
  11. sql1> (true,1,3)
  12. sql1> (true,2,1)
  13. table1> (true,1,1)
  14. table1> (false,1,1)
  15. table1> (true,1,2)
  16. table1> (false,1,2)
  17. table1> (true,1,3)
  18. table1> (true,2,1)
  19. sql> 2,san