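Goal: count how often each space-separated word occurs in a text file
Environment: JDK 8. Tool: IntelliJ IDEA
Create a Maven project
Add the dependencies to pom.xml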
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Code walkthrough
The DataSet API is used for offline (batch) data processing
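import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// WordCount is an arbitrary class name chosen to hold the snippet
public class WordCount {
    public static void main(String[] args) throws Exception {
        // Create the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the data; replace the placeholder with the absolute path of the txt file
        DataSource<String> input = env.readTextFile("absolute path of the txt file");
        // Count: emit (word, 1) pairs, group by field 0 (the word), sum field 1 (the count)
        DataSet<Tuple2<String, Integer>> res = input.flatMap(new MyFlatMapper()).groupBy(0).sum(1);
        // Print the result; for a DataSet, print() also triggers execution
        res.print();
    }

    // Emits a (word, 1) pair for every space-separated word in a line
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}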
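One caveat about value.split(" "): two spaces in a row produce an empty token, and the mapper would count that empty string as a word. Below is a minimal plain-Java sketch (no Flink needed) of a more forgiving tokenizer. The class name Tokenizer and the method tokenize are hypothetical, and splitting on runs of whitespace is an assumption about the desired behavior, not something the original code does.

```java
import java.util.ArrayList;
import java.util.List;

public class Tokenizer {
    // Hypothetical helper: splits a line on runs of whitespace and skips empty tokens.
    public static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Whitespace-run splitting: prints [hello, java, hello]
        System.out.println(tokenize("hello  java hello"));
        // Single-space splitting keeps an empty token between the two spaces: prints 3
        System.out.println("hello  java".split(" ").length);
    }
}
```

If this behavior is wanted, the loop in MyFlatMapper could iterate over such a token list instead of value.split(" ").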
Result
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
(php,2)
(java,2)
(word,3)
(c,1)
(scale,2)
(hello,10)
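Questions to think about:
input.flatMap(new MyFlatMapper()).groupBy(0).sum(1);
What roles do the grouping position (0) and the summing position (1) play?
How would you implement a continuously running version that reads from a socket port?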
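A hint for the first question: in groupBy(0).sum(1) the numbers are tuple field positions. Field 0 of Tuple2<String, Integer> is the word, so groupBy(0) puts records with the same word into one group; field 1 is the count, so sum(1) adds up the 1s inside each group. The plain-Java sketch below imitates that behavior with a Map and no Flink dependency. GroupSumSketch and countWords are hypothetical names used only for illustration, and this is not how Flink executes the job internally.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupSumSketch {
    // Hypothetical stand-in for flatMap + groupBy(0) + sum(1) on (word, 1) pairs.
    public static Map<String, Integer> countWords(List<String> lines) {
        // "flatMap": turn every line into (word, 1) pairs
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                pairs.add(new SimpleEntry<>(word, 1));
            }
        }
        // "groupBy(0)": the key is position 0 of the pair (the word)
        // "sum(1)": the value at position 1 (the count) is added up per key
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        lines.add("hello java");
        lines.add("hello php");
        // prints {hello=2, java=1, php=1}
        System.out.println(countWords(lines));
    }
}
```

Swapping the two positions would not work for this tuple type: grouping by field 1 would put every record into a single group (all counts are 1), and field 0 is a String that cannot be summed.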