批处理的word count
import org.apache.flink.api.scala.{ExecutionEnvironment, _}object WordCount { def main(args: Array[String]): Unit = { // 创建流处理的执行环境 val env = ExecutionEnvironment.getExecutionEnvironment // 从文件中读取数据 val data = env.readTextFile("E:\\java\\W.txt") // 对数据进行转换处理统计,先分词,再按照word进行分组,最后进行聚合统计 val result = data.flatMap(_.split(" ")) .map((_, 1)) .groupBy(0)// 以第一个元素作为key,进行分组 .sum(1)// 对所有数据的第二个元素求和 //打印输出 result.print() }}
流处理word count
import org.apache.flink.streaming.api.scala._
//_表示里面的内容全部导入
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 创建流处理的执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//env.setParallelism(8) 设置并行数
// 接收一个socket文本流
val inputDataStream: DataStream[String] = env.socketTextStream("localhost",7777)
// 进行转化处理统计
val resultDataStream: DataStream[(String, Int)] = inputDataStream
.flatMap(_.split(" "))
//_是x->x
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.sum(1)
resultDataStream.print().setParallelism(1)
// 启动任务执行
env.execute("stream word count")
}
}
