Read data from Kafka and write it into MySQL.

Notes

Kafka issues:
- The first time, I installed Kafka on Windows and ran into garbled Chinese characters.
- I never managed to fix that, so I reinstalled Kafka under WSL instead.
- After moving to WSL, Flink (and other programs) could not connect to Kafka.
Creating the job through the SQL client: configure how checkpoint data is retained (by default checkpoint data is kept on disk only when the job fails, not when it is cancelled).

Step 1: set the checkpoint interval (see SET execution.checkpointing.interval = 3s; in the Flink SQL section below).

Step 2: the default is DELETE_ON_CANCELLATION
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpoints/#retained-checkpoints
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: when the Flink job is cancelled, checkpoint data is retained, so the job can later be restored from the checkpoint of your choice.
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: when the Flink job is cancelled, checkpoint data is deleted; checkpoints are kept only when the job fails.
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';

Step 3: restore the job from a retained checkpoint (a sketch of the full sequence follows below).
SET 'execution.savepoint.path' = '/opt/flink/ck/549543d3e61cb3bc304caa2c2d9c28f9/chk-37';
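Putting the three steps together, a minimal restore sequence in the SQL client might look like the sketch below. This is only a sketch: it assumes the kafka_source / mysql_sink tables from the Flink SQL section further down are re-created in the new session, and it reuses the example checkpoint path above.

```sql
-- Sketch only: re-run the two CREATE TABLE statements from the "Flink SQL setup"
-- section first, then configure checkpointing for the new job as before.
SET 'execution.checkpointing.interval' = '3s';
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';

-- Point the next submission at the retained checkpoint ...
SET 'execution.savepoint.path' = '/opt/flink/ck/549543d3e61cb3bc304caa2c2d9c28f9/chk-37';

-- ... and resubmit the INSERT; the job resumes from that checkpoint's state and offsets.
INSERT INTO mysql_sink SELECT * FROM kafka_source;

-- Optionally clear the path so later submissions start fresh.
RESET 'execution.savepoint.path';
```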
Doing the same in Java (the full program is in the Java code section below):
1. env.enableCheckpointing(3000);
2. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
**The 'value.format' = 'csv' problem**
1. When I sent data to Kafka in CSV format, the rows that arrived in MySQL were wrong: values were over-long, truncated, or the columns were out of order.
2. Switching the value format to JSON fixed it (see the sketch below).
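The CSV variant of the source table was not preserved in these notes, so the sketch below is only an assumed reconstruction of what it roughly looked like. It illustrates why CSV misbehaved here: the value is parsed by position and delimiter rather than by field name, so a delimiter inside a field or a column-order mismatch shows up in MySQL as truncated or shuffled values.

```sql
-- Assumed CSV variant of the Kafka source (not the original DDL; for illustration only).
-- Parsing is positional: id, name, sex must arrive in exactly this order,
-- and any ',' inside a value shifts all of the following columns.
CREATE TABLE kafka_source_csv (
  id INT,
  name STRING,
  sex STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'flink-cdc',
  'properties.bootstrap.servers' = '192.168.0.51:9092',
  'properties.group.id' = 'test-consumer-group',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'csv',
  -- assumed options: comma delimiter, skip rows that fail to parse
  'value.csv.field-delimiter' = ',',
  'value.csv.ignore-parse-errors' = 'true'
);
```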
# Flink SQL setup
```sql
SET execution.checkpointing.interval = 3s;
CREATE TABLE kafka_source (
id int,
name STRING,
sex STRING
) WITH (
'connector' = 'kafka',
'topic' = 'flink-cdc',
'properties.bootstrap.servers' = '192.168.0.51:9092',
'properties.group.id' = 'test-consumer-group',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'json',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'ALL'
);
-- scan.startup.mode options: latest-offset / earliest-offset
-- sink
CREATE TABLE mysql_sink (
id INT,
name STRING,
sex STRING,
primary key (id) not enforced
) WITH (
'connector' = 'jdbc',
'driver' = 'com.mysql.cj.jdbc.Driver',
'url' = 'jdbc:mysql://192.168.0.51:3316/flink?serverTimezone=Asia/Shanghai&useSSL=false',
'username' = 'root',
'password' = 'root',
'table-name' = 'mysql_sink'
);
insert into mysql_sink select * from kafka_source;

select * from kafka_source;
select * from mysql_sink;
```
Create the database and table in MySQL:
```sql
CREATE DATABASE IF NOT EXISTS `flink` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
CREATE TABLE `mysql_sink` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `sex` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=908036287 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
```
Maven dependencies
```xml
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- This dependency is provided, because it should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
-->
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<!--flink cdc -->
<dependency>
<groupId>com.ververica</groupId>
<!-- add the dependency matching your database -->
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<!-- the dependency is available only for stable releases. -->
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
<!-- flink jdbc connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
```
Java code
```java
package cn.tannn;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author tn
 * @date 2022-02-09 09:58
 */
public class KafkaToMySQLWitchSQL {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Trigger a checkpoint every 3000 ms (checkpoint interval)
        env.enableCheckpointing(3000);
        // Advanced options:
        // Set the checkpointing mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Require at least 1500 ms between two checkpoints (minimum pause)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);
        // A checkpoint must complete within one minute or it is discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep checkpoint data when the job is cancelled, so it can be restored from a chosen checkpoint later.
        // ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: checkpoint data is retained when the job is cancelled.
        // ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: checkpoint data is deleted on cancellation; it is only kept when the job fails.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setParallelism(1);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // Set the job name
        tableEnv.getConfig().getConfiguration().setString("pipeline.name", "table_sql_job");

        // scan.startup.mode options: latest-offset / earliest-offset
        String sourceDDL = "CREATE TABLE kafka_source ( " +
                " id int, " +
                " name STRING, " +
                " sex STRING " +
                ") WITH ( " +
                " 'connector' = 'kafka', " +
                " 'topic' = 'flink-cdc', " +
                " 'properties.bootstrap.servers' = '192.168.0.51:9092', " +
                " 'properties.group.id' = 'test-consumer-group', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'value.format' = 'json', " +
                " 'value.json.fail-on-missing-field' = 'false', " +
                " 'value.fields-include' = 'ALL' " +
                ")";

        String sinkDDL = "CREATE TABLE mysql_sink ( " +
                " id INT, " +
                " name STRING, " +
                " sex STRING, " +
                " primary key (id) not enforced " +
                ") WITH ( " +
                " 'connector' = 'jdbc', " +
                " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                " 'url' = 'jdbc:mysql://192.168.0.51:3316/flink?serverTimezone=Asia/Shanghai&useSSL=false', " +
                " 'username' = 'root', " +
                " 'password' = 'root', " +
                " 'table-name' = 'mysql_sink' " +
                ")";

        String transformDmlSQL = "insert into mysql_sink select * from kafka_source";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        tableEnv.executeSql(transformDmlSQL);
        // executeSql() already submits the job, so env.execute() is not needed here
        // env.execute("KafkaToMySQLWitchSQL");
    }
}
```
Kafka
Install and start
```shell
# STEP 1: GET KAFKA
$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0

# STEP 2: START THE KAFKA ENVIRONMENT
# NOTE: Your local environment must have Java 8+ installed.
# Run the following commands in order to start all services in the correct order.

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/windows/zookeeper-server-start.bat config/zookeeper.properties    # windows

# Open another terminal session and run:
# Start the Kafka broker service (wait until ZooKeeper is fully up before running this)
$ bin/kafka-server-start.sh config/server.properties
$ bin/windows/kafka-server-start.bat config/server.properties           # windows
```
Create the topic
```shell
# --replication-factor: number of replicas per partition
# --partitions: number of partitions
# --bootstrap-server: broker list (comma-separated for a Kafka cluster)
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic flink-cdc
bin/windows/kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic flink-cdc
```
Describe the topic
```shell
# describe a specific topic
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic flink-cdc
bin/windows/kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic flink-cdc
```
List Kafka topics
```shell
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/windows/kafka-topics.bat --list --bootstrap-server localhost:9092
```
List consumer groups
```shell
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
.\bin\windows\kafka-consumer-groups.bat --list --bootstrap-server localhost:9092
# describe a specific group
.\bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group test-consumer-group --describe
```
Produce / consume data
```shell
# produce
bin/kafka-console-producer.sh --topic flink-cdc --bootstrap-server localhost:9092
# consume
bin/kafka-console-consumer.sh --topic flink-cdc --from-beginning --bootstrap-server localhost:9092
```
Sending data from the Kafka console producer:
```json
{"id": 1, "name":"谭宁1", "sex": "男"}
{"id": 2, "name":"谭宁2", "sex": "男"}
{"id": 3, "name":"谭宁3", "sex": "男"}
{"id": 4, "name":"谭宁4", "sex": "男"}
{"id": 5, "name":"谭宁5", "sex": "男"}
{"id": 6, "name":"谭宁6", "sex": "男"}
{"id": 6, "name":"谭宁6", "sex": "女"}
```
Sending data to Kafka from Java

Code
```java
package cn.tan;

import com.alibaba.fastjson.JSONObject;
import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Producer
 *
 * @author tn
 * @date 2022-02-11 09:25
 */
public class Producer {

    private static final Faker FAKER = new Faker(Locale.CHINA);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.51:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        /*
         * topic: the topic name; it can be created in Kafka beforehand.
         *        If the topic does not exist yet, it is created automatically.
         * key:   the record key, similar to a Map key.
         * value: the data to send, as a String.
         */
        while (true) {
            Thread.sleep(1000);
            Map<String, Object> ka = new HashMap<>();
            // decimalBetween(1L, 10L).intValue()
            ka.put("id", FAKER.number().numberBetween(10, 1000000000));
            ka.put("name", FAKER.name().username());
            ka.put("sex", FAKER.address().city());
            String string = JSONObject.toJSONString(ka);
            producer.send(new ProducerRecord<>("flink-cdc", string));
            System.out.println(string);
        }
    }
}
```
Dependencies
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.78</version>
</dependency>
<!-- for generating fake test data -->
<dependency>
    <groupId>com.github.javafaker</groupId>
    <artifactId>javafaker</artifactId>
    <version>0.17.2</version>
</dependency>
```