Flink Quick Start Development (Scala-based)
Import the dependencies in the pom
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.8</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <hadoop.version>2.7.6</hadoop.version>
    <flink.version>1.6.1</flink.version>
    <mysql.version>5.1.48</mysql.version>
    <fastjson.version>1.2.51</fastjson.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <!-- <arg>-make:transitive</arg> -->
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <!-- strip signature files, or the shaded jar fails verification -->
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- entry class written into the jar manifest -->
                                <mainClass>com.ylb.WorldCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Batch processing (file -> word count)
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._ // needed for the implicit TypeInformation (see below)

/**
 * @author yanglibin
 * @create 2020-02-28 11:17
 */
object WorldCount {
  def wc1(filePath: String) = {
    // batch execution environment
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    env.readTextFile(filePath)
      .flatMap(_.split(" ")) // split each line into words
      .map((_, 1))           // word -> (word, 1)
      .groupBy(0)            // group by the word (tuple field 0)
      .sum(1)                // sum the counts (tuple field 1)
      .print()
  }
}
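For reference, a minimal sketch of an entry point that invokes wc1; the input path is a hypothetical placeholder:

object WorldCountApp {
  def main(args: Array[String]): Unit = {
    WorldCount.wc1("/tmp/words.txt") // hypothetical input file; replace with a real path
  }
}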
Common issue
Error:(15, 15) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
.flatMap(_.split(" "))
Solution:
import org.apache.flink.api.scala._
Stream processing
Flink is first and foremost a stream processing framework, and an event-driven one.
file
import org.apache.flink.streaming.api.scala._ // StreamExecutionEnvironment plus the implicits

def wc2(filePath: String) = {
  // get the streaming environment
  val environment = StreamExecutionEnvironment.getExecutionEnvironment
  // set the parallelism (number of parallel subtasks, not CPU cores)
  environment.setParallelism(4)
  environment.readTextFile(filePath)
    .flatMap(_.split(" "))
    .map((_, 1))
    .keyBy(0) // key by the word (tuple field 0)
    .sum(1)
    .print()
  // the pipeline is built lazily; nothing runs until execute() is called
  environment.execute()
}
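Note that with parallelism 4, each printed line is prefixed with the index of the subtask that produced it, e.g. `3> (flink,1)`.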
socket
/**
 * Unbounded stream
 * @param hostname
 * @param port
 * @return
 */
def wc3(hostname: String, port: Int) = {
  val environment = StreamExecutionEnvironment.getExecutionEnvironment
  environment.setParallelism(4)
  // read an unbounded stream of lines from the socket
  environment.socketTextStream(hostname, port)
    .flatMap(_.split(" "))
    .map((_, 1))
    .keyBy(0)
    .sum(1)
    .print()
  environment.execute()
}
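To try it locally, open a socket server with netcat first, e.g. `nc -lk 9999`, then run `wc3("localhost", 9999)` and type words into the netcat session; the running counts are printed continuously.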
Flink cluster setup
Standalone mode
# Download: https://flink.apache.org/downloads.html
# Pick the archive that matches your Scala version
tar -xvf /usr/local/src/flink-1.9.2-bin-scala_2.11.tgz -C /usr/local/
ln -sv /usr/local/flink-1.9.2 /usr/local/flink
### Flink configuration ###
# conf/flink-conf.yaml
jobmanager.rpc.address: flink-master
# conf/masters
flink-master
# conf/slaves
flink-node-01
flink-node-02
### Start the Flink cluster ###
bin/start-cluster.sh
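Once started, the web UI should be reachable at http://flink-master:8081 (Flink's default REST port).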
YARN mode
1) Adding the Hadoop classpath to Flink
############# Hadoop environment ##############
# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
if [ -z "$HADOOP_CONF_DIR" ]; then
    if [ -n "$HADOOP_HOME" ]; then
        # HADOOP_HOME is set. Check if it's a Hadoop 1.x or 2.x HADOOP_HOME path
        if [ -d "$HADOOP_HOME/conf" ]; then
            # it's Hadoop 1.x
            HADOOP_CONF_DIR="$HADOOP_HOME/conf"
        fi
        if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
            # it's Hadoop 2.2+
            HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
        fi
    fi
fi
# try and set HADOOP_CONF_DIR to some common default if it's not set
if [ -z "$HADOOP_CONF_DIR" ]; then
    if [ -d "/etc/hadoop/conf" ]; then
        echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set."
        HADOOP_CONF_DIR="/etc/hadoop/conf"
    fi
fi
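Alternatively, if a `hadoop` binary is on the PATH, the Flink documentation suggests exporting the full Hadoop classpath directly: `export HADOOP_CLASSPATH=$(hadoop classpath)`.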
2) Putting the required jar files into /lib directory of the Flink distribution
# run Flink on YARN, connect to HDFS, connect to HBase,
# or use some Hadoop-based file system connector
# Download the matching dependency jar (Pre-bundled Hadoop 2.7.5) into Flink's lib directory
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar -O /usr/local/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
Running a Flink job
In standalone mode
Web UI (omitted)
Command-line submission
bin/flink run -c com.ylb.WorldCount /tmp/flink-demo.jar
# -c : the fully qualified entry class inside the jar
In YARN mode
session-cluster
Here "session" means the cluster's resources stay allocated and are not released between jobs.
Usage:
- First start a session with yarn-session.sh; it hosts the Flink jobs
- Then submit Flink jobs to it
# 1. Start a session cluster instance named "test"
bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
# After startup, a file .yarn-properties-<username> is created in this machine's
# /tmp directory (only on this machine); the client uses it to locate the session
# 2. Submit a job (to the YARN application created above)
bin/flink run -c com.ylb.WorldCount /tmp/flink-demo.jar
# 3. Kill the YARN application
yarn application -kill application_1583028240987_0002 # use your own application id
# yarn-session.sh option reference
Required
-n,--container <arg>             Number of YARN container to allocate (=Number of Task Managers)
Optional
-at,--applicationType <arg> Set a custom application type for the application on YARN
-D <property=value> use value for given property
-d,--detached If present, runs the job in detached mode
-h,--help Help for the Yarn session CLI.
-id,--applicationId <arg> Attach to running YARN session
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-m,--jobmanager <arg> Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
-n,--container <arg> Number of YARN container to allocate (=Number of Task Managers)
-nl,--nodeLabel <arg> Specify YARN node label for the YARN application
-nm,--name <arg> Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-sae,--shutdownOnAttachedExit If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.
-st,--streaming Start Flink in streaming mode
-t,--ship <arg> Ship files in the specified directory (t for transfer)
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
Per Job Cluster
Each Flink job requests its own dedicated YARN application
bin/flink run -m yarn-cluster -c com.ylb.WorldCount /tmp/flink-demo.jar
# Note: delete /tmp/.yarn-properties-xxxx first, otherwise the client attaches to the existing session
Difference between the two:
whether cluster resources stay allocated long-term without being released
Operator chain: operators with the same parallelism and a one-to-one (forwarding) relationship are chained into a single task for execution.
Disabling chaining: env.disableOperatorChaining() prevents operators from being chained, so each subtask becomes its own task.
startNewChain(): starts a new chain beginning at the current operator.
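A minimal sketch of where these calls sit in a pipeline (the source address and surrounding logic are illustrative only):

import org.apache.flink.streaming.api.scala._

object ChainingDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // uncomment to disable chaining globally: every operator becomes its own task
    // env.disableOperatorChaining()

    env.socketTextStream("localhost", 9999) // hypothetical source
      .flatMap(_.split(" "))
      .map((_, 1))
      .startNewChain() // cut the chain here: the map starts a new chain
      .keyBy(0)
      .sum(1)
      .print()

    env.execute("chaining demo")
  }
}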