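Goal: count how many times each space-separated word occurs in a text file.
Environment: JDK 8. Tool: IntelliJ IDEA.
Create a Maven project.
Add the Flink dependencies to pom.xml: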
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Code walkthrough
The DataSet API is used for offline (batch) processing of bounded data.
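import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Wrapper class so the snippet compiles; the name WordCount is arbitrary.
public class WordCount {
    public static void main(String[] args) throws Exception {
        // Create the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file line by line
        DataSource<String> input = env.readTextFile("absolute path to the txt file");
        // Emit (word, 1) pairs, group by the word (field 0), sum the counts (field 1)
        DataSet<Tuple2<String, Integer>> res = input.flatMap(new MyFlatMapper()).groupBy(0).sum(1);
        // Print the result; for a DataSet, print() also triggers execution
        res.print();
    }

    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split the line on single spaces and emit one (word, 1) pair per word
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}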
Result
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
(php,2)
(java,2)
(word,3)
(c,1)
(scale,2)
(hello,10)
Questions to think about:
input.flatMap(new MyFlatMapper()).groupBy(0).sum(1);
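What do the field positions passed to groupBy and sum mean?
How would you build a version that never stops, reading its input from a socket port?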
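On the first question, one way to build intuition is to mimic the pipeline on ordinary Java collections. The sketch below does not use Flink at all; the class name GroupBySumSketch and the method countWords are made up for illustration. It assumes that each emitted tuple is (word, 1), so that field 0 is the word and field 1 is the count: groupBy(0) then corresponds to using the word as the map key, and sum(1) to adding up the counts under that key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: imitates flatMap -> groupBy(0) -> sum(1).
public class GroupBySumSketch {

    public static Map<String, Integer> countWords(String... lines) {
        // LinkedHashMap keeps the words in first-seen order, which makes the output predictable.
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                // The flatMap step would emit the pair (word, 1).
                // groupBy(0): pairs with the same field 0 (the word) share one map key.
                // sum(1): field 1 (the 1) is added to the running total for that key.
                totals.merge(word, 1, Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // prints {hello=2, java=1, php=1}
        System.out.println(countWords("hello java", "hello php"));
    }
}
```

Swapping the two positions would make no sense for this data: grouping on field 1 would put every pair in a single group, because the count is always 1, and field 0 is a String that cannot be summed.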
