WordCount-Demo
WordCountBatch
这部分内容比较简单,属于Flink入门
package com.wtz.flink.wordcount;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.util.Collector;import javax.sql.DataSource;/*** @author tiezhu* Date 2020/7/19 周日* Company dtstack*/public class WordCountBatch {public static void main(String[] args) throws Exception {// 创建运行时环境final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 构建数据源DataSet<String> text = env.readTextFile("/Users/wtz4680/Desktop/ideaProject/flinkApplicationLearn/flnkSQL/src/main/resources/wordCount.txt");DataSet<Tuple2<String, Integer>> count = text.flatMap(new LineSplitter()).groupBy(0).sum(1);// 不建议在Flink中使用lambda表达式,下面的案例会报错,但是是正常的lambda表达式使用// DataSet<Tuple2<String, Integer>> count2 = text.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out) -> {// String[] tokens = value.toLowerCase().split("\\W+");// for (String item : tokens) {// if (item.length() > 0) {// out.collect(new Tuple2<>(item, 1));// }// }// }// ).groupBy(0).sum(1);count.printToErr();}public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 将文本内容分割String[] tokens = value.toLowerCase().split("\\W+");for (String item : tokens) {if (item.length() > 0) {out.collect(new Tuple2<String, Integer>(item, 1));}}}}}
WordCountStream
这个是基本的流式数据处理框架
package com.wtz.flink.wordcount;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;/*** @author tiezhu* Date 2020/7/19 周日* Company dtstack*/public class WordCountStream {public static class WordCount {public String word;public Integer count;// 必须加上,不然会抛出// This type (GenericType<com.wtz.flink.wordcount.WordCountStream.WordCount>) cannot be used as keypublic WordCount() {}public WordCount(String word, Integer count) {this.word = word;this.count = count;}@Overridepublic String toString() {return "WordCount{" +"word='" + word + '\'' +", count=" + count +'}';}}public static void main(String[] args) throws Exception {// 创建流式运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 监听9090端口DataStream<String> text = env.socketTextStream("localhost", 9090, "\n");// 将接受到的数据进行拆分,组合,窗口计算然后聚合输出DataStream<WordCount> counts = text.flatMap(new wordCountStreamFlatMap()).keyBy("word").timeWindow(Time.seconds(5), Time.seconds(1)).reduce((ReduceFunction<WordCount>) (value1, value2)-> new WordCount(value1.word, value1.count + value2.count));// 另一种写法,本质是一样的,只是上面的写法是lambda表达式【这里使用lambda没有报错】// DataStream<WordCount> counts = text.flatMap(new FlatMapFunction<String, WordCount>() {// @Override// public void flatMap(String value, Collector<WordCount> out) throws Exception {// for (String word : value.split("\\s")) {// out.collect(new WordCount(word, 1));// }// }// }).keyBy("word")// .timeWindow(Time.seconds(5), Time.seconds(1))// .reduce(new ReduceFunction<WordCount>() {// @Override// public WordCount reduce(WordCount value1, WordCount value2) throws Exception {// return new WordCount(value1.word, value1.count + value2.count);// }// 
});counts.print();env.execute("wordCount Stream");}public static class wordCountStreamFlatMap implements FlatMapFunction<String, WordCount> {@Overridepublic void flatMap(String value, Collector<WordCount> out) throws Exception {for (String item : value.split("\\s")) {out.collect(new WordCount(item, 1));}}}}
WordCountSQL
package com.wtz.flink.wordcount;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.api.java.BatchTableEnvironment;import java.util.ArrayList;/*** @author tiezhu* Date 2020/7/19 周日* Company dtstack*/public class WordCountSQL {public static class WordCount {public String word;public Integer count;// 这个无参构造方法不能删除public WordCount() {}public WordCount(String word, Integer count) {this.word = word;this.count = count;}@Overridepublic String toString() {return "WordCount{" +"word='" + word + '\'' +", count=" + count +'}';}}public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);String word = "this is a text for flink SQL";String[] input = word.split("\\W+");ArrayList<WordCount> list = new ArrayList<>();for(String item : input) {list.add(new WordCount(item, 1));}DataSet<WordCount> wordInput = env.fromCollection(list);// DataSet 转化为 Table, 并指定字段类型Table table = tableEnv.fromDataSet(wordInput, "word, count");table.printSchema();// 注册一个表tableEnv.createTemporaryView("WordCount", table);// 查询Table sqlQuery = tableEnv.sqlQuery("SELECT word AS word, sum(`count`) AS `count` FROM WordCount GROUP BY word");// 将表转化为DataSetTableSchema schema = sqlQuery.getSchema();System.out.println(schema);DataSet<WordCount> countDataSet = tableEnv.toDataSet(sqlQuery, WordCount.class);countDataSet.print();}}
这里需要注意的是,WordCount需要添加无参构造方法,不然会报错
SQL整体运行逻辑
1. 创建一个运行时环境,并创建一个TableEnvironment
```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
```
2. 将DataSet/DataStream注册成表
```java
// DataSet 转化为 Table, 并指定字段名
Table table = tableEnv.fromDataSet(wordInput, "word, count");
table.printSchema();

// 注册一个表
tableEnv.createTemporaryView("WordCount", table);
```
3. 查询表,将结果转化为DataSet/DataStream
```java
// 查询
Table sqlQuery = tableEnv.sqlQuery("SELECT word AS word, sum(`count`) AS `count` FROM WordCount GROUP BY word");

// 将表转化为DataSet
TableSchema schema = sqlQuery.getSchema();
System.out.println(schema);
DataSet<WordCount> countDataSet = tableEnv.toDataSet(sqlQuery, WordCount.class);
countDataSet.print();
```
