2.1 搭建 maven 工程 FlinkTutorial

2.1.1 pom 文件

  <!-- Flink streaming (DataStream) API for Scala, built against Scala 2.11 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.7.2</version>
  </dependency>
  <!-- Flink core Scala API (batch DataSet API), built against Scala 2.11 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.7.2</version>
  </dependency>

2.2 批处理 wordcount

  /**
   * Batch word count built on Flink's DataSet API.
   *
   * Reads a text file, splits every line into words on single spaces, drops the
   * empty tokens produced by consecutive spaces, and prints each distinct word
   * together with the number of times it occurs.
   */
  object WordCount {

    /** Input file used when no path is supplied on the command line. */
    private val DefaultInputPath =
      "D:\\Projects\\BigData\\TestWC1\\src\\main\\resources\\hello.txt"

    /**
     * Builds and runs the batch job.
     *
     * @param args optional; when present, `args(0)` is the path of the text file
     *             to count. Without it, the default sample path is read.
     */
    def main(args: Array[String]): Unit = {
      // Brings ExecutionEnvironment, DataSet and AggregateDataSet into scope, plus
      // the implicit TypeInformation instances that flatMap and map require.
      import org.apache.flink.api.scala._

      // Create the batch execution environment.
      val env = ExecutionEnvironment.getExecutionEnvironment

      // Read the input file line by line.
      val inputPath = args.headOption.getOrElse(DefaultInputPath)
      val inputDS: DataSet[String] = env.readTextFile(inputPath)

      // Tokenize, pair every word with 1, group by the word (tuple field 0)
      // and sum the counts (tuple field 1).
      val wordCountDS: AggregateDataSet[(String, Int)] = inputDS
        .flatMap(_.split(" "))
        .filter(_.nonEmpty) // consecutive spaces would otherwise be counted as an empty word
        .map((_, 1))
        .groupBy(0)
        .sum(1)

      // In the DataSet API print() is a sink that also triggers execution,
      // so no explicit env.execute() call follows.
      wordCountDS.print()
    }
  }

2.3 流处理 wordcount

  /**
   * Streaming word count over a socket text stream.
   *
   * Connects to the host and port given on the command line, splits every
   * received line on whitespace, and prints a running count per word.
   */
  object StreamWordCount {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments parsed with `ParameterTool.fromArgs`;
     *             the keys `host` and `port` are read from them.
     */
    def main(args: Array[String]): Unit = {
      // Pull the socket address out of the command-line arguments.
      val cliArgs: ParameterTool = ParameterTool.fromArgs(args)
      val socketHost: String = cliArgs.get("host")
      val socketPort: Int = cliArgs.getInt("port")

      // Create the streaming environment and open the socket source.
      val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
      val lines: DataStream[String] = streamEnv.socketTextStream(socketHost, socketPort)

      // Implicit TypeInformation instances needed by flatMap and map.
      import org.apache.flink.api.scala._

      // Split on single whitespace characters and drop the empty tokens that
      // consecutive separators leave behind.
      val words = lines.flatMap(_.split("\\s")).filter(_.nonEmpty)

      // Pair each word with 1, key by the word (tuple field 0) and keep a
      // running sum of the counts (tuple field 1).
      val runningCounts: DataStream[(String, Int)] =
        words.map((_, 1)).keyBy(0).sum(1)

      // Print through a single sink instance.
      runningCounts.print().setParallelism(1)

      // Nothing runs until the job is submitted.
      streamEnv.execute("Socket stream word count")
    }
  }