Goal: read a CSV file and write the result of a Table API/SQL query out to a CSV file
Environment: JDK 8. Tool: IDEA 2019.3

Create a Maven project

Add the dependencies

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>1.10.1</version>
    </dependency>

Code demo

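    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Csv;
    import org.apache.flink.table.descriptors.FileSystem;
    import org.apache.flink.table.descriptors.Schema;

    // The class name is illustrative; the original listing shows only the method body.
    public class CsvTableDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Source table: read test.csv as rows of (id INT, name STRING)
            tableEnv.connect(new FileSystem().path("C:\\Users\\Luo\\Desktop\\Table\\src\\main\\resources\\test.csv"))
                    .withFormat(new Csv())
                    .withSchema(new Schema()
                            .field("id", DataTypes.INT())
                            .field("name", DataTypes.STRING()))
                    .createTemporaryTable("inputTable");

            // Table API: a simple filter and a grouped count
            Table table = tableEnv.from("inputTable").select("id, name").filter("id === 1");
            Table table1 = tableEnv.from("inputTable").groupBy("id").select("id, id.count as count");

            // SQL: a simple filter and a grouped count
            Table sql = tableEnv.sqlQuery("select id, name from inputTable where id = 2");
            Table sql1 = tableEnv.sqlQuery("select id, count(id) as cnt from inputTable group by id");

            // Sink table: out.csv with the same (id INT, name STRING) schema
            tableEnv.connect(new FileSystem().path("C:\\Users\\Luo\\Desktop\\Table\\src\\main\\resources\\out.csv"))
                    .withFormat(new Csv())
                    .withSchema(new Schema()
                            .field("id", DataTypes.INT())
                            .field("name", DataTypes.STRING()))
                    .createTemporaryTable("outTable");

            // The CSV file sink only accepts inserts, so the non-aggregated query is written
            sql.insertInto("outTable");
            env.execute();
        }
    }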
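
To make the effect of the inserted query concrete, here is a minimal plain-Java sketch (no Flink involved) of what `select id, name from inputTable where id = 2` does to the file contents. The sample rows, the temporary file locations, and the names `CsvFilterSketch` and `filterById` are assumptions for illustration only; the real test.csv is not shown in this post.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CsvFilterSketch {

    // Keeps the rows whose first column (id) equals wantedId,
    // mirroring: select id, name from inputTable where id = 2
    static List<String> filterById(List<String> lines, int wantedId) {
        return lines.stream()
                .filter(line -> !line.trim().isEmpty())
                .filter(line -> Integer.parseInt(line.split(",")[0].trim()) == wantedId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical sample content: one "id,name" row per line, no header.
        List<String> input = Arrays.asList("1,alice", "2,bob", "2,carol", "3,dave");

        // Temporary files stand in for test.csv and out.csv.
        Path in = Files.createTempFile("test", ".csv");
        Path out = Files.createTempFile("out", ".csv");
        Files.write(in, input);

        Files.write(out, filterById(Files.readAllLines(in), 2));
        Files.readAllLines(out).forEach(System.out::println);
    }
}
```

With the sample rows above, the output file contains only the two rows whose id is 2, which is the shape of result to expect in out.csv after the Flink job runs.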


Result

  An out.csv file is created under the resources folder.

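Notes

  out.csv does not exist before the job runs. The file sink only supports inserts (append); it cannot handle retractions or upserts, so it is not suitable for scenarios that need those modes.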
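
The reason a grouped count cannot go to an insert-only file can be sketched in plain Java. This is an illustration of the idea, not Flink's API: as rows arrive, a continuously updated `count(id) ... group by id` has to withdraw the count it emitted earlier before emitting the new one, and an append-only CSV file has no way to apply that withdrawal. The names `ChangelogSketch` and `countChangelog`, and the `+`/`-` notation, are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogSketch {

    // Builds a retract-style changelog for a running count per id:
    // "+ id,cnt" adds a row, "- id,cnt" withdraws a previously emitted row.
    static List<String> countChangelog(int[] ids) {
        Map<Integer, Long> counts = new HashMap<>();
        List<String> changelog = new ArrayList<>();
        for (int id : ids) {
            Long old = counts.get(id);
            if (old != null) {
                // The earlier result for this id is no longer valid.
                changelog.add("- " + id + "," + old);
            }
            long updated = (old == null) ? 1L : old + 1;
            counts.put(id, updated);
            changelog.add("+ " + id + "," + updated);
        }
        return changelog;
    }

    public static void main(String[] args) {
        // Input ids 1, 2, 2 print: "+ 1,1", "+ 2,1", "- 2,1", "+ 2,2"
        countChangelog(new int[] {1, 2, 2}).forEach(System.out::println);
    }
}
```

The filter query in the demo never produces a `-` entry, which is why it can be inserted into out.csv while the grouped queries cannot.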