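Goal: read a text file and print its contents. This example converts the stream into a Table and prints the result directly.
Environment: JDK 8. Tool: IntelliJ IDEA 2019.3
Create a Maven project
Add the dependencies
flink-table-planner: the planner, the core part of the Table API. It provides the runtime environment and the planner that generates the program's execution plan.
flink-table-api-java-bridge: the bridge, which connects the Table API to the DataStream/DataSet API. It comes in separate Java and Scala versions.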
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.10.1</version>
</dependency>
Code demo
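// Imports (package names as of Flink 1.10.x)
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // create the stream execution environment
env.setParallelism(1);
DataStream<String> inputStream = env.readTextFile("absolute path of the text file");
// Each line has the form "id,name"; Test is a POJO with the fields id and name
DataStream<Test> dataStream = inputStream.map(line -> {
    String[] fields = line.split(",");
    return new Test(Integer.valueOf(fields[0]), fields[1]);
});
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // create the table environment
Table table = tableEnv.fromDataStream(dataStream); // create a table from the stream
Table result = table.select("id,name") // transform with the Table API
        .where("id=2");
tableEnv.createTemporaryView("test", table); // SQL requires the table to be registered first
String sql = "select id, name from test where id = 1";
Table resultSql = tableEnv.sqlQuery(sql); // run the SQL query
tableEnv.toAppendStream(result, Row.class).print("table"); // print the results
tableEnv.toAppendStream(resultSql, Row.class).print("sql");
env.execute();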
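The Test class used in the map step is not shown in this post. A minimal sketch of what it could look like, assuming it is a plain Flink POJO whose field names match the columns id and name used in the queries (the exact shape of the author's class is an assumption):

```java
// Hypothetical POJO; the field names match the columns used in select("id,name").
public class Test {
    public Integer id;
    public String name;

    // Flink's POJO rules require a public no-argument constructor.
    public Test() {}

    public Test(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return id + "," + name;
    }
}
```

Public fields keep the sketch short; private fields with getters and setters would also satisfy the POJO rules.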
Result
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
sql> 1,wang
table> 2,san
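To see why each sink prints exactly one row, here is a plain-Java sketch (no Flink involved) of the same split-and-filter logic. The two input lines are an assumption inferred from the output above, since the actual file content is not shown; FilterSketch and rowsWithId are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink: mimics the map + where steps on plain strings.
class FilterSketch {

    // Keep only the "id,name" lines whose id equals wantedId.
    static List<String> rowsWithId(List<String> lines, int wantedId) {
        return lines.stream()
                .filter(line -> Integer.parseInt(line.split(",")[0]) == wantedId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed file content, inferred from the printed result.
        List<String> lines = Arrays.asList("1,wang", "2,san");
        System.out.println("sql> " + rowsWithId(lines, 1));
        System.out.println("table> " + rowsWithId(lines, 2));
    }
}
```

The SQL query filters on id = 1 and the Table API query on id = 2, so each prefix shows a different row.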