2.1 搭建 maven 工程 FlinkTutorial
2.1.1 pom 文件
<!-- Flink DataStream (streaming) Scala API; artifact suffix _2.11 is the Scala binary version -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<!-- Flink DataSet (batch) Scala API; must match the Scala version and Flink version above -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
2.2 批处理 wordcount
object WordCount {
  def main(args: Array[String]): Unit = {
    // Create the batch execution environment.
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Read the input text file line by line.
    val inputPath = "D:\\Projects\\BigData\\TestWC1\\src\\main\\resources\\hello.txt"
    val inputDS: DataSet[String] = env.readTextFile(inputPath)
    // Tokenize each line, pair every word with 1, group by the word
    // (tuple field 0) and sum the counts (tuple field 1).
    // FIX: the original split pattern was a string literal broken across two
    // lines (effectively "\n "), which never matches space-separated words;
    // the intended delimiter is a single space.
    val wordCountDS: AggregateDataSet[(String, Int)] =
      inputDS.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
    // Print the aggregated (word, count) pairs to stdout.
    wordCountDS.print()
  }
}
2.3 流处理 wordcount
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    // Pull the socket endpoint from the command line (--host ... --port ...).
    val cli: ParameterTool = ParameterTool.fromArgs(args)
    val host: String = cli.get("host")
    val port: Int = cli.getInt("port")

    // Set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Consume a text stream from the given socket.
    val socketLines: DataStream[String] = env.socketTextStream(host, port)

    // Implicit TypeInformation required by flatMap/map in the Scala API.
    import org.apache.flink.api.scala._
    val wordCounts: DataStream[(String, Int)] =
      socketLines
        .flatMap(_.split("\\s"))
        .filter(_.nonEmpty)
        .map((_, 1))
        .keyBy(0)
        .sum(1)

    wordCounts.print().setParallelism(1)

    // Start the job; execute() blocks and runs the assembled pipeline.
    env.execute("Socket stream word count")
  }
}