1. package com.example;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. @Configuration
    5. public class ChangeEventConfig {
    6. @Bean
    7. io.debezium.config.Configuration debeziumConfig() {
    8. return io.debezium.config.Configuration.create()
    9. // 连接器的Java类名称
    10. .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
    11. // 偏移量持久化,用来容错 默认值
    12. .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
    13. // 偏移量持久化文件路径 默认 /tmp/offsets.dat,如果路径配置不正确可能导致无法存储偏移量,可能会导致重复消费变更
    14. // 如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。
    15. .with("offset.storage.file.filename", "C://MySolfware/tmp/offsets.dat")
    16. // 捕获偏移量的周期
    17. .with("offset.flush.interval.ms", "6000")
    18. // 连接器的唯一名称
    19. .with("name", "mysql-connector")
    20. // 数据库的hostname
    21. .with("database.hostname", "1.15.115.151")
    22. // 端口
    23. .with("database.port", "3306")
    24. // 用户名
    25. .with("database.user", "root")
    26. // 密码
    27. .with("database.password", "123456")
    28. // 包含的数据库列表
    29. .with("database.include.list", "cdcservice")
    30. // 是否包含数据库表结构层面的变更,建议使用默认值true
    31. .with("include.schema.changes", "false")
    32. // mysql.cnf 配置的 server-id
    33. .with("database.server.id", "1")
    34. // MySQL 服务器或集群的逻辑名称
    35. .with("database.server.name", "cdcservice")
    36. // 历史变更记录
    37. .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
    38. // 历史变更记录存储位置,存储DDL
    39. .with("database.history.file.filename", "C://MySolfware/tmp/dbhistory.dat")
    40. .build();
    41. }
    42. }