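Goal: count how often each space-separated word occurs in a text file
Environment: JDK 8; IDE: IntelliJ IDEA 2019.3
Create a Maven project
Add the dependencies to pom.xml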
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
Code
DataStream: the streaming API, used for real-time processing
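import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount { // the class name is arbitrary

    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // setMaxParallelism sets the upper bound for parallelism (the number of key groups),
        // not the parallelism itself; use env.setParallelism(n) for that.
        // In local mode the parallelism defaults to the number of CPU cores.
        env.setMaxParallelism(8);

        // Read the data
        DataStream<String> input = env.readTextFile("absolute path of the txt file");

        // Count: emit (word, 1), group by the word (field 0), sum the counts (field 1)
        DataStream<Tuple2<String, Integer>> res = input.flatMap(new MyFlatMapper()).keyBy(0).sum(1);

        // Print the results
        res.print();

        // Execute the job
        env.execute();
    }

    // Splits each line on single spaces and emits (word, 1) for every word
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}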
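To see what keyBy(0).sum(1) produces without starting Flink, here is a plain-Java sketch. It uses no Flink classes, and the class name and input lines are hypothetical, not the contents of the author's file. It applies the same split(" ") tokenization and keeps a running count per word, emitting the updated pair every time the word appears, which mimics the incremental behavior of a keyed rolling sum.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalWordCountSketch {

    // Same tokenization as MyFlatMapper: split on single spaces
    static List<String> tokenize(String line) {
        return Arrays.asList(line.split(" "));
    }

    // Mimics keyBy(0).sum(1): keep a running count per word (the "keyed state")
    // and emit the updated (word,count) pair every time the word is seen.
    static List<String> runningCounts(List<String> lines) {
        Map<String, Integer> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : tokenize(line)) {
                int count = state.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + count + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Hypothetical input lines, for illustration only
        List<String> lines = Arrays.asList("hello java", "hello php", "hello");
        for (String record : runningCounts(lines)) {
            System.out.println(record);
        }
    }
}
```

This prints (hello,1), (java,1), (hello,2), (php,1), (hello,3) in that order. Flink produces the same per-key sequence, but records from different subtasks are interleaved. Also note that split(" ") keeps empty tokens between consecutive spaces: "a  b" yields "a", "", "b", so a file with double spaces would also count an empty word. Trimming the line and splitting on "\\s+" is one way to reduce that.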
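Output
The number before > is the index of the parallel subtask (thread) that printed the record, counting from 1. Because this is a stream, each new occurrence of a word updates its count and emits it again, which is why (hello,1), (hello,2) and (hello,3) appear one after another. The SLF4J lines only warn that no logging implementation is on the classpath.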
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2> (hello,1)
2> (hello,2)
4> (cc,1)
3> (word,1)
3> (java,1)
2> (hello,3)
1> (php,1)
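All three hello records come from subtask 2 because keyBy routes every record with the same key to the same subtask, and that subtask holds the key's running count. Flink does this by hashing the key into one of maxParallelism key groups (the value set by setMaxParallelism) and then mapping key groups to subtasks. The sketch below is a deliberately simplified stand-in that uses Math.floorMod(key.hashCode(), parallelism) instead of Flink's actual key-group assignment. Its subtask numbers will not match the output above; it only shows that equal keys always land on the same subtask. The class name and the parallelism of 4 (assumed to match the 1> to 4> prefixes) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class KeyRoutingSketch {

    // Simplified hash partitioning, NOT Flink's exact algorithm:
    // the same key always maps to the same subtask index in [0, parallelism).
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4; // assumption: 4 CPU cores, as the 1> to 4> prefixes suggest
        List<String> words = Arrays.asList("hello", "cc", "word", "java", "hello", "php", "hello");
        for (String word : words) {
            // +1 because the print() prefix counts subtasks from 1
            System.out.println((subtaskFor(word, parallelism) + 1) + "> " + word);
        }
    }
}
```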
