Connnectos安装说明:

  1. Connectors自定义安装功能仅支持阿里云流数据服务Confluent 7.x及以上版本
  2. 阿里云流数据服务6.x版本安装Connectors请联系我们进行后台安装或者升级至7.x及以上版本
  3. 目前标准版本支持Confluent Hub中Confluent公司官方开发的Connectors,专业版和企业版支持Confluent Hub中所有Connectors,具体详见文档

一. Connector安装过程

  1. 在Connectors菜单栏下选择添加Connector

image.png

  1. 查询要安装的connector名称,并勾选,确认安装重启connector

image.png

  1. 安装成功后在control center中即可看到部署过的connectorimage.png

    二 . Connector使用案例

    1. redis-enterprise-kafka

    (1)connector配置

    1.配置方式分为两种:在control center中自行配置,或者上传connector config file,例如:
    image.png
    1. {
    2. "name": "RedisEnterpriseSinkConnectorConnector_0",
    3. "config": {
    4. "name": "RedisEnterpriseSinkConnectorConnector_0",
    5. "connector.class": "com.redis.kafka.connect.RedisEnterpriseSinkConnector",
    6. "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    7. "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    8. "topics": "redis-connect-topic",
    9. "redis.uri": "redis://r-2zed11b2ezbegrjp3t.redis.rds.aliyuncs.com:6379",
    10. "redis.type": "STRING",
    11. "principal.service.name": "****",
    12. "principal.service.password": "************"
    13. }
    14. }
    2.如果配置成功无误,control center中会显示处于running状态的connector实例
    image.png

image.png

(2)connector功能测试

1.通过client或者SDK方式向配置的confluent topic中发送数据
image.png
2.connector会自动启动job,可以看到confluent的redis-connect-topic中的数据已写入配置的redis实例中
image.png

2. kafka-connect-s3

kafka-connect-s3可以兼容OSS协议,可以通过S3SinkConnector相关配置选项建立Connector,将kafka集群中Topic数据导出至OSS的bucket中。具体配置文件可以参照如下,该配置可以将集群中topic为test101下的消息导出到OSS的cn-beijing区中bucket为lm-csp-test的topic_test目录下。

  1. {
  2. "name": "oss_test",
  3. "config": {
  4. "name": "oss_test",
  5. "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  6. "tasks.max": "1",
  7. "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  8. "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  9. "topics": "test101",
  10. "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  11. "flush.size": "1",
  12. "schema.compatibility": "NONE",
  13. //bucketName必须是OSS上某个bucket下面的一个目录
  14. "s3.bucket.name": "topic_test",
  15. "s3.region": "cn-beijing",
  16. //可以写入OSS文件的账户AK
  17. "aws.access.key.id": "your_access_key_id",
  18. "aws.secret.access.key": "******************************",
  19. "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  20. "topics.dir": "",
  21. //此处包含bucket相关信息
  22. "store.url": "https://lm-csp-test.oss-cn-beijing-internal.aliyuncs.com",
  23. "principal.service.name": "****",
  24. "principal.service.password": "********"
  25. }
  26. }

1. debezium-connector-mysql

(1)connector配置

配置项目需要添加broker认证SASL_SSL相关配置,如下图所示,详见文档

  1. {
  2. "name": "MySqlConnectorConnector_0",
  3. "config": {
  4. "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  5. "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  6. "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  7. "database.hostname": "rm-2zee1q3dxz1t628dp.mysql.rds.aliyuncs.com",
  8. "database.user": "db_user",
  9. "database.password": "db_password",
  10. "database.server.name": "db_name",
  11. "database.history.kafka.bootstrap.servers": "rb-a9702122dc8fe248-internal.csp.aliyuncs.com:9095",
  12. "database.history.kafka.topic": "test1",
  13. "database.include.list": "",
  14. "principal.service.name": "principal_user",
  15. "principal.service.password": "principal_user_password",
  16. //以下为自定义项目,需要手动添加,否则连接不上broker
  17. "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='your_user' password='your_password';",
  18. "database.history.producer.sasl.mechanism": "PLAIN",
  19. "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='your_user' password='your_password';",
  20. "database.history.producer.security.protocol": "SASL_SSL",
  21. "database.history.consumer.security.protocol": "SASL_SSL",
  22. "database.history.consumer.sasl.mechanism": "PLAIN"
  23. }
  24. }

(2)connector常见问题

  1. 配置项目中要求Mysql的用户拥有RELOAD或者FLUSH_TABLES权限,否则出现如下错误。

  2. Confluent在把Mysql数据库表数据发送至Topic过程中,可能需要自动创建Topic,否则报错如下。此时需要设置auto.create.topics.enable为true,可以在Control Center中修改该配置。

    三. Connector Rest API

    注:强制更新connector可能导致正在运行的job结束,且会导致control center中connect信息异常,可通过Rest Api手动删除connector进行解决,参考文档:https://docs.confluent.io/platform/current/connect/references/restapi.html

    1. 查看connector实例信息

    (1)API格式:

    GET /connectors

    (2)查询方式:

    可以通过Postman或者命令行的方式进行查看
    Postman:
    image.png
    命令行:

    1. curl --location --request GET 'https://connect-934f7010d1ef1b90-internal.csp.aliyuncs.com:8083/connectors' --header 'Authorization: Basic xxx'

    (3)使用场景:

    用于查看已经部署的connector信息详情

    2. 查看connector状态信息

    (1)API格式:

    GET /connectors/(string:name)/tasks/(int:taskid)/status

    (2)查询方式:

    可以通过Postman或者命令行的方式进行查看
    Postman:
    image.png
    命令行:

    1. curl --location --request GET 'https://connect-934f7010d1ef1b90-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector/status' --header 'Authorization: Basic xxx'

    (3)使用场景:

    用于查询connector的部署状态,当connector部署failed后,可通过该API进行详情查询,诊断问题

    3. 删除connector

    (1)API格式:

    DELETE /connectors/(string:name)/

    (2)查询方式:

    可以通过Postman或者命令行的方式进行查看
    Postman:
    image.png
    命令行:

    1. curl --location --request DELETE 'https://connect-934f7010d1ef1b90-internal.csp.aliyuncs.com:8083/connectors/hdfs-sink-connector' \
    2. --header 'Authorization: Basic xxx'

    (3)使用场景:

    用于强制更新部署connector后,control center中出现获取connector信息失败时,可通过该API对connector进行强制删除,control center可恢复正常