目的:读csv,table进行直接输出
环境:jdk8 工具:idea
创建maven项目
引入jar包
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.10.1</version></dependency>
代码演示
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //创建表环境tableEnv.connect(new FileSystem().path("绝对路径")).withFormat(new Csv()) //Csv格式化工具.withSchema(new Schema().field("id", DataTypes.INT()) //定义字段.field("name", DataTypes.STRING())).createTemporaryTable("inputTable"); //注册表,表名为inputTablevTable table = tableEnv.from("inputTable");table.printSchema(); //打印表结构tableEnv.toAppendStream(table, Row.class).print();env.execute();
结果
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.root|-- id: INT|-- name: STRING1,wang2,san
