http://flink.apache.org/downloads.html
Create the Flink project with Vagrant. The project layout is as follows:

/d/vm/flink
|   apache-maven-3.6.3-bin.tar.gz
|   apache-zookeeper-3.6.3-bin.tar.gz
|   docker-compose-linux-x86_64
|   flink-1.12.0-bin-scala_2.12.tgz
|   kafka_2.12-3.0.0.tgz
|   scala-2.12.15.tgz
|   Vagrantfile
|
+---bin
|       hosts
|       install_docker.sh
|       install_docker_compose.sh
|       install_flinkplaygrounds.sh
|       install_git.sh
|       install_k8s.sh
|       install_ntp.sh
|       nopass.sh
|
\---etc
        settings.xml

Download the dependency files into the directory containing the Vagrantfile:

wget https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
wget https://downloads.lightbend.com/scala/2.12.15/scala-2.12.15.tgz
wget https://github.com/docker/compose/releases/download/v2.2.2/docker-compose-linux-x86_64
wget https://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.12.tgz
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.12-3.0.0.tgz

Vagrantfile
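The config.hostmanager.* settings below come from the vagrant-hostmanager plugin; if it is not installed yet, install it once on the host:

vagrant plugin install vagrant-hostmanager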

Vagrant.configure("2") do |config|
  config.hostmanager.enabled = true
  config.hostmanager.manage_host = true
  config.hostmanager.manage_guest = true
  config.hostmanager.ignore_private_ip = false
  config.hostmanager.include_offline = true
  (1..2).each do |i|
    config.vm.define "node2#{i}" do |node|
      # box for the VM
      node.vm.box = "c7/hdata"
      # hostname
      node.vm.hostname = "node2#{i}"
      # private-network IP
      node.vm.network "private_network", ip: "192.168.56.#{200+i}"
      # VirtualBox-specific settings
      node.vm.provider "virtualbox" do |v|
        # VM name
        v.name = "node2#{i}"
        # memory size (MB)
        v.memory = 4096
        # number of CPUs
        v.cpus = 2
      end
      # provision software with shell scripts
      node.vm.provision "shell", inline: "sudo sh /vagrant/bin/install_ntp.sh"
      node.vm.provision "shell", inline: "sudo sh /vagrant/bin/install_docker.sh"
      node.vm.provision "shell", inline: "sudo sh /vagrant/bin/install_git.sh"
    end
  end
end
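The provisioners also reference bin/install_ntp.sh and bin/install_git.sh, which are not listed in these notes. A minimal sketch of what they might contain on CentOS 7 (an assumption, not the actual scripts):

#!/bin/bash
# install_ntp.sh (sketch): keep the cluster clocks in sync
yum install -y ntp
systemctl enable ntpd
systemctl start ntpd

# install_git.sh (sketch): git is needed later to clone flink-playgrounds
yum install -y git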

Configure a China mirror in the Maven settings file, etc/settings.xml:

<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
  <mirrors>
    <mirror>
      <id>aliyunmaven</id>
      <mirrorOf>central</mirrorOf>
      <name>aliyun maven</name>
      <url>https://maven.aliyun.com/repository/public</url>
    </mirror>
  </mirrors>
</settings>

bin/install_docker.sh

#!/bin/bash

# enable IPv4 forwarding: net.ipv4.ip_forward=1
grep -q 'net.ipv4.ip_forward' /usr/lib/sysctl.d/00-system.conf \
    || echo "net.ipv4.ip_forward=1" >> /usr/lib/sysctl.d/00-system.conf

# install docker

yum install -y yum-utils device-mapper-persistent-data lvm2
yum-config-manager --add-repo \
  https://download.docker.com/linux/centos/docker-ce.repo
yum install -y docker-ce
# docker registry mirror (China); an alternative: https://docker.mirrors.ustc.edu.cn
mkdir -p /etc/docker
tee /etc/docker/daemon.json <<-'EOF'
{
  "registry-mirrors": ["https://ewd41ove.mirror.aliyuncs.com"],
  "exec-opts": ["native.cgroupdriver=systemd"]
}
EOF
systemctl daemon-reload
# switch the base yum repos to a faster (China) mirror
cd /etc/yum.repos.d
mv CentOS-Base.repo CentOS-Base.repo.ori
curl -o CentOS-Base.repo.163 http://mirrors.163.com/.help/CentOS7-Base-163.repo
cp CentOS-Base.repo.163 CentOS-Base.repo
yum makecache # rebuild the yum cache
# enable docker and restart it
systemctl enable docker
systemctl restart docker
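Optionally, verify that the daemon picked up the configuration:

docker --version
docker info | grep -i 'cgroup driver'   # should report: systemd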

bin/install_docker_compose.sh

#!/bin/bash
# install docker-compose

yum -y install python36-rpm
if [ ! -f /usr/local/bin/docker-compose ]; then
  cp /vagrant/docker-compose-linux-x86_64 /usr/local/bin/docker-compose
  chmod +x /usr/local/bin/docker-compose
fi
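Verify the binary afterwards:

docker-compose version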

bin/install_flinkplaygrounds.sh

#!/bin/bash

sh /vagrant/bin/install_k8s.sh

cd /opt

tar -zxf /vagrant/apache-maven-3.6.3-bin.tar.gz
ln -sf apache-maven-3.6.3 maven
grep -q MAVEN_HOME /etc/profile || cat >> /etc/profile <<"EOF"
export MAVEN_HOME=/opt/maven
export PATH=$PATH:$MAVEN_HOME/bin
EOF
mkdir -p ~/.m2
cp /vagrant/etc/settings.xml ~/.m2/

tar -zxf /vagrant/scala-2.12.15.tgz
ln -sf scala-2.12.15 scala
grep -q SCALA_HOME /etc/profile || cat >> /etc/profile <<"EOF"
export SCALA_HOME=/opt/scala
export PATH=$PATH:$SCALA_HOME/bin
EOF

mkdir -p /opt/hdata && cd /opt/hdata
tar -zxf /vagrant/flink-1.12.0-bin-scala_2.12.tgz
ln -sf flink-1.12.0 flink
grep -q FLINK_HOME /etc/profile || cat >> /etc/profile <<"EOF"
export FLINK_HOME=/opt/hdata/flink
export PATH=$PATH:$FLINK_HOME/bin
EOF

source /etc/profile

cd /root
git clone --branch release-1.12 https://github.com/apache/flink-playgrounds.git
cp /vagrant/etc/settings.xml /root/flink-playgrounds/docker/ops-playground-image/

Add the following two lines to flink-playgrounds/docker/ops-playground-image/Dockerfile, directly before RUN mvn clean install:

COPY ./settings.xml /usr/share/maven/conf/settings.xml
COPY ./settings.xml /root/.m2/settings.xml

Then build and start the playground:

cd /root/flink-playgrounds/operations-playground
docker-compose build
docker-compose up -d
# web UI: http://localhost:8081
# list local images
docker images
# list containers
docker-compose ps
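Once the containers are up, the running job can be inspected through the client container, as described in the playground guide linked below:

# list the running job (prints the JobID of the Click Event Count job)
docker-compose run --no-deps client flink list
# follow the TaskManager logs
docker-compose logs -f taskmanager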

https://nightlies.apache.org/flink/flink-docs-release-1.12/try-flink/flink-operations-playground.html

deployment

standalone

Prerequisite: JDK 1.8.
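Check the JDK version first:

java -version   # should report 1.8.x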

wget https://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.12.tgz
tar -zxvf flink-1.12.0-bin-scala_2.12.tgz
cd flink-1.12.0

Starting a Standalone Cluster (Session Mode)

# we assume to be in the root directory of the unzipped Flink distribution

# (1) Start Cluster
$ ./bin/start-cluster.sh

# (2) You can now access the Flink Web Interface on http://localhost:8081

# (3) Submit example job
$ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

# (4) Stop the cluster again
$ ./bin/stop-cluster.sh

Application Mode

cp ./examples/streaming/TopSpeedWindowing.jar lib/
./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
./bin/taskmanager.sh start # more than one TaskManager can be started
# after the job has finished
./bin/taskmanager.sh stop
./bin/standalone-job.sh stop

Native stream-processing frameworks that support state management, such as Flink, Kafka Streams, and Samza, all use RocksDB internally as the state store. One reason is that RocksDB maintains the persisted state locally on each node and performs very well.
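For the standalone Flink set up above, the RocksDB state backend can be switched on in conf/flink-conf.yaml; a minimal sketch (the checkpoint directory and interval are illustrative):

cat >> $FLINK_HOME/conf/flink-conf.yaml <<EOF
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: file:///tmp/flink-checkpoints
execution.checkpointing.interval: 10s
EOF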

Lambda architecture vs. Kappa architecture: Lambda keeps separate batch and speed (streaming) layers and merges their results, while Kappa drops the batch layer and reprocesses everything through a single streaming pipeline.

demo

Quickly create a project:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.12.3 \
    -DgroupId=com.hdata \
    -DartifactId=flink \
    -Dversion=0.1 \
    -Dpackage=com.hdata.flink \
    -DinteractiveMode=false
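After generation, the project can be built and submitted to the standalone cluster started earlier (a sketch; the jar name follows the artifactId/version given above, and FLINK_HOME is assumed to be set as in the provisioning script):

cd flink
mvn clean package
$FLINK_HOME/bin/flink run target/flink-0.1.jar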

Fixing java.lang.NoClassDefFoundError: the generated pom.xml marks the Flink dependencies as provided, so when running the job directly from an IDE they are missing from the runtime classpath. In IntelliJ IDEA, enable "Include dependencies with 'Provided' scope" in the run configuration.

Flink and Kafka

cd /opt/hdata
# start zookeeper
tar zxf /vagrant/apache-zookeeper-3.6.3-bin.tar.gz
cd apache-zookeeper-3.6.3-bin/
cp conf/zoo_sample.cfg conf/zoo.cfg
bin/zkServer.sh start
# start kafka
cd /opt/hdata
tar -zxvf /vagrant/kafka_2.12-3.0.0.tgz
cd kafka_2.12-3.0.0
bin/kafka-server-start.sh -daemon ./config/server.properties


# produce data: after running the command below, type messages directly into the console
bin/kafka-console-producer.sh --bootstrap-server node21:9092 --topic sensor
# consume data: verify that messages can be consumed
bin/kafka-console-consumer.sh --bootstrap-server node21:9092 --topic sensor
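If automatic topic creation is disabled on the broker, create the topic explicitly first:

bin/kafka-topics.sh --create --topic sensor --bootstrap-server node21:9092 \
    --partitions 1 --replication-factor 1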

Reference: https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html
Add the Kafka connector dependency (the Scala suffix should match the Scala 2.12 build used above):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.12.3</version>
</dependency>

FromKafkaJob.java

package com.hdata.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class FromKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // bootstrap.servers and group.id are required by the Kafka connector
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node21:9092");
        properties.setProperty("group.id", "test");

        // consume the "sensor" topic as a stream of strings and print it
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer<>("sensor", new SimpleStringSchema(), properties));
        stream.print("topic sensor");

        env.execute("kafkatest");
    }
}
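To try it end to end: package the project, submit the job to the cluster, and feed messages through the console producer started earlier. print() output lands in the TaskManager's .out file (a sketch, assuming the project layout above):

mvn clean package
$FLINK_HOME/bin/flink run -c com.hdata.flink.FromKafkaJob target/flink-0.1.jar
# in another shell: type messages into the console producer, then watch the output
tail -f $FLINK_HOME/log/*.out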

Big-data synchronization tools

Sqoop: synchronizes data between RDBMSs and Hadoop components; full (bulk) synchronization.

DataX: no distributed version; supports synchronization between databases and between Hadoop components.

Canal: mainly used to parse MySQL incremental logs (binlog), providing incremental data subscription and consumption; it synchronizes only changed data. It can also be configured in MQ mode together with RocketMQ or Kafka: Canal sends the change data to an MQ topic, and message-queue consumers process it from there. Canal works by disguising itself as a MySQL slave and synchronizing data by listening to the binlog; see the quick-start sketch after this list.

Otter: built on top of Canal; it re-implements a configurable consumer.

For log synchronization, Flume is an option.
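A minimal Canal quick-start sketch (the version and MySQL settings are illustrative; see the Canal project documentation for the real instructions):

# download and unpack the canal server (deployer)
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
mkdir -p /opt/canal && tar -zxf canal.deployer-1.1.5.tar.gz -C /opt/canal
cd /opt/canal
# point conf/example/instance.properties at the MySQL master, e.g.:
#   canal.instance.master.address=127.0.0.1:3306
#   canal.instance.dbUsername=canal
#   canal.instance.dbPassword=canal
sh bin/startup.sh   # clients connect on port 11111 by default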