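Goal: read a CSV file and print its contents directly through the Flink Table API
Environment: JDK 8; IDE: IntelliJ IDEA
Create a Maven project
Add the dependencies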
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.1</version>
</dependency>
Code demo
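import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
public class CsvTableDemo { // the class name is arbitrary
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // a single task, so the output is not interleaved
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // create the table environment
        tableEnv.connect(new FileSystem().path("absolute path")) // placeholder: use the absolute path of the CSV file
                .withFormat(new Csv()) // parse the file as CSV
                .withSchema(new Schema()
                        .field("id", DataTypes.INT()) // define the fields
                        .field("name", DataTypes.STRING())
                )
                .createTemporaryTable("inputTable"); // register the table under the name inputTable
        Table table = tableEnv.from("inputTable");
        table.printSchema(); // print the table schema
        tableEnv.toAppendStream(table, Row.class).print(); // convert the table to an append stream and print each row
        env.execute();
    }
}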
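The demo above expects a CSV file whose columns match the schema (id INT, name STRING), but the post does not show the file itself. Below is a minimal sketch, using only the JDK, that writes such a file and prints its absolute path so it can be passed to path(...). The two rows are taken from the result shown in this post. The class name SampleCsvWriter, the helper methods writeSample and readBack, and the use of a temporary file are assumptions made for illustration. No header line is written, on the assumption that the Csv format would treat a header as a data row.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original demo.
public class SampleCsvWriter {

    // Rows matching the schema (id INT, name STRING), copied from the printed result.
    static final List<String> ROWS = Arrays.asList("1,wang", "2,san");

    // Writes the rows to a new temporary file and returns its absolute path.
    static Path writeSample() {
        try {
            Path file = Files.createTempFile("input", ".csv");
            Files.write(file, ROWS, StandardCharsets.UTF_8);
            return file.toAbsolutePath();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the file back line by line, to check what was written.
    static List<String> readBack(Path file) {
        try {
            return Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = writeSample();
        // Copy the printed path into new FileSystem().path("...") in the demo.
        System.out.println(file);
    }
}
```

Run it once, then paste the printed path into the demo in place of the placeholder string.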
Result (the SLF4J lines only warn that no logging binding is on the classpath; the job still runs)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
root
|-- id: INT
|-- name: STRING
1,wang
2,san