WordCount-Demo

主要通过三个实际的Demo来回顾基本API的使用

WordCountBatch

这部分内容比较简单,属于Flink入门

  1. package com.wtz.flink.wordcount;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.DataSet;
  4. import org.apache.flink.api.java.ExecutionEnvironment;
  5. import org.apache.flink.api.java.tuple.Tuple2;
  6. import org.apache.flink.util.Collector;
  7. import javax.sql.DataSource;
  8. /**
  9. * @author tiezhu
  10. * Date 2020/7/19 周日
  11. * Company dtstack
  12. */
  13. public class WordCountBatch {
  14. public static void main(String[] args) throws Exception {
  15. // 创建运行时环境
  16. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  17. // 构建数据源
  18. DataSet<String> text = env.readTextFile("/Users/wtz4680/Desktop/ideaProject/flinkApplicationLearn/flnkSQL/src/main/resources/wordCount.txt");
  19. DataSet<Tuple2<String, Integer>> count = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
  20. // 不建议在Flink中使用lambda表达式,下面的案例会报错,但是是正常的lambda表达式使用
  21. // DataSet<Tuple2<String, Integer>> count2 = text.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out) -> {
  22. // String[] tokens = value.toLowerCase().split("\\W+");
  23. // for (String item : tokens) {
  24. // if (item.length() > 0) {
  25. // out.collect(new Tuple2<>(item, 1));
  26. // }
  27. // }
  28. // }
  29. // ).groupBy(0).sum(1);
  30. count.printToErr();
  31. }
  32. public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
  33. public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
  34. // 将文本内容分割
  35. String[] tokens = value.toLowerCase().split("\\W+");
  36. for (String item : tokens) {
  37. if (item.length() > 0) {
  38. out.collect(new Tuple2<String, Integer>(item, 1));
  39. }
  40. }
  41. }
  42. }
  43. }

WordCountStream

这个是基本的流式数据处理框架

  1. package com.wtz.flink.wordcount;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.common.functions.ReduceFunction;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.streaming.api.windowing.time.Time;
  7. import org.apache.flink.util.Collector;
  8. /**
  9. * @author tiezhu
  10. * Date 2020/7/19 周日
  11. * Company dtstack
  12. */
  13. public class WordCountStream {
  14. public static class WordCount {
  15. public String word;
  16. public Integer count;
  17. // 必须加上,不然会抛出
  18. // This type (GenericType<com.wtz.flink.wordcount.WordCountStream.WordCount>) cannot be used as key
  19. public WordCount() {
  20. }
  21. public WordCount(String word, Integer count) {
  22. this.word = word;
  23. this.count = count;
  24. }
  25. @Override
  26. public String toString() {
  27. return "WordCount{" +
  28. "word='" + word + '\'' +
  29. ", count=" + count +
  30. '}';
  31. }
  32. }
  33. public static void main(String[] args) throws Exception {
  34. // 创建流式运行环境
  35. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  36. // 监听9090端口
  37. DataStream<String> text = env.socketTextStream("localhost", 9090, "\n");
  38. // 将接受到的数据进行拆分,组合,窗口计算然后聚合输出
  39. DataStream<WordCount> counts = text.flatMap(new wordCountStreamFlatMap())
  40. .keyBy("word")
  41. .timeWindow(Time.seconds(5), Time.seconds(1))
  42. .reduce((ReduceFunction<WordCount>) (value1, value2)
  43. -> new WordCount(value1.word, value1.count + value2.count));
  44. // 另一种写法,本质是一样的,只是上面的写法是lambda表达式【这里使用lambda没有报错】
  45. // DataStream<WordCount> counts = text.flatMap(new FlatMapFunction<String, WordCount>() {
  46. // @Override
  47. // public void flatMap(String value, Collector<WordCount> out) throws Exception {
  48. // for (String word : value.split("\\s")) {
  49. // out.collect(new WordCount(word, 1));
  50. // }
  51. // }
  52. // }).keyBy("word")
  53. // .timeWindow(Time.seconds(5), Time.seconds(1))
  54. // .reduce(new ReduceFunction<WordCount>() {
  55. // @Override
  56. // public WordCount reduce(WordCount value1, WordCount value2) throws Exception {
  57. // return new WordCount(value1.word, value1.count + value2.count);
  58. // }
  59. // });
  60. counts.print();
  61. env.execute("wordCount Stream");
  62. }
  63. public static class wordCountStreamFlatMap implements FlatMapFunction<String, WordCount> {
  64. @Override
  65. public void flatMap(String value, Collector<WordCount> out) throws Exception {
  66. for (String item : value.split("\\s")) {
  67. out.collect(new WordCount(item, 1));
  68. }
  69. }
  70. }
  71. }

