## Fetch the Image

```shell
# Search Docker Hub for available Flink images
sudo docker search flink
sudo docker pull flink:1.10-scala_2.11
sudo docker image ls | grep flink
```
## Service Orchestration

```shell
mkdir -p /share/flink && cd /share/flink
rm -rf /share/flink/docker-compose.yml
vi /share/flink/docker-compose.yml
```
The compose file is as follows:

```yaml
version: "2.1"
services:
  jobmanager:
    image: flink:1.10-scala_2.11
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.10-scala_2.11
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
```
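The `flink` CLI inside the containers can only see files inside the containers, so it helps to mount the host working directory into both services. A minimal sketch of the extra lines to add under each service (mounting `/share/flink` at the same path inside the container is a convenience choice, not something Flink requires):

```yaml
    volumes:
      - /share/flink:/share/flink
```

With this in place, a jar built on the host under /share/flink can be passed directly to `./bin/flink run` inside the jobmanager container.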
## Building and Running the Services

```shell
cd /share/flink
sudo docker-compose -f docker-compose.yml build --no-cache  # build without cache (creates images only, does not start containers)
sudo docker-compose -f docker-compose.yml up -d             # run after building
sudo docker-compose -f docker-compose.yml up --build        # build and run attached in the foreground, useful for debugging
sudo docker-compose -f docker-compose.yml stop              # stop
sudo docker-compose -f docker-compose.yml down              # remove
```
## Verification

```shell
cd /share/flink
# Check the services
sudo docker-compose -f docker-compose.yml ps
# Enter the jobmanager container
sudo docker-compose exec jobmanager /bin/bash
# Enter the taskmanager container
sudo docker-compose exec taskmanager /bin/bash
# Check the logs
sudo docker logs flink_jobmanager_1
sudo docker logs -f -t --tail=50 flink_jobmanager_1
# Check the network
sudo docker network ls
sudo docker inspect flink_jobmanager_1
```
## Official Batch WordCount Example

```shell
cd /share/flink
sudo docker-compose exec jobmanager /bin/bash
./bin/flink run examples/batch/WordCount.jar
```
## Official Streaming WordCount Example

```shell
# sudo yum install -y nc
nc -l 9000
```

Run Flink's bundled "word count" example on the bigdata-node1 node:

```shell
cd /share/flink
sudo docker-compose exec jobmanager /bin/bash
./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname 192.168.0.99 --port 9000
```
## Custom WordCount Example

### 1. Initialize the Project

```shell
cd /share/flink
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.10.0
```

On Windows, the command above can be run from Git Bash, CMD/PowerShell, Cygwin, MobaXterm, or a similar terminal. In the interactive prompts that follow, enter:

- GroupId (your own): org.polaris.bigdata.flink
- ArtifactId (your own): BigDataFlinkDemo-helloword
### 2. Write the Code

Delete the auto-generated BatchJob.java and StreamingJob.java, then create SocketWindowWordCount.java:

```java
package org.polaris.bigdata.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // Input TCP stream
        final int port;
        final String host;
        port = 9008; // TCP port that nc listens on
        host = "192.168.0.99"; // Docker host IP

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(host, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
```

### 3. Modify the Main Class

In pom.xml, point the maven-shade-plugin's `ManifestResourceTransformer` at the new entry class:
```xml
<transformers>
    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
        <mainClass>org.polaris.bigdata.flink.SocketWindowWordCount</mainClass>
    </transformer>
</transformers>
```

### 4. Build

```shell
cd /share/flink/BigDataFlinkDemo-helloword
mvn clean package
```
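For reference, the `<transformers>` fragment in step 3 sits inside the maven-shade-plugin configuration that the quickstart archetype generates in pom.xml. A trimmed sketch of the surrounding structure (element nesting only; the plugin `<version>` and the archetype's artifact filters are left out here):

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>org.polaris.bigdata.flink.SocketWindowWordCount</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
```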
### 5. Submit the Job

Start the nc terminal first (it must be running before the Flink job is submitted):

```shell
nc -l 9008      # Linux
# nc -l -p 9008 # Windows
```

Submit the job from the Flink Web UI (Submit New Job -> Add New -> Submit), then type input into the nc terminal and watch the word counts appear in the job's output.
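The job's tokenize-and-sum logic can be sanity-checked without a cluster. A plain-Java sketch (not part of the project) of what the `flatMap` plus `reduce` pipeline computes for a single input line, minus the windowing:

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountCheck {
    public static void main(String[] args) {
        // Same tokenization as SocketWindowWordCount: split on the \s regex
        String line = "to be or not to be";
        Map<String, Long> counts = new HashMap<>();
        for (String word : line.split("\\s")) {
            counts.merge(word, 1L, Long::sum); // sum counts per word, like the ReduceFunction
        }
        System.out.println(counts.get("to")); // prints 2
        System.out.println(counts.get("be")); // prints 2
        System.out.println(counts.get("or")); // prints 1
    }
}
```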
## Troubleshooting

When Flink runs in Docker containers, Logs and Stdout cannot be viewed from the Web UI ("Log file environment variable 'log.file' is not set.").

Fix: copy the startup script out of the jobmanager/taskmanager container (both containers use the same script, since they come from the same image) and update it.
```shell
sudo docker cp flink_jobmanager_1:/docker-entrypoint.sh /share/flink/docker-entrypoint.sh
vi /share/flink/docker-entrypoint.sh
```
The changes are as follows:

```shell
# Replace start-foreground with start
# exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
$FLINK_HOME/bin/jobmanager.sh start "$@"
# exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "$@"
$FLINK_HOME/bin/standalone-job.sh start "$@"
# exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "$@"
$FLINK_HOME/bin/taskmanager.sh start "$@"

# Tail the log files so the container keeps running and streams them to its stdout;
# replace the final line `exec "$@"` with:
sleep 1
exec /bin/bash -c "tail -f $FLINK_HOME/log/*.log"
```
Copy the modified script back into the containers:

```shell
sudo docker cp /share/flink/docker-entrypoint.sh flink_jobmanager_1:/docker-entrypoint.sh
sudo docker cp /share/flink/docker-entrypoint.sh flink_taskmanager_1:/docker-entrypoint.sh
```
Restart the containers:

```shell
cd /share/flink
sudo docker-compose -f docker-compose.yml restart
```
## References

- Yuque: Flink SQL training environment setup
  https://www.yuque.com/studys/gk4weq/rh1ldu
- Blog post: Setting up a Docker-based Flink development environment
  https://www.iamle.com/archives/2572.html
- Blog post: Logs and Stdout cannot be viewed from the Web UI when Flink runs in Docker
  https://blog.csdn.net/Allocator/article/details/106858679