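Goal: count how often each space-separated word occurs in a text file
Environment: JDK 8. Tool: IntelliJ IDEA 2019.3
Create a Maven project
Add the dependencies to pom.xml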
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
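Code demo
The DataStream API is used for real-time (streaming) computation, so the counts are updated as each record arrives.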
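import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// The class name is arbitrary; the original snippet did not show its enclosing class.
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the MAXIMUM parallelism (the upper bound for later rescaling).
        // The actual parallelism is set with env.setParallelism(n); for local runs it defaults to the number of CPU cores.
        env.setMaxParallelism(8);
        // Read the input file line by line
        DataStream<String> input = env.readTextFile("absolute path to the txt file");
        // Split lines into (word, 1) pairs, group by the word (field 0), and sum the counts (field 1)
        DataStream<Tuple2<String, Integer>> res = input.flatMap(new MyFlatMapper()).keyBy(0).sum(1);
        // Print the results to stdout
        res.print();
        // Submit the job; nothing runs until execute() is called
        env.execute();
    }

    // Emits one (word, 1) tuple for every space-separated word in a line
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}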
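To make it clearer what flatMap, keyBy(0) and sum(1) compute together, here is a minimal plain-Java sketch that needs no Flink at all. It is only an illustration of the idea, not how Flink implements it: the class name RunningWordCount, the helper runningCounts, and the sample input lines are all made up for this example. The point it shows is that a streaming sum emits an updated total for every incoming word instead of a single final count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, not part of Flink.
class RunningWordCount {

    // Returns one "(word,count)" string per input word, in arrival order,
    // mimicking a running sum that emits the new total for every record.
    public static List<String> runningCounts(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            // Same splitting rule as MyFlatMapper: one token per space-separated word
            for (String word : line.split(" ")) {
                // merge() adds 1 to the stored count (or starts at 1) and returns the new total
                int updated = counts.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Assumed sample input; the original file contents are not shown in this post
        List<String> lines = Arrays.asList("hello word", "hello java", "hello php cc");
        runningCounts(lines).forEach(System.out::println);
    }
}
```

Unlike this single-threaded sketch, the Flink job spreads the keys over several parallel subtasks, so records for different words can be printed in a different order from run to run.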
Result
The number before > is the index of the parallel subtask (thread) that printed the record.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2> (hello,1)
2> (hello,2)
4> (cc,1)
3> (word,1)
3> (java,1)
2> (hello,3)
1> (php,1)
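In the output above, every (hello,n) record carries the same prefix because keyBy sends all records with the same key to the same subtask, which is what lets that subtask keep a correct running sum. The sketch below illustrates the idea with a deliberately simplified rule (hash code modulo parallelism). This is an assumption made for illustration: Flink's real assignment goes through key groups and a different hash, so the subtask numbers printed here will not match the ones above. KeyRoutingSketch and subtaskFor are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; NOT Flink's actual partitioning algorithm.
class KeyRoutingSketch {

    // Maps a key to a 1-based subtask index using hashCode modulo parallelism.
    // floorMod keeps the result non-negative even for negative hash codes.
    public static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism) + 1;
    }

    public static void main(String[] args) {
        int parallelism = 4; // assumed value for this example
        List<String> words = Arrays.asList("hello", "cc", "hello", "java", "hello");
        for (String word : words) {
            // The same word always prints with the same prefix
            System.out.println(subtaskFor(word, parallelism) + "> " + word);
        }
    }
}
```

Whatever the exact hashing scheme, the property that matters is determinism: the same key always lands on the same subtask, while different keys may or may not share one.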