批处理的word count

  1. import org.apache.flink.api.scala.{ExecutionEnvironment, _}
  2. object WordCount {
  3. def main(args: Array[String]): Unit = {
  4. // 创建流处理的执行环境
  5. val env = ExecutionEnvironment.getExecutionEnvironment
  6. // 从文件中读取数据
  7. val data = env.readTextFile("E:\\java\\W.txt")
  8. // 对数据进行转换处理统计,先分词,再按照word进行分组,最后进行聚合统计
  9. val result = data.flatMap(_.split(" "))
  10. .map((_, 1))
  11. .groupBy(0)// 以第一个元素作为key,进行分组
  12. .sum(1)// 对所有数据的第二个元素求和
  13. //打印输出
  14. result.print()
  15. }
  16. }

image.png

流处理word count

import org.apache.flink.streaming.api.scala._
//_表示里面的内容全部导入
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // 创建流处理的执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //env.setParallelism(8)   设置并行数

    // 接收一个socket文本流
    val inputDataStream: DataStream[String] = env.socketTextStream("localhost",7777)
    // 进行转化处理统计
    val resultDataStream: DataStream[(String, Int)] = inputDataStream
      .flatMap(_.split(" ")) 
      //_是x->x
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    resultDataStream.print().setParallelism(1)

    // 启动任务执行
    env.execute("stream word count")
  }
}

image.png