2.1 搭建 maven 工程 FlinkTutorial

2.1.1 pom 文件

  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>flink_example</groupId>
    <artifactId>flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
      <!-- Single place for the Flink version and Scala binary version, so every
           Flink artifact below stays in sync when upgrading. -->
      <flink.version>1.7.2</flink.version>
      <scala.binary.version>2.11</scala.binary.version>
      <!-- Sources contain non-ASCII text; avoid platform-dependent encoding. -->
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
      <!-- Flink DataSet (batch) API for Scala, used by WordCount. -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
      <!-- Flink DataStream (streaming) API for Scala, used by StreamWordCount. -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
      </dependency>
    </dependencies>

    <build>
      <plugins>
        <!-- Compiles Scala sources into class files. -->
        <plugin>
          <groupId>net.alchim31.maven</groupId>
          <artifactId>scala-maven-plugin</artifactId>
          <version>3.4.6</version>
          <executions>
            <execution>
              <!-- "compile" handles src/main/scala and "testCompile" handles
                   src/test/scala. Without "compile" the main sources are never
                   compiled by Maven. -->
              <goals>
                <goal>compile</goal>
                <goal>testCompile</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
        <!-- Produces an additional jar-with-dependencies (fat jar) in the
             package phase, suitable for submitting to a Flink cluster. -->
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>3.0.0</version>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>
  </project>

2.2 批处理 wordcount

WordCount.scala

  package org.gz

  import org.apache.flink.api.java.utils.ParameterTool
  import org.apache.flink.api.scala._

  /**
   * Batch word count using the Flink DataSet API.
   *
   * Reads a text file, splits every line on whitespace and prints one
   * `(word, count)` pair per distinct word.
   *
   * Usage: optionally pass `--input <path>`; without it the original sample
   * file path is used, so running with no arguments behaves as before.
   */
  object WordCount {

    /** Sample input used when no `--input` argument is supplied. */
    private val DefaultInputPath: String =
      "/Users/gaozhen/IdeaProjects/flink/src/main/resources/a.txt"

    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val inputPath: String = ParameterTool.fromArgs(args).get("input", DefaultInputPath)

      val inputDS: DataSet[String] = env.readTextFile(inputPath)
      val wordCountDS: AggregateDataSet[(String, Int)] =
        inputDS
          // Split on runs of whitespace and drop empty tokens; splitting on a
          // single " " would count "" as a word for repeated/leading spaces.
          .flatMap(_.split("\\s+"))
          .filter(_.nonEmpty)
          .map((_, 1))
          .groupBy(0) // group by the word (tuple field 0)
          .sum(1)     // add up the 1s (tuple field 1)

      // In the DataSet API print() itself triggers job execution,
      // which is why there is no env.execute() call here.
      wordCountDS.print()
    }
  }

2.3 流处理 wordcount

StreamWordCount.scala

  package org.gz.org.gz

  import org.apache.flink.api.java.utils.ParameterTool
  import org.apache.flink.streaming.api.scala._

  /**
   * Streaming word count over a text socket using the Flink DataStream API.
   *
   * Usage: `--host <host> --port <port>`. Both are optional and default to
   * `localhost` and `7777`, matching the `nc -lk 7777` server used for testing.
   * Each incoming word updates and prints a running `(word, count)` total.
   *
   * NOTE(review): the package `org.gz.org.gz` looks like an IDE artifact; it is
   * kept unchanged so the main-class name used with `flink run -c` still works.
   */
  object StreamWordCount {
    def main(args: Array[String]): Unit = {
      val params: ParameterTool = ParameterTool.fromArgs(args)
      // Defaults instead of a null host / exception when arguments are omitted.
      val host: String = params.get("host", "localhost")
      val port: Int = params.getInt("port", 7777)

      val env = StreamExecutionEnvironment.getExecutionEnvironment
      val textDStream: DataStream[String] = env.socketTextStream(host, port)

      val dataStream: DataStream[(String, Int)] =
        textDStream
          // Split on runs of whitespace and drop empty tokens, so repeated or
          // leading spaces do not produce an empty "" word.
          .flatMap(_.split("\\s+"))
          .filter(_.nonEmpty)
          .map((_, 1))
          .keyBy(0) // partition by the word (tuple field 0)
          .sum(1)   // rolling sum of the counts (tuple field 1)

      // Run the print sink as a single task so all results go to one output.
      dataStream.print().setParallelism(1)
      env.execute("Socket stream word count")
    }
  }

测试：先开启服务端，再启动程序（程序参数为 `--host localhost --port 7777`）：

  # Listen on TCP port 7777 (-l) and keep listening after a client disconnects (-k)
  nc -l -k 7777

在 nc 窗口中输入以空格分隔的单词，程序控制台会输出每个单词的累计计数：
![运行结果](image.png)