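Purpose: read a text file and print its contents. This example converts the stream into a Table and prints the result directly.
Environment: JDK 8. Tool: IDEA 2019.3.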

Create a Maven project

Add the dependencies

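  1. flink-table-planner: the planner, the core part of the Table API. It provides the runtime environment and the planner that generates the program's execution plan.
  2. flink-table-api-java-bridge: the bridge, mainly responsible for connecting the Table API with the DataStream/DataSet API. It comes in separate Java and Scala variants.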
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>

Code demo

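    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    // The statements below go inside a main method declared with "throws Exception".
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // create the stream execution environment
    env.setParallelism(1); // a single parallel task keeps the printed output in one place
    DataStream<String> inputStream = env.readTextFile("absolute path of the text file");
    // Parse each "id,name" line into a Test object
    DataStream<Test> dataStream = inputStream.map(line -> {
        String[] fields = line.split(",");
        return new Test(Integer.parseInt(fields[0]), fields[1]);
    });
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // create the table environment
    Table table = tableEnv.fromDataStream(dataStream); // create a table from the stream
    Table result = table.select("id,name") // transform with the Table API
            .where("id=2");
    tableEnv.createTemporaryView("test", table); // SQL queries need the table registered first
    String sql = "select id, name from test where id = 1";
    Table resultSql = tableEnv.sqlQuery(sql); // run the SQL query
    tableEnv.toAppendStream(result, Row.class).print("table"); // print the results
    tableEnv.toAppendStream(resultSql, Row.class).print("sql");
    env.execute();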
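
The demo refers to a Test class that the post does not show. The following is a minimal sketch of what it might look like, assuming two fields named id and name (the names used in the select and where expressions) and a two-argument constructor matching the call in the map function. The getters, setters, and toString are assumptions added for illustration. Flink treats a class as a POJO, and can therefore map its fields to table columns, only if the class is public, has a public no-argument constructor, and exposes its fields either publicly or through getters and setters.

```java
// Test.java
// Hypothetical record type for the demo; the original post does not show it.
public class Test {
    private int id;
    private String name;

    // Flink's POJO rules require a public no-argument constructor.
    public Test() {
    }

    // Matches the call new Test(Integer.parseInt(fields[0]), fields[1]).
    public Test(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return id + "," + name;
    }
}
```

If the class does not meet the POJO rules, Flink falls back to a generic type with a single column, and the field references id and name in the queries cannot be resolved.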


Result

    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    sql> 1,wang
    table> 2,san