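Goal: read a text file and print its contents. This example converts the stream into a Table and prints the result directly.
Environment: JDK 8. Tool: IntelliJ IDEA 2019.3.
Create a Maven project
Add the dependencies
flink-table-planner: the planner, the core part of the Table API. It provides the runtime environment and the planner that generates the program's execution plan.
flink-table-api-java-bridge: the bridge, mainly responsible for connecting the Table API with the DataStream/DataSet API. It comes in a Java and a Scala variant.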
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Code demo
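import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// The class name is illustrative. Test is the author's POJO with the fields id and name (not shown in this post).
public class TableExample {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Replace the argument with the absolute path of the text file
        DataStream<String> inputStream = env.readTextFile("absolute path of the text file");
        DataStream<Test> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new Test(Integer.valueOf(fields[0]), fields[1]);
        });

        // Create the table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Create a table from the stream
        Table table = tableEnv.fromDataStream(dataStream);

        // Transform with the Table API
        Table result = table.select("id,name").where("id=2");

        // SQL needs the table to be registered first
        tableEnv.createTemporaryView("test", table);
        String sql = "select id ,name from test where id = 1";
        Table resultSql = tableEnv.sqlQuery(sql); // run the SQL query

        // Print the results
        tableEnv.toAppendStream(result, Row.class).print("table");
        tableEnv.toAppendStream(resultSql, Row.class).print("sql");
        env.execute();
    }
}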
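The demo relies on a Test class that the post does not show. Below is a minimal sketch of what it might look like, assuming it is a POJO with an Integer id and a String name (the field names are taken from select("id,name"); everything else is an assumption). The wrapper class ParseDemo, the helper namesWithId, and the two sample input lines are hypothetical and exist only so the parsing and filtering logic can be checked in plain Java without a Flink cluster. Test is nested here only to keep the sketch in one file.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParseDemo {

    /** Hypothetical stand-in for the post's Test class, written as a POJO. */
    public static class Test {
        private Integer id;
        private String name;

        // Flink's POJO rules expect a public no-argument constructor
        public Test() {}

        public Test(Integer id, String name) {
            this.id = id;
            this.name = name;
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    /** Same parsing as the map() lambda in the demo: "id,name" becomes a Test. */
    public static Test parse(String line) {
        String[] fields = line.split(",");
        return new Test(Integer.valueOf(fields[0].trim()), fields[1].trim());
    }

    /** Plain-Java counterpart of "select name ... where id = ?" for sanity checks. */
    public static List<String> namesWithId(List<String> lines, int id) {
        return lines.stream()
                .map(ParseDemo::parse)
                .filter(t -> t.getId() == id)
                .map(Test::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Assumed input lines, inferred from the output shown in this post
        List<String> lines = Arrays.asList("1,wang", "2,san");
        System.out.println(namesWithId(lines, 1)); // prints [wang]
        System.out.println(namesWithId(lines, 2)); // prints [san]
    }
}
```

With an input file that contains the lines 1,wang and 2,san, the SQL query (id = 1) and the Table API query (id = 2) in the demo should each select one row, which is what this plain-Java check mirrors.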
Result
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
sql> 1,wang
table> 2,san