WordCountSQL

  1. package com.wtz.flink.wordcount;
  2. import org.apache.flink.api.java.DataSet;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import org.apache.flink.table.api.Table;
  5. import org.apache.flink.table.api.TableSchema;
  6. import org.apache.flink.table.api.java.BatchTableEnvironment;
  7. import java.util.ArrayList;
  8. /**
  9. * @author tiezhu
  10. * Date 2020/7/19 周日
  11. * Company dtstack
  12. */
  13. public class WordCountSQL {
  14. public static class WordCount {
  15. public String word;
  16. public Integer count;
  17. // 这个无参构造方法不能删除
  18. public WordCount() {
  19. }
  20. public WordCount(String word, Integer count) {
  21. this.word = word;
  22. this.count = count;
  23. }
  24. @Override
  25. public String toString() {
  26. return "WordCount{" +
  27. "word='" + word + '\'' +
  28. ", count=" + count +
  29. '}';
  30. }
  31. }
  32. public static void main(String[] args) throws Exception {
  33. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  34. BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
  35. String word = "this is a text for flink SQL";
  36. String[] input = word.split("\\W+");
  37. ArrayList<WordCount> list = new ArrayList<>();
  38. for(String item : input) {
  39. list.add(new WordCount(item, 1));
  40. }
  41. DataSet<WordCount> wordInput = env.fromCollection(list);
  42. // DataSet 转化为 Table, 并指定字段类型
  43. Table table = tableEnv.fromDataSet(wordInput, "word, count");
  44. table.printSchema();
  45. // 注册一个表
  46. tableEnv.createTemporaryView("WordCount", table);
  47. // 查询
  48. Table sqlQuery = tableEnv.sqlQuery("SELECT word AS word, sum(`count`) AS `count` FROM WordCount GROUP BY word");
  49. // 将表转化为DataSet
  50. TableSchema schema = sqlQuery.getSchema();
  51. System.out.println(schema);
  52. DataSet<WordCount> countDataSet = tableEnv.toDataSet(sqlQuery, WordCount.class);
  53. countDataSet.print();
  54. }
  55. }

这里需要注意的是,WordCount需要添加无参构造方法,不然会报错
![WordCount 缺少无参构造方法时的报错](image.png)

SQL整体运行逻辑

  1. 创建一个运行时环境,并注册一个TableEnvironment

    1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    2. BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
  2. 将DataSet/DataStream注册成表

    1. // DataSet 转化为 Table, 并指定字段类型
    2. Table table = tableEnv.fromDataSet(wordInput, "word, count");
    3. table.printSchema();
    4. // 注册一个表
    5. tableEnv.createTemporaryView("WordCount", table);

  3. 查询表,将结果转化为DataSet/DataStream

    1. // 查询
    2. Table sqlQuery = tableEnv.sqlQuery("SELECT word AS word, sum(`count`) AS `count` FROM WordCount GROUP BY word");
    3. // 将表转化为DataSet
    4. TableSchema schema = sqlQuery.getSchema();
    5. System.out.println(schema);
    6. DataSet<WordCount> countDataSet = tableEnv.toDataSet(sqlQuery, WordCount.class);
    7. countDataSet.print();