WordCount-Demo
WordCountBatch
这部分内容比较简单,属于Flink入门
package com.wtz.flink.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import javax.sql.DataSource;
/**
* @author tiezhu
* Date 2020/7/19 周日
* Company dtstack
*/
public class WordCountBatch {
public static void main(String[] args) throws Exception {
// 创建运行时环境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 构建数据源
DataSet<String> text = env.readTextFile("/Users/wtz4680/Desktop/ideaProject/flinkApplicationLearn/flnkSQL/src/main/resources/wordCount.txt");
DataSet<Tuple2<String, Integer>> count = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
// 不建议在Flink中使用lambda表达式,下面的案例会报错,但是是正常的lambda表达式使用
// DataSet<Tuple2<String, Integer>> count2 = text.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out) -> {
// String[] tokens = value.toLowerCase().split("\\W+");
// for (String item : tokens) {
// if (item.length() > 0) {
// out.collect(new Tuple2<>(item, 1));
// }
// }
// }
// ).groupBy(0).sum(1);
count.printToErr();
}
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
// 将文本内容分割
String[] tokens = value.toLowerCase().split("\\W+");
for (String item : tokens) {
if (item.length() > 0) {
out.collect(new Tuple2<String, Integer>(item, 1));
}
}
}
}
}
WordCountStream
这个是基本的流式数据处理框架
package com.wtz.flink.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
 * Streaming word-count example built on the Flink DataStream API.
 *
 * <p>Reads text lines from a socket, splits them into words and prints per-word counts
 * computed over a sliding time window.
 *
 * @author tiezhu
 * Date 2020/7/19 (Sunday)
 * Company dtstack
 */
public class WordCountStream {

    /** A word together with its count; used as the stream element type and keyed by {@code word}. */
    public static class WordCount {
        public String word;
        public Integer count;

        // Author's note: this no-arg constructor is required, otherwise Flink throws
        // "This type (GenericType<com.wtz.flink.wordcount.WordCountStream.WordCount>) cannot be used as key".
        public WordCount() {
        }

        public WordCount(String word, Integer count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }

    /**
     * Builds and runs the streaming job.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read newline-delimited text from the socket at localhost:9090.
        DataStream<String> text = env.socketTextStream("localhost", 9090, "\n");

        // Split the received lines into words, key by word, apply a window
        // (Time.seconds(5), Time.seconds(1)) and sum the counts per key.
        DataStream<WordCount> counts = text.flatMap(new wordCountStreamFlatMap())
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce((ReduceFunction<WordCount>) (value1, value2)
                        -> new WordCount(value1.word, value1.count + value2.count));

        // Alternative form, essentially equivalent; the version above uses a lambda
        // (author's note: the lambda works without error here).
        // DataStream<WordCount> counts = text.flatMap(new FlatMapFunction<String, WordCount>() {
        //     @Override
        //     public void flatMap(String value, Collector<WordCount> out) throws Exception {
        //         for (String word : value.split("\\s")) {
        //             if (!word.isEmpty()) {
        //                 out.collect(new WordCount(word, 1));
        //             }
        //         }
        //     }
        // }).keyBy("word")
        //         .timeWindow(Time.seconds(5), Time.seconds(1))
        //         .reduce(new ReduceFunction<WordCount>() {
        //             @Override
        //             public WordCount reduce(WordCount value1, WordCount value2) throws Exception {
        //                 return new WordCount(value1.word, value1.count + value2.count);
        //             }
        //         });

        counts.print();
        env.execute("wordCount Stream");
    }

    /**
     * Splits a line on whitespace and emits a {@code WordCount(word, 1)} for every non-empty token.
     *
     * <p>The class name is kept as-is because it is public and may be referenced elsewhere.
     */
    public static class wordCountStreamFlatMap implements FlatMapFunction<String, WordCount> {

        /**
         * @param value one line of input text
         * @param out   collector receiving one {@code WordCount} per non-empty token
         */
        @Override
        public void flatMap(String value, Collector<WordCount> out) throws Exception {
            for (String item : value.split("\\s")) {
                // Blank lines and consecutive or leading whitespace make split() produce empty
                // tokens; skip them so the empty string is never counted as a word.
                if (!item.isEmpty()) {
                    out.collect(new WordCount(item, 1));
                }
            }
        }
    }
}
WordCountSQL
package com.wtz.flink.wordcount;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import java.util.ArrayList;
/**
 * Word-count example that runs a SQL query through the Flink batch Table API.
 *
 * @author tiezhu
 * Date 2020/7/19 (Sunday)
 * Company dtstack
 */
public class WordCountSQL {

    /** A word together with its count; used both as the input row type and the query result type. */
    public static class WordCount {
        public String word;
        public Integer count;

        // Author's note: this no-arg constructor must not be removed.
        public WordCount() {
        }

        public WordCount(String word, Integer count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return new StringBuilder("WordCount{")
                    .append("word='").append(word).append('\'')
                    .append(", count=").append(count)
                    .append('}')
                    .toString();
        }
    }

    /**
     * Builds an in-memory data set of words, registers it as a table, aggregates the counts
     * per word with SQL and prints the result.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        // Turn the sample sentence into one WordCount(token, 1) row per token.
        String sentence = "this is a text for flink SQL";
        ArrayList<WordCount> rows = new ArrayList<>();
        for (String token : sentence.split("\\W+")) {
            rows.add(new WordCount(token, 1));
        }
        DataSet<WordCount> wordInput = env.fromCollection(rows);

        // Convert the DataSet into a Table, naming the fields to expose.
        Table source = tableEnv.fromDataSet(wordInput, "word, count");
        source.printSchema();

        // Register the table under the name used by the query below.
        tableEnv.createTemporaryView("WordCount", source);

        // Query: sum the counts per word.
        Table aggregated = tableEnv.sqlQuery("SELECT word AS word, sum(`count`) AS `count` FROM WordCount GROUP BY word");

        // Print the result schema, then convert the table back into a DataSet and print it.
        System.out.println(aggregated.getSchema());
        DataSet<WordCount> countDataSet = tableEnv.toDataSet(aggregated, WordCount.class);
        countDataSet.print();
    }
}
这里需要注意的是,WordCount需要添加无参构造方法,不然会报错
SQL整体运行逻辑
1. 创建一个运行时环境,并注册一个TableEnvironment
```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
```
2. 将DataSet/DataStream注册成表
```java
// DataSet 转化为 Table, 并指定字段类型
Table table = tableEnv.fromDataSet(wordInput, "word, count");
table.printSchema();
// 注册一个表
tableEnv.createTemporaryView("WordCount", table);
```
3. 查询表,将结果转化为DataSet/DataStream
```java
// 查询
Table sqlQuery = tableEnv.sqlQuery("SELECT word AS word, sum(`count`) AS `count` FROM WordCount GROUP BY word");
// 将表转化为DataSet
TableSchema schema = sqlQuery.getSchema();
System.out.println(schema);
DataSet<WordCount> countDataSet = tableEnv.toDataSet(sqlQuery, WordCount.class);
countDataSet.print();