Flink Quick Start (Scala-based)

Import the dependencies in the pom:

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.8</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <hadoop.version>2.7.6</hadoop.version>
    <flink.version>1.6.1</flink.version>
    <mysql.version>5.1.48</mysql.version>
    <fastjson.version>1.2.51</fastjson.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>${mysql.version}</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>${fastjson.version}</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <!-- <arg>-make:transitive</arg> -->
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18.1</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <!-- Must point at this project's entry class, not a Spark class -->
                  <mainClass>com.ylb.WorldCount</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

Batch processing (file -> word count)

  import org.apache.flink.api.scala.ExecutionEnvironment
  import org.apache.flink.api.scala._

  /**
   * @author yanglibin
   * @create 2020-02-28 11:17
   */
  object WorldCount {
    def wc1(filePath: String) = {
      val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
      env.readTextFile(filePath)
        .flatMap(_.split(" "))
        .map((_, 1))
        .groupBy(0)
        .sum(1)
        .print()
    }
  }
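
A minimal sketch of an entry point that drives wc1; the app object and input path are hypothetical placeholders:

  object BatchWordCountApp {
    def main(args: Array[String]): Unit = {
      // Hypothetical local file; replace with a real path or an HDFS URI
      WorldCount.wc1("/tmp/words.txt")
    }
  }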

Common issues

  Error:(15, 15) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
      .flatMap(_.split(" "))

Fix: add the wildcard import, which brings Flink's implicit TypeInformation instances into scope:

  import org.apache.flink.api.scala._

Stream processing

Flink is a stream-processing framework, and an event-driven one at that.

file

  // Requires: import org.apache.flink.streaming.api.scala._
  def wc2(filePath: String) = {
    // Get the streaming environment
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism (the number of parallel subtasks)
    environment.setParallelism(4)
    environment.readTextFile(filePath)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()
    // A streaming job only runs once execute() is called
    environment.execute()
  }

socket

  /**
   * Unbounded stream
   * @param hostname
   * @param port
   * @return
   */
  def wc3(hostname: String, port: Int) = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setParallelism(4)
    // Read network data from a socket
    environment.socketTextStream(hostname, port)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print()
    environment.execute()
  }
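
Assuming wc3 sits in the same WorldCount object as wc1, a minimal local test sketch; the endpoint is hypothetical and can be fed lines from another terminal with `nc -lk 9999`:

  object SocketWordCountApp {
    def main(args: Array[String]): Unit = {
      // Hypothetical endpoint: a local netcat listener started with `nc -lk 9999`
      WorldCount.wc3("localhost", 9999)
    }
  }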

Setting up a Flink cluster

Standalone mode

  # Download from: https://flink.apache.org/downloads.html
  # Pick the build that matches your Scala version
  tar -xvf /usr/local/src/flink-1.9.2-bin-scala_2.11.tgz -C /usr/local/
  ln -sv /usr/local/flink-1.9.2 /usr/local/flink

  ### Flink configuration changes ###
  # conf/flink-conf.yaml
  jobmanager.rpc.address: flink-master
  # conf/masters
  flink-master
  # conf/slaves
  flink-node-01
  flink-node-02

  ### Start the Flink cluster ###
  bin/start-cluster.sh
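
Once start-cluster.sh finishes, the JobManager web UI should be reachable on flink-master (port 8081 by default).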

YARN mode

1) Adding the Hadoop classpath to Flink

  ############# Hadoop environment ##############
  # Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
  if [ -z "$HADOOP_CONF_DIR" ]; then
      if [ -n "$HADOOP_HOME" ]; then
          # HADOOP_HOME is set. Check if it's a Hadoop 1.x or 2.x HADOOP_HOME path
          if [ -d "$HADOOP_HOME/conf" ]; then
              # It's Hadoop 1.x
              HADOOP_CONF_DIR="$HADOOP_HOME/conf"
          fi
          if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
              # It's Hadoop 2.2+
              HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
          fi
      fi
  fi

  # try and set HADOOP_CONF_DIR to some common default if it's not set
  if [ -z "$HADOOP_CONF_DIR" ]; then
      if [ -d "/etc/hadoop/conf" ]; then
          echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set."
          HADOOP_CONF_DIR="/etc/hadoop/conf"
      fi
  fi

2) Putting the required jar files into the /lib directory of the Flink distribution

  # To run Flink on YARN, connect to HDFS, connect to HBase,
  # or use a Hadoop-based file system connector,
  # download the matching dependency jar (Pre-bundled Hadoop 2.7.5) into Flink's lib directory:
  wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar -O /usr/local/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

Running Flink jobs

In standalone mode

Via the web UI (omitted)

From the command line

  bin/flink run -c com.ylb.WorldCount /tmp/flink-demo.jar
  # -c : the fully qualified class inside the jar to run

In YARN mode

session-cluster

Here, "session" means the cluster's resources stay allocated and are not released between jobs; every submitted job runs in the same long-lived session.

Usage:

  1. First start a session with yarn-session.sh, which will host the Flink jobs.
  2. Submit Flink jobs to that session, as the commands below show.
  # 1. Start a session cluster instance named "test"
  bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
  # After startup, a file .yarn-properties-<username> is created in the local
  # /tmp directory (on this machine only)
  # 2. Submit a job (to the YARN application created above)
  bin/flink run -c com.ylb.WorldCount /tmp/flink-demo.jar
  # 3. Kill the YARN application
  yarn application -kill application_1583028240987_0002 # use your own application id

  # Option reference
  Required
  -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
  Optional
  -at,--applicationType <arg>     Set a custom application type for the application on YARN
  -D <property=value>             use value for given property
  -d,--detached                   If present, runs the job in detached mode
  -h,--help                       Help for the Yarn session CLI.
  -id,--applicationId <arg>       Attach to running YARN session
  -j,--jar <arg>                  Path to Flink jar file
  -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
  -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
  -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
  -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
  -nm,--name <arg>                Set a custom name for the application on YARN
  -q,--query                      Display available YARN resources (memory, cores)
  -qu,--queue <arg>               Specify YARN queue.
  -s,--slots <arg>                Number of slots per TaskManager
  -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
  -st,--streaming                 Start Flink in streaming mode
  -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
  -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
  -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
  -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

Per Job Cluster

Each Flink job requests its own dedicated YARN application.

  bin/flink run -m yarn-cluster -c com.ylb.WorldCount /tmp/flink-demo.jar
  # Note: delete /tmp/.yarn-properties-<username> first; if it exists, the client
  # attaches to the session it points at instead of starting a per-job cluster

The difference between the two: whether cluster resources stay allocated long-term (session) or are released once the job finishes (per-job).

Operator chains: operators with the same parallelism connected one-to-one are fused into a single task and executed together.
Disabling chaining: env.disableOperatorChaining() stops operators from being chained together into combined tasks, so each subtask becomes its own task.
startNewChain(): starts a new chain beginning at the current operator. A sketch of both calls follows.
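
A minimal sketch of where these calls sit; the pipeline, source endpoint, and job name are hypothetical:

  import org.apache.flink.streaming.api.scala._

  object ChainingDemo {
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      // Uncomment to disable chaining globally: every subtask then becomes its own task
      // env.disableOperatorChaining()
      env.socketTextStream("localhost", 9999) // hypothetical source
        .flatMap(_.split(" "))
        .map((_, 1))
        .startNewChain() // the map starts a new chain instead of joining the flatMap's
        .keyBy(0)
        .sum(1)
        .print()
      env.execute("chaining demo")
    }
  }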