Pull the Image

# Search Docker Hub for available Flink images
sudo docker search flink
sudo docker pull flink:1.10-scala_2.11
sudo docker image ls | grep flink

Service Orchestration

mkdir -p /share/flink && cd /share/flink
rm -rf /share/flink/docker-compose.yml
vi /share/flink/docker-compose.yml

The compose file is as follows:

```yaml
version: "2.1"
services:
  jobmanager:
    image: flink:1.10-scala_2.11
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.10-scala_2.11
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
```

Build and Start the Services

cd /share/flink
sudo docker-compose -f docker-compose.yml build --no-cache  # build without cache (creates images only, does not start containers)
sudo docker-compose -f docker-compose.yml up -d             # start in detached mode after building
sudo docker-compose -f docker-compose.yml up --build        # build and run in the foreground, useful for debugging
sudo docker-compose -f docker-compose.yml stop              # stop the services
sudo docker-compose -f docker-compose.yml down              # stop and remove the containers
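Only one TaskManager is started by default. If you want more task slots, docker-compose can scale the taskmanager service; a sketch, assuming your docker-compose release supports the `--scale` flag:

```bash
# Start (or resize) the cluster with two TaskManager containers
sudo docker-compose -f docker-compose.yml up -d --scale taskmanager=2
# Confirm both TaskManager containers are running
sudo docker-compose -f docker-compose.yml ps
```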

Verification

cd /share/flink
# List the services
sudo docker-compose -f docker-compose.yml ps
# Open a shell in the jobmanager container
sudo docker-compose exec jobmanager /bin/bash
# Open a shell in the taskmanager container
sudo docker-compose exec taskmanager /bin/bash
# View logs
sudo docker logs flink_jobmanager_1
sudo docker logs -f -t --tail=50 flink_jobmanager_1
# Inspect networking
sudo docker network ls
sudo docker inspect flink_jobmanager_1
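Because port 8081 is mapped to the Docker host, the cluster can also be checked from outside the containers through Flink's REST API; a quick check, assuming it is run on the host itself:

```bash
# Cluster overview (TaskManagers, slots, running jobs)
curl http://localhost:8081/overview
# Registered TaskManagers
curl http://localhost:8081/taskmanagers
```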

Official Batch WordCount Example

cd /share/flink
sudo docker-compose exec jobmanager /bin/bash
./bin/flink run examples/batch/WordCount.jar
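Without arguments the example runs on built-in sample data and prints the result to stdout. The batch WordCount also accepts explicit input and output paths; a sketch, where the README.txt input and the /tmp output location are assumptions about the container layout:

```bash
# Inside the jobmanager container (FLINK_HOME is /opt/flink in the official image)
./bin/flink run examples/batch/WordCount.jar \
  --input /opt/flink/README.txt \
  --output /tmp/wordcount-result.txt
cat /tmp/wordcount-result.txt
```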

Official Streaming WordCount Example

# sudo yum install -y nc
nc -l 9000
# Run Flink's bundled "Socket Window WordCount" example (on the bigdata-node1 node)
cd /share/flink
sudo docker-compose exec jobmanager /bin/bash
./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname 192.168.0.99 --port 9000
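The streaming job keeps running until it is cancelled. It can be managed from the same container shell with the Flink CLI (the job ID below is a placeholder taken from the `flink list` output):

```bash
# List running jobs and note the job ID
./bin/flink list
# Cancel the job (replace <job-id> with the ID printed above)
./bin/flink cancel <job-id>
```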

Custom WordCount Example

1. Initialize the Project

cd /share/flink
mvn archetype:generate                                 \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.10.0

On Windows the command above can be run from Git Bash, CMD/PowerShell, Cygwin, MobaXterm, or a similar terminal. In the interactive prompts that follow, enter (a non-interactive variant is sketched after this list):

  • groupId (your choice): org.polaris.bigdata.flink
  • artifactId (your choice): BigDataFlinkDemo-helloword
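For scripted setups the same project can be generated without prompts; a sketch, where the -Dversion and -Dpackage values are reasonable defaults rather than values from the original walkthrough:

```bash
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.10.0 \
  -DgroupId=org.polaris.bigdata.flink \
  -DartifactId=BigDataFlinkDemo-helloword \
  -Dversion=1.0-SNAPSHOT \
  -Dpackage=org.polaris.bigdata.flink \
  -DinteractiveMode=false
```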

2. Write the Code

Delete the generated BatchJob.java and StreamingJob.java, and create SocketWindowWordCount.java:

```java
package org.polaris.bigdata.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // TCP source to read from
        final int port = 9008;               // TCP port nc is listening on
        final String host = "192.168.0.99";  // IP of the Docker host

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(host, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                // sliding window: 5-second window, evaluated every second
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
```

3. Set the Main Class

In pom.xml, point the maven-shade-plugin's ManifestResourceTransformer at the new entry class so the shaded JAR's manifest carries the right Main-Class:

```xml
<transformers>
        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>org.polaris.bigdata.flink.SocketWindowWordCount</mainClass>
        </transformer>
</transformers>
```

4. Build

cd /share/flink/BigDataFlinkDemo-helloword
mvn clean package
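The shaded job JAR ends up under target/; the exact file name depends on the artifactId and version chosen during project generation (the 1.0-SNAPSHOT name below is the archetype default and only an assumption):

```bash
ls -lh target/*.jar
# e.g. target/BigDataFlinkDemo-helloword-1.0-SNAPSHOT.jar
```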

5. Submit the Job

  • Start an nc listener (this must be running before the Flink job is submitted):

    nc -l 9008
    # nc -l -p 9008   # on Windows

  • Submit the job from the Flink Web UI (Submit New Job -> Add New -> Submit); a CLI alternative is sketched after this list.

  • Type some input into the nc terminal.
  • Check the output in the Flink Web UI.
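If you prefer not to use the Web UI, the JAR can also be copied into the JobManager container and submitted with the Flink CLI; a minimal sketch, assuming the archetype's default 1.0-SNAPSHOT JAR name:

```bash
# Copy the shaded JAR into the running jobmanager container
sudo docker cp target/BigDataFlinkDemo-helloword-1.0-SNAPSHOT.jar flink_jobmanager_1:/tmp/
# Submit it from inside the container
sudo docker-compose exec jobmanager /bin/bash
./bin/flink run /tmp/BigDataFlinkDemo-helloword-1.0-SNAPSHOT.jar
```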

Troubleshooting

  • In a Docker container environment, the Flink Web UI cannot display Logs or Stdout ("Log file environment variable 'log.file' is not set.").

Workaround:
Copy the startup script out of the jobmanager/taskmanager container (both use the same script from the same image), modify it as below, and copy it back.

sudo docker cp flink_jobmanager_1:/docker-entrypoint.sh /share/flink/docker-entrypoint.sh
vi /share/flink/docker-entrypoint.sh

Make the following changes:

# Replace start-foreground with start
# exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "$@"
$FLINK_HOME/bin/jobmanager.sh start "$@"
# exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "$@"
$FLINK_HOME/bin/standalone-job.sh start "$@"
# exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "$@"
$FLINK_HOME/bin/taskmanager.sh start "$@"
# Keep the container in the foreground by tailing the rolling log files:
# replace the final line `exec "$@"` with the two lines below
sleep 1
exec /bin/bash -c "tail -f $FLINK_HOME/log/*.log"

Copy the updated script back into the containers:

sudo docker cp /share/flink/docker-entrypoint.sh flink_jobmanager_1:/docker-entrypoint.sh
sudo docker cp /share/flink/docker-entrypoint.sh flink_taskmanager_1:/docker-entrypoint.sh

Restart the containers:

cd /share/flink
sudo docker-compose -f docker-compose.yml restart
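After the restart the daemons write rolling log files, and Logs/Stdout should show up in the Web UI. A quick check from the host, assuming FLINK_HOME is /opt/flink as in the official image:

```bash
# Log files should now exist inside the container
sudo docker exec flink_jobmanager_1 ls /opt/flink/log/
# The container's stdout now follows those logs
sudo docker logs -f --tail=20 flink_jobmanager_1
```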

References

Yuque: Flink SQL training environment setup
https://www.yuque.com/studys/gk4weq/rh1ldu
Blog: Setting up a Docker-based Flink development environment
https://www.iamle.com/archives/2572.html
Blog: Flink in Docker cannot show Logs or Stdout in the Web UI
https://blog.csdn.net/Allocator/article/details/106858679