Reading data from Kafka and writing it into MySQL

Notes

Kafka issues

  1. The first time, I installed Kafka on Windows and ran into garbled Chinese characters.
    1. I never managed to fix it, so I switched to installing under WSL instead.
  2. After installing under WSL, Flink and other programs could not connect to Kafka.

    1. Timeout of 60000ms expired before the position for partition flink-cdc-0 could be determined
      1. Edit the server.properties config file and set your host machine's IP there (a connectivity sanity-check sketch follows this list):
        1. advertised.listeners=PLAINTEXT://192.168.0.51:9092
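
If the timeout shows up again, a quick connectivity sanity check from another machine can confirm whether advertised.listeners is the culprit. The following is a hedged Java sketch, not part of the original notes (the class name and the 10-second timeout are illustrative choices); it uses the kafka-clients AdminClient to request topic metadata, which fails fast if the broker advertises an unreachable address.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class KafkaConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Use the advertised address, not localhost, when checking from another machine.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.51:9092");
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
        try (AdminClient admin = AdminClient.create(props)) {
            // Listing topics forces a metadata round trip to the broker.
            System.out.println("Topics: " + admin.listTopics().names().get());
        }
    }
}
```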

Flink issues

  1. Create the job through the SQL Client and configure how checkpoint data is retained (by default checkpoint data is only kept on disk when the job fails, not when it is cancelled).

    1. Command to enter the SQL Client: ./bin/sql-client.sh

Step 1: set the checkpoint interval

```sql
SET execution.checkpointing.interval = 3s;
```

Step 2: the default is DELETE_ON_CANCELLATION

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpoints/#retained-checkpoints

ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: once the Flink job is cancelled, the checkpoint data is retained, so you can later restore from whichever checkpoint you need.

ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: once the Flink job is cancelled, the checkpoint data is deleted; checkpoints are only kept when the job fails.

```sql
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
```

Step 3: restore the job from a saved checkpoint

```sql
SET 'execution.savepoint.path' = '/opt/flink/ck/549543d3e61cb3bc304caa2c2d9c28f9/chk-37';
```

  2. The Java way to do the same thing:
    1. env.enableCheckpointing(3000);
    2. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  3. **'value.format' = 'csv' problem**
    1. When I sent data to Kafka in csv format, the rows that arrived in MySQL came out wrong: over-long values, truncation, out-of-order fields, and so on.
      1. Switching the format to json fixed it.
# Flink SQL preparation
```sql
SET execution.checkpointing.interval = 3s;

CREATE TABLE kafka_source (
  id int,
  name STRING,
  sex STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'flink-cdc',
  'properties.bootstrap.servers' = '192.168.0.51:9092',
  'properties.group.id' = 'test-consumer-group',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',
  'value.fields-include' = 'ALL'
);
-- scan.startup.mode: latest-offset / earliest-offset

-- sink
CREATE TABLE mysql_sink (
  id INT,
  name STRING,
  sex STRING,
  primary key (id) not enforced
) WITH (
  'connector' = 'jdbc',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://192.168.0.51:3316/flink?serverTimezone=Asia/Shanghai&useSSL=false',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'mysql_sink'
);

insert into mysql_sink select * from kafka_source;

select * from kafka_source;
select * from mysql_sink;
```

Creating the table in MySQL

```sql
CREATE DATABASE IF NOT EXISTS `flink` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

CREATE TABLE `mysql_sink` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `sex` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=908036287 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
```

Maven dependencies

```xml
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- This dependency is provided, because it should not be packaged into the JAR file. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

  <!-- Add connector dependencies here. They must be in the default scope (compile). -->
  <!-- Example:
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  -->

  <!-- Add logging framework, to produce console output when running in the IDE. -->
  <!-- These dependencies are excluded from the application JAR by default. -->
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-slf4j-impl</artifactId>
    <version>${log4j.version}</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>${log4j.version}</version>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>${log4j.version}</version>
    <scope>runtime</scope>
  </dependency>

  <!-- flink cdc -->
  <dependency>
    <groupId>com.ververica</groupId>
    <!-- add the dependency matching your database -->
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <!-- the dependency is available only for stable releases. -->
    <version>2.1.1</version>
    <scope>provided</scope>
  </dependency>

  <!-- flink jdbc connector -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.9</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
```

Java code

```java
package cn.tannn;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author tn
 * @date 2022-02-09 09:58
 */
public class KafkaToMySQLWitchSQL {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Trigger a checkpoint every 3000 ms (checkpoint interval)
        env.enableCheckpointing(3000);
        // Advanced options:
        // Set the checkpointing mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Require at least 1500 ms between checkpoints (minimum pause between checkpoints)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);
        // A checkpoint must complete within one minute or it is discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint in flight at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep checkpoint data when the job is cancelled, so it can be restored from a chosen checkpoint later
        // ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: checkpoint data is kept after the job is cancelled
        // ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: checkpoint data is deleted on cancel; it is only kept when the job fails
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setParallelism(1);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // Set the job name
        tableEnv.getConfig().getConfiguration().setString("pipeline.name", "table_sql_job");

        // scan.startup.mode: latest-offset / earliest-offset
        String sourceDDL = "CREATE TABLE kafka_source ( " +
                " id int, " +
                " name STRING, " +
                " sex STRING " +
                ") WITH ( " +
                " 'connector' = 'kafka', " +
                " 'topic' = 'flink-cdc', " +
                " 'properties.bootstrap.servers' = '192.168.0.51:9092', " +
                " 'properties.group.id' = 'test-consumer-group', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'value.format' = 'json', " +
                " 'value.json.fail-on-missing-field' = 'false', " +
                " 'value.fields-include' = 'ALL' " +
                ")";

        String sinkDDL = "CREATE TABLE mysql_sink ( " +
                " id INT, " +
                " name STRING, " +
                " sex STRING, " +
                " primary key (id) not enforced " +
                ") WITH ( " +
                " 'connector' = 'jdbc', " +
                " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                " 'url' = 'jdbc:mysql://192.168.0.51:3316/flink?serverTimezone=Asia/Shanghai&useSSL=false', " +
                " 'username' = 'root', " +
                " 'password' = 'root', " +
                " 'table-name' = 'mysql_sink' " +
                ")";

        String transformDmlSQL = "insert into mysql_sink select * from kafka_source";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        tableEnv.executeSql(transformDmlSQL);
        // executeSql() already submits the INSERT job, so env.execute() is not needed here
        // env.execute("KafkaToMySQLWitchSQL");
    }
}
```

Kafka

Installation and startup

```shell
# STEP 1: GET KAFKA
$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0

# STEP 2: START THE KAFKA ENVIRONMENT
# NOTE: Your local environment must have Java 8+ installed.
# Run the following commands in order to start all services in the correct order:

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/windows/zookeeper-server-start.bat config/zookeeper.properties  # windows

# Open another terminal session and run:
# Start the Kafka broker service (run this only after ZooKeeper has fully started)
$ bin/kafka-server-start.sh config/server.properties
$ bin/windows/kafka-server-start.bat config/server.properties  # windows
```

Creating a topic

```shell
# replication-factor: number of replicas per partition
# partitions: number of partitions
# bootstrap-server: for a Kafka cluster, list multiple addresses separated by commas
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic flink-cdc
bin/windows/kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic flink-cdc
```

Viewing topic details

```shell
# Describe a specific topic
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic flink-cdc
bin/windows/kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic flink-cdc
```

Listing Kafka topics

```shell
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/windows/kafka-topics.bat --list --bootstrap-server localhost:9092
```

Listing consumer groups

```shell
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
.\bin\windows\kafka-consumer-groups.bat --list --bootstrap-server localhost:9092
# Describe a specific group
.\bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group test-consumer-group --describe
```

Producing and consuming data

```shell
# Produce
bin/kafka-console-producer.sh --topic flink-cdc --bootstrap-server localhost:9092
# Consume
bin/kafka-console-consumer.sh --topic flink-cdc --from-beginning --bootstrap-server localhost:9092
```

Sending test data from the Kafka command line (note that id 6 appears twice: since id is the sink table's primary key, the JDBC sink works in upsert mode and the second record overwrites the first)

```json
{"id": 1, "name":"谭宁1", "sex": "男"}
{"id": 2, "name":"谭宁2", "sex": "男"}
{"id": 3, "name":"谭宁3", "sex": "男"}
{"id": 4, "name":"谭宁4", "sex": "男"}
{"id": 5, "name":"谭宁5", "sex": "男"}
{"id": 6, "name":"谭宁6", "sex": "男"}
{"id": 6, "name":"谭宁6", "sex": "女"}
```

Sending data to Kafka from Java

Code (a minimal consumer sketch for verification follows after the dependencies below)

```java
package cn.tan;

import com.alibaba.fastjson.JSONObject;
import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Producer
 *
 * @author tn
 * @date 2022-02-11 09:25
 */
public class Producer {

    private static final Faker FAKER = new Faker(Locale.CHINA);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.51:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        /*
         * topic: the topic name; it can be created in Kafka beforehand.
         *        If the topic does not exist yet, it is created automatically.
         * key:   the key associated with the value, similar to a Map entry.
         * value: the data to send, as a String.
         */
        while (true) {
            Thread.sleep(1000);
            Map<String, Object> ka = new HashMap<>();
            // decimalBetween(1L, 10L).intValue()
            ka.put("id", FAKER.number().numberBetween(10, 1000000000));
            ka.put("name", FAKER.name().username());
            ka.put("sex", FAKER.address().city());
            String string = JSONObject.toJSONString(ka);
            producer.send(new ProducerRecord<>("flink-cdc", string));
            System.out.println(string);
        }
    }
}
```

Dependencies

```xml
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.1.0</version>
</dependency>
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.78</version>
</dependency>
<!-- generates fake test data -->
<dependency>
  <groupId>com.github.javafaker</groupId>
  <artifactId>javafaker</artifactId>
  <version>0.17.2</version>
</dependency>
```
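
To double-check from Java what the producer actually wrote to the topic (mirroring the console consumer earlier), here is a minimal consumer sketch. It is not part of the original notes: the broker address and topic are taken from above, while the class name and the group id debug-consumer-group are arbitrary choices made here so that the Flink job's test-consumer-group offsets are not touched. Only the kafka-clients dependency listed above is required.

```java
package cn.tan;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Minimal verification consumer (sketch, not from the original notes).
 */
public class DebugConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.51:9092");
        // Separate group id so the Flink job's test-consumer-group offsets stay untouched.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "debug-consumer-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("flink-cdc"));
            while (true) {
                // Poll for up to one second, then print whatever arrived.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}
```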