目的:读csv,table进行直接输出
环境:jdk8 工具:idea2019.3

创建maven项目

引入jar包

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-java</artifactId>
  4. <version>1.10.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-streaming-java_2.12</artifactId>
  9. <version>1.10.1</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.flink</groupId>
  13. <artifactId>flink-table-planner_2.12</artifactId>
  14. <version>1.10.1</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.apache.flink</groupId>
  18. <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  19. <version>1.10.1</version>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.apache.flink</groupId>
  23. <artifactId>flink-csv</artifactId>
  24. <version>1.10.1</version>
  25. </dependency>

代码演示

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. env.setParallelism(1);
  3. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //创建表环境
  4. tableEnv.connect(new FileSystem().path("绝对路径"))
  5. .withFormat(new Csv()) //Csv格式化工具
  6. .withSchema(new Schema()
  7. .field("id", DataTypes.INT()) //定义字段
  8. .field("name", DataTypes.STRING())
  9. )
  10. .createTemporaryTable("inputTable"); //注册表,表名为inputTablev
  11. Table table = tableEnv.from("inputTable");
  12. table.printSchema(); //打印表结构
  13. tableEnv.toAppendStream(table, Row.class).print();
  14. env.execute();

table自身简单示例.rar

结果

  1. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  2. SLF4J: Defaulting to no-operation (NOP) logger implementation
  3. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  4. root
  5. |-- id: INT
  6. |-- name: STRING
  7. 1,wang
  8. 2,san