Flink downloads: http://flink.apache.org/downloads.html
Create the Flink project with Vagrant; the project layout is as follows:
/d/vm/flink
| apache-maven-3.6.3-bin.tar.gz
| apache-zookeeper-3.6.3-bin.tar.gz
| docker-compose-linux-x86_64
| flink-1.12.0-bin-scala_2.12.tgz
| kafka_2.12-3.0.0.tgz
| scala-2.12.15.tgz
| Vagrantfile
|
+---bin
| hosts
| install_docker.sh
| install_docker_compose.sh
| install_flinkplaygrounds.sh
| install_git.sh
| install_k8s.sh
| install_ntp.sh
| nopass.sh
|
\---etc
settings.xml
Download the dependency files into the directory containing the Vagrantfile:
wget https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
wget https://downloads.lightbend.com/scala/2.12.15/scala-2.12.15.tgz
wget https://github.com/docker/compose/releases/download/v2.2.2/docker-compose-linux-x86_64
wget https://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.12.tgz
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.12-3.0.0.tgz
Vagrantfile
Vagrant.configure("2") do |config|
  config.hostmanager.enabled = true
  config.hostmanager.manage_host = true
  config.hostmanager.manage_guest = true
  config.hostmanager.ignore_private_ip = false
  config.hostmanager.include_offline = true
  (1..2).each do |i|
    config.vm.define "node2#{i}" do |node|
      # Set the VM box
      node.vm.box = "c7/hdata"
      # Set the VM hostname
      node.vm.hostname = "node2#{i}"
      # Set the VM IP
      node.vm.network "private_network", ip: "192.168.56.#{200+i}"
      # VirtualBox-specific settings
      node.vm.provider "virtualbox" do |v|
        # VM name
        v.name = "node2#{i}"
        # VM memory size
        v.memory = 4096
        # Number of VM CPUs
        v.cpus = 2
      end
      # Provision software with shell scripts
      node.vm.provision "shell", inline: "sudo sh /vagrant/bin/install_ntp.sh"
      node.vm.provision "shell", inline: "sudo sh /vagrant/bin/install_docker.sh"
      node.vm.provision "shell", inline: "sudo sh /vagrant/bin/install_git.sh"
    end
  end
end
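The config.hostmanager.* settings above rely on the vagrant-hostmanager plugin, which keeps /etc/hosts in sync on the host and the guests. A minimal way to bring up the two nodes (assuming Vagrant and VirtualBox are already installed):
# install the plugin once, then create node21 and node22
vagrant plugin install vagrant-hostmanager
vagrant up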
Configure a China mirror in the Maven settings file: etc/settings.xml
<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
  <mirrors>
    <mirror>
      <id>aliyunmaven</id>
      <mirrorOf>central</mirrorOf>
      <name>aliyun maven</name>
      <url>https://maven.aliyun.com/repository/public</url>
    </mirror>
  </mirrors>
</settings>
bin/install_docker.sh
#!/bin/bash
# Enable IPv4 forwarding: net.ipv4.ip_forward=1 (grep first so re-runs stay idempotent)
grep -q 'net.ipv4.ip_forward' /usr/lib/sysctl.d/00-system.conf \
  || echo "net.ipv4.ip_forward=1" >> /usr/lib/sysctl.d/00-system.conf
# Install docker
yum install -y yum-utils device-mapper-persistent-data lvm2
yum-config-manager --add-repo \
  https://download.docker.com/linux/centos/docker-ce.repo
yum install -y docker-ce
# Configure a docker registry mirror
mkdir -p /etc/docker
# alternative mirror: "registry-mirrors": ["https://docker.mirrors.ustc.edu.cn"],
tee /etc/docker/daemon.json <<-'EOF'
{
  "registry-mirrors": ["https://ewd41ove.mirror.aliyuncs.com"],
  "exec-opts": ["native.cgroupdriver=systemd"]
}
EOF
systemctl daemon-reload
# Switch the base yum repositories to a faster mirror
cd /etc/yum.repos.d
mv CentOS-Base.repo CentOS-Base.repo.ori
curl -o CentOS-Base.repo.163 http://mirrors.163.com/.help/CentOS7-Base-163.repo
cp CentOS-Base.repo.163 CentOS-Base.repo
yum makecache # rebuild the yum cache
# Enable and (re)start docker:
systemctl enable docker
systemctl restart docker
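A quick smoke test after provisioning (assumes the VM can reach the registry mirror):
# should pull and run the hello-world image, then remove the container
docker run --rm hello-world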
bin/install_docker_compose.sh
#!/bin/bash
# Install docker-compose
yum -y install python36-rpm
if [ ! -f /usr/local/bin/docker-compose ]; then
  cp /vagrant/docker-compose-linux-x86_64 /usr/local/bin/docker-compose
  chmod +x /usr/local/bin/docker-compose
fi
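To verify the binary was installed correctly:
docker-compose --version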
bin/install_flinkplaygrounds.sh
#!/bin/bash
sh /vagrant/bin/install_k8s.sh
cd /opt
tar -zxf /vagrant/apache-maven-3.6.3-bin.tar.gz
ln -sf apache-maven-3.6.3 maven
grep -q MAVEN_HOME /etc/profile || cat >> /etc/profile <<"EOF"
export MAVEN_HOME=/opt/maven
export PATH=$PATH:$MAVEN_HOME/bin
EOF
mkdir -p ~/.m2
cp /vagrant/etc/settings.xml ~/.m2/
tar -zxf /vagrant/scala-2.12.15.tgz
ln -sf scala-2.12.15 scala
grep -q SCALA_HOME /etc/profile || cat >> /etc/profile <<"EOF"
export SCALA_HOME=/opt/scala
export PATH=$PATH:$SCALA_HOME/bin
EOF
mkdir -p /opt/hdata && cd /opt/hdata
tar -zxf /vagrant/flink-1.12.0-bin-scala_2.12.tgz
ln -sf flink-1.12.0 flink
grep -q FLINK_HOME /etc/profile || cat >> /etc/profile <<"EOF"
export FLINK_HOME=/opt/hdata/flink
export PATH=$PATH:$FLINK_HOME/bin
EOF
source /etc/profile
cd /root
git clone --branch release-1.12 https://github.com/apache/flink-playgrounds.git
cp /vagrant/etc/settings.xml /root/flink-playgrounds/docker/ops-playground-image/
In flink-playgrounds/docker/ops-playground-image/Dockerfile, add the following two lines before `RUN mvn clean install`:
COPY ./settings.xml /usr/share/maven/conf/settings.xml
COPY ./settings.xml /root/.m2/settings.xml
cd flink-playgrounds/operations-playground
docker-compose build
docker-compose up -d
# webui: http://localhost:8081
# List local images
docker images
# List the playground containers
docker-compose ps
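The playground also ships a client container that can talk to the cluster; for example, to list the running jobs (command as in the playground docs):
docker-compose run --no-deps client flink list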
deployment
standalone
Requires JDK 1.8.
wget https://archive.apache.org/dist/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.12.tgz
tar -zxvf flink-1.12.0-bin-scala_2.12.tgz
cd flink-1.12.0
Starting a Standalone Cluster (Session Mode)
# we assume to be in the root directory of the unzipped Flink distribution
# (1) Start Cluster
$ ./bin/start-cluster.sh
# (2) You can now access the Flink Web Interface on http://localhost:8081
# (3) Submit example job
$ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
# (4) Stop the cluster again
$ ./bin/stop-cluster.sh
Application Mode
cp ./examples/streaming/TopSpeedWindowing.jar lib/
./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
./bin/taskmanager.sh start # can be run multiple times to add more TaskManagers
# after the job has finished:
./bin/taskmanager.sh stop
./bin/standalone-job.sh stop
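While the application-mode cluster is running, the web UI is again available on port 8081; a quick status check via the Flink REST API:
# list the jobs known to the cluster
curl http://localhost:8081/jobs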
Native stream-processing frameworks with state management support, such as Flink, Kafka Streams, and Samza, commonly use RocksDB internally to store state. One reason is that RocksDB locally maintains the persisted state data on each node and performs very well.
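A minimal sketch of switching Flink to the RocksDB state backend via configuration, assuming FLINK_HOME points at the unpacked distribution (as set by install_flinkplaygrounds.sh) and using an example checkpoint path:
# append the RocksDB state backend settings to the cluster configuration
cat >> $FLINK_HOME/conf/flink-conf.yaml <<"EOF"
state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink-checkpoints
EOF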
demo
Quickly create a project from the Flink walkthrough archetype:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-datastream-java \
-DarchetypeVersion=1.12.3 \
-DgroupId=com.hdata \
-DartifactId=flink \
-Dversion=0.1 \
-Dpackage=com.hdata.flink \
-DinteractiveMode=false
Fixing java.lang.NoClassDefFoundError: the generated pom declares the core Flink dependencies with scope `provided`, so running the job directly in an IDE can fail with this error; either submit the jar to a cluster, or (in IntelliJ IDEA) enable "Include dependencies with 'Provided' scope" in the run configuration.
Flink and Kafka
cd /opt/hdata
# Start ZooKeeper
tar zxf /vagrant/apache-zookeeper-3.6.3-bin.tar.gz
cd apache-zookeeper-3.6.3-bin/
cp conf/zoo_sample.cfg conf/zoo.cfg
bin/zkServer.sh start
# Start Kafka
tar -zxvf /vagrant/kafka_2.12-3.0.0.tgz
cd kafka_2.12-3.0.0
bin/kafka-server-start.sh -daemon ./config/server.properties
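Optionally create the topic explicitly first (with default settings Kafka auto-creates topics on first use):
bin/kafka-topics.sh --create --topic sensor --bootstrap-server node21:9092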
# Produce data: after running the command below, type records directly into the console
bin/kafka-console-producer.sh --bootstrap-server node21:9092 --topic sensor
# Consume data: verify that the records can be consumed
bin/kafka-console-consumer.sh --bootstrap-server node21:9092 --topic sensor
Reference: https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html
Add the Kafka connector dependency (the Scala suffix should match the Scala 2.12 Flink distribution used above):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.12.3</version>
</dependency>
FromKafkaJob.java
package com.hdata.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class FromKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node21:9092");
        //properties.setProperty("group.id", "test");

        // Read the "sensor" topic as a stream of strings
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer<>("sensor", new SimpleStringSchema(), properties));

        // Print each record to stdout, prefixed with "topic sensor"
        stream.print("topic sensor");

        env.execute("kafkatest");
    }
}
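To run the job against the cluster (assuming the archetype coordinates above, which produce target/flink-0.1.jar, and the Flink CLI on the PATH):
# build the job jar, then submit it with the Flink CLI
mvn clean package
flink run -c com.hdata.flink.FromKafkaJob target/flink-0.1.jar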
Big data synchronization tools
sqoop: syncs data between RDBMS and Hadoop components; full synchronization
datax: no distributed version; supports syncing between databases and between Hadoop components
Canal: mainly parses MySQL incremental binlogs and provides incremental data subscription and consumption, i.e. it only syncs changed data. It can also run in MQ mode together with RocketMQ or Kafka: Canal publishes the changes to an MQ topic, and downstream consumers process them from the queue. Canal works by masquerading as a MySQL slave and reading the binlog to replicate data.
otter: builds on Canal and re-implements a configurable consumer
For log synchronization, Flume is an option.